Skip to main content

fips_core/node/
mod.rs

1//! FIPS Node Entity
2//!
3//! Top-level structure representing a running FIPS instance. The Node
4//! holds all state required for mesh routing: identity, tree state,
5//! Bloom filters, coordinate caches, transports, links, and peers.
6
7mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32    ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33    build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45#[cfg(feature = "webrtc-transport")]
46use crate::transport::webrtc::WebRtcTransport;
47use crate::transport::{
48    ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
49    TransportHandle, TransportId,
50};
51use crate::tree::TreeState;
52use crate::upper::hosts::HostMap;
53use crate::upper::icmp_rate_limit::IcmpRateLimiter;
54use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
55use crate::utils::index::IndexAllocator;
56use crate::{
57    Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
58    SessionMessageType, encode_npub,
59};
60use rand::Rng;
61use std::collections::{HashMap, HashSet, VecDeque};
62use std::fmt;
63use std::sync::Arc;
64use std::thread::JoinHandle;
65use thiserror::Error;
66use tracing::{debug, warn};
67
/// Half-range, in seconds, of the symmetric jitter applied to per-session
/// rekey timers.
///
/// Each FMP/FSP session draws an offset uniformly from
/// `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]` seconds at construction and
/// after each cutover. This preserves the configured mean interval while
/// reducing dual-initiation bursts in symmetric-start meshes.
///
/// NOTE(review): presumably typed `i64` (signed) so both ends of the range
/// can be formed from this one constant; confirm at the sampling site.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
75
/// Errors related to node operations.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute (derived through `thiserror`). The `Config`, `Identity` and
/// `Tun` variants are marked `#[from]`, so their source error types convert
/// into `NodeError` automatically when propagated with `?`.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The node has not been started.
    #[error("node not started")]
    NotStarted,

    /// The node has already been started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node has already been stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given id.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No transport is available for the named transport type.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link is registered under the given id.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection exists for the given link.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// No peer is known under the given node address.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with the given node address already exists.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection already exists for the given link.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A peer npub string was rejected; `reason` explains why.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// Discovery failure, described as free-form text.
    #[error("discovery error: {0}")]
    Discovery(String),

    /// Access was denied, described as free-form text.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` was exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` was exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` was exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available for the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion failed for `link_id`; `reason` explains why.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed; `reason` explains why.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet of `packet_size` bytes exceeded the `mtu` while being
    /// forwarded to `node_addr`.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Wrapped configuration error (converted via `From<ConfigError>`).
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Wrapped identity error (converted via `From<IdentityError>`).
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// Wrapped TUN error (converted via `From<TunError>`).
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Index allocation failed, described as free-form text.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// Handshake failure, described as free-form text.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// Transport failure carried as text.
    ///
    /// This variant only shares its name with the
    /// `crate::transport::TransportError` type imported by this file; it
    /// stores a `String`, not that type, and has no `#[from]` conversion.
    #[error("transport error: {0}")]
    TransportError(String),

    /// Bootstrap handoff failure, described as free-form text.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
167
/// Source-attributed packet delivered by a node running without a system TUN.
///
/// This is the item type of the receiver half exposed as
/// `ExternalPacketIo::inbound_rx`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// FIPS node address that originated the packet.
    pub source_node_addr: NodeAddr,
    /// Source Nostr public key when the node has learned it; `None`
    /// otherwise.
    pub source_npub: Option<String>,
    /// Destination FIPS address from the IPv6 packet.
    pub destination: FipsAddress,
    /// Full IPv6 packet after FIPS session decapsulation.
    pub packet: Vec<u8>,
}
180
/// Cached identity data for one peer.
///
/// Stored as the value type of `Node::identity_cache`, which is keyed by
/// 15 bytes of the peer's FIPS address.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address of the cached peer.
    node_addr: NodeAddr,
    /// The peer's secp256k1 public key.
    pubkey: secp256k1::PublicKey,
    /// The peer's npub string.
    npub: String,
    /// Last time this entry was seen.
    /// NOTE(review): the `_ms` suffix suggests Unix milliseconds; confirm
    /// against the code that refreshes the cache.
    last_seen_ms: u64,
}

impl IdentityCacheEntry {
    /// Build an entry from its four fields; performs no validation or
    /// derivation of its own.
    fn new(
        node_addr: NodeAddr,
        pubkey: secp256k1::PublicKey,
        npub: String,
        last_seen_ms: u64,
    ) -> Self {
        Self {
            node_addr,
            pubkey,
            npub,
            last_seen_ms,
        }
    }
}
204
/// App-owned packet channels for embedding FIPS without a system TUN.
///
/// Holds the application-side halves of two channels: a sender for
/// outbound packets and a receiver for delivered packets.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Send outbound IPv6 packets into the node.
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receive inbound IPv6 packets delivered by FIPS sessions, each tagged
    /// with its source (see `NodeDeliveredPacket`).
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
213
/// App-owned endpoint data channels for embedding FIPS without a daemon.
///
/// Bundles a bounded command sender with both halves' handles of an
/// unbounded event channel: the receiver for consumption and a sender clone
/// for in-process loopback.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Send endpoint data commands into the node RX loop.
    ///
    /// Bounded with a generous default so normal sender bursts do not
    /// stall on semaphore acquisition. macOS pacing happens at the UDP
    /// egress thread where the real Wi-Fi/interface bottleneck is visible;
    /// constraining this app queue instead caused the inner TCP flow to
    /// collapse under iperf. `FIPS_ENDPOINT_DATA_QUEUE_CAP` overrides the
    /// default for benches.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Receive endpoint data delivered by FIPS sessions.
    ///
    /// Unbounded so the rx_loop's send on inbound packet delivery is a
    /// wait-free push (no semaphore acquire), and so we can drop the
    /// per-packet cross-task relay that previously sat between the node
    /// task and the `FipsEndpoint::recv()` consumer. Backpressure is
    /// naturally bounded: the rx_loop both produces here and runs on the
    /// same runtime that schedules the consumer, so a stalled consumer
    /// stalls production too.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Clone of the event_tx exposed for in-process loopback (e.g.
    /// `FipsEndpoint::send` to self_npub). Lets the endpoint inject an
    /// event into the same queue without going through the encrypt /
    /// decrypt path, while keeping every consumer reading from a single
    /// channel.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
243
/// Resolve the capacity of the bounded endpoint-data command queue.
///
/// Resolution order:
///
/// 1. If the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable holds a
///    positive integer (surrounding whitespace is ignored), that value is
///    returned unchanged, even when it is smaller than the default floor.
/// 2. Otherwise `requested` is returned, raised to a floor of 32 768.
///
/// An override that is unset, not valid Unicode, not parsable as `usize`,
/// or equal to zero is ignored. The result is therefore never zero.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    /// Environment variable that overrides the queue capacity.
    const OVERRIDE_ENV: &str = "FIPS_ENDPOINT_DATA_QUEUE_CAP";
    /// Smallest capacity used when no valid override is present.
    const DEFAULT_FLOOR: usize = 32_768;

    std::env::var(OVERRIDE_ENV)
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        // Zero is treated the same as "no override".
        .filter(|&capacity| capacity > 0)
        // The floor is well above 1, so no separate non-zero clamp is needed.
        .unwrap_or(requested.max(DEFAULT_FLOOR))
}
254
/// Commands accepted by the node endpoint data service.
///
/// Every variant except `SendOneway` carries a `oneshot` sender on which
/// the reply for that command is returned.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send with an explicit response channel. Used by callers that
    /// care whether the local-stack handoff succeeded (e.g.
    /// `blocking_send` waits for the runtime to accept the send).
    Send {
        remote: PeerIdentity,
        payload: Vec<u8>,
        queued_at: Option<std::time::Instant>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// **Fire-and-forget** variant of `Send`: no oneshot allocation,
    /// no per-packet result channel. Used by the data-plane fast path
    /// (`FipsEndpoint::send`) where the caller already discards the
    /// result. Saves one oneshot::channel() allocation per outbound
    /// packet on the application's send hot path.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        queued_at: Option<std::time::Instant>,
    },
    /// Request a snapshot of peers; the reply is a list of
    /// `NodeEndpointPeer` records sent on `response_tx`.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request a snapshot of relay state; the reply is a list of
    /// `NodeEndpointRelayStatus` records sent on `response_tx`.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Supply new advert and DM relay lists; success or failure is reported
    /// on `response_tx`.
    /// NOTE(review): whether the lists replace or merge with the current
    /// relay sets is not visible here; confirm in the handler.
    UpdateRelays {
        advert_relays: Vec<String>,
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Replace the runtime peer list. Newly added auto-connect peers get
    /// `initiate_peer_connection` immediately; removed peers are dropped
    /// from the retry queue (the regular liveness timeout reaps any active
    /// session). Existing entries are kept and their `addresses` field is
    /// refreshed so the next retry sees the latest hints.
    UpdatePeers {
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
298
/// Reports what changed in response to `UpdatePeers`.
///
/// Returned on the `response_tx` of `NodeEndpointCommand::UpdatePeers`.
/// `Default` yields all-zero counts.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Count of peers added.
    pub(crate) added: usize,
    /// Count of peers removed.
    pub(crate) removed: usize,
    /// Count of existing peers that were updated.
    pub(crate) updated: usize,
    /// Count of existing peers left unchanged.
    pub(crate) unchanged: usize,
}
307
/// Endpoint data events emitted by the node session receive path.
///
/// This is the item type of the unbounded event channel held in
/// `EndpointDataIo`.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A data payload received from a remote node.
    Data {
        /// Node address of the sender.
        source_node_addr: NodeAddr,
        /// Sender's npub, when available.
        source_npub: Option<String>,
        /// The received payload bytes.
        payload: Vec<u8>,
        /// Optional enqueue timestamp.
        /// NOTE(review): presumably used for queue-latency measurement;
        /// confirm at the consumer.
        queued_at: Option<std::time::Instant>,
    },
}
318
/// Authenticated peer state exposed to embedded endpoint callers.
///
/// Returned in the reply to `NodeEndpointCommand::PeerSnapshot`. All
/// fields are plain values (strings and integers), so a snapshot does not
/// borrow from node state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// The peer's npub string.
    pub(crate) npub: String,
    /// Transport address of the peer as text, if available.
    pub(crate) transport_addr: Option<String>,
    /// Transport type of the peer as text, if available.
    pub(crate) transport_type: Option<String>,
    /// Numeric id of the peer's link.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, if available.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent to this peer.
    pub(crate) packets_sent: u64,
    /// Packets received from this peer.
    pub(crate) packets_recv: u64,
    /// Bytes sent to this peer.
    pub(crate) bytes_sent: u64,
    /// Bytes received from this peer.
    pub(crate) bytes_recv: u64,
}
332
/// Live Nostr relay state exposed to embedded endpoint callers.
///
/// Returned in the reply to `NodeEndpointCommand::RelaySnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Relay status as free-form text.
    /// NOTE(review): the set of possible values is not visible here.
    pub(crate) status: String,
}
339
/// Node operational state.
///
/// The `Display` form of each variant is its lowercase name (`"created"`,
/// `"starting"`, `"running"`, `"stopping"`, `"stopped"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Created but not started.
    Created,
    /// Starting up (initializing transports).
    Starting,
    /// Fully operational.
    Running,
    /// Shutting down.
    Stopping,
    /// Stopped.
    Stopped,
}

impl NodeState {
    /// Check if node is operational. True only for [`NodeState::Running`].
    pub fn is_operational(&self) -> bool {
        matches!(self, NodeState::Running)
    }

    /// Check if node can be started. True for [`NodeState::Created`] and
    /// [`NodeState::Stopped`].
    pub fn can_start(&self) -> bool {
        matches!(self, NodeState::Created | NodeState::Stopped)
    }

    /// Check if node can be stopped. True only for [`NodeState::Running`].
    pub fn can_stop(&self) -> bool {
        matches!(self, NodeState::Running)
    }
}

impl fmt::Display for NodeState {
    /// Write the lowercase state name.
    ///
    /// Width, fill, alignment and precision given by the caller (for
    /// example `{:<10}` in a status table) are applied to the name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        };
        // `pad` applies the caller's formatting flags; `write!(f, "{}", ..)`
        // would start a fresh format spec and silently drop them.
        f.pad(label)
    }
}
384
385/// Recent request tracking for dedup and reverse-path forwarding.
386///
387/// When a LookupRequest is forwarded through a node, the node stores the
388/// request_id and which peer sent it. When the corresponding LookupResponse
389/// arrives, it's forwarded back to that peer (reverse-path forwarding).
390/// The `response_forwarded` flag prevents response routing loops.
391#[derive(Clone, Debug)]
392pub(crate) struct RecentRequest {
393    /// The peer who sent this request to us.
394    pub(crate) from_peer: NodeAddr,
395    /// When we received this request (Unix milliseconds).
396    pub(crate) timestamp_ms: u64,
397    /// Whether we've already forwarded a response for this request.
398    /// Prevents response routing loops when convergent request paths
399    /// create bidirectional entries in recent_requests.
400    pub(crate) response_forwarded: bool,
401}
402
403impl RecentRequest {
404    pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
405        Self {
406            from_peer,
407            timestamp_ms,
408            response_forwarded: false,
409        }
410    }
411
412    /// Check if this entry has expired (older than expiry_ms).
413    pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
414        current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
415    }
416}
417
/// Key for addr_to_link reverse lookup: the transport a packet arrived on
/// paired with the remote address on that transport.
type AddrKey = (TransportId, TransportAddr);
420
/// Per-transport kernel drop tracking for congestion detection.
///
/// Sampled every tick (1s). The `dropping` flag indicates whether new
/// kernel drops were observed since the previous sample.
///
/// `Default` starts from a zero counter with `dropping` cleared.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Previous `recv_drops` sample (cumulative counter).
    prev_drops: u64,
    /// True if drops increased since the last sample.
    dropping: bool,
}
432
/// State for a link waiting for transport-level connection establishment.
///
/// For connection-oriented transports (TCP, Tor), the transport connect runs
/// asynchronously. This struct holds the data needed to complete the handshake
/// once the connection is ready.
///
/// Entries are kept in `Node::pending_connects`.
struct PendingConnect {
    /// The link that was created for this connection.
    link_id: LinkId,
    /// Which transport is being used.
    transport_id: TransportId,
    /// The remote address being connected to.
    remote_addr: TransportAddr,
    /// The peer identity (for handshake initiation).
    peer_identity: PeerIdentity,
}
448
/// A running FIPS node instance.
///
/// This is the top-level container holding all node state.
///
/// ## Peer Lifecycle
///
/// Peers go through two phases:
/// 1. **Connection phase** (`connections`): Handshake in progress, indexed by LinkId
/// 2. **Active phase** (`peers`): Authenticated, indexed by NodeAddr
///
/// The `addr_to_link` map enables dispatching incoming packets to the right
/// connection before authentication completes.
// Discovery lookup constants moved to config: node.discovery.attempt_timeouts_secs, node.discovery.ttl
pub struct Node {
    // === Identity ===
    /// This node's cryptographic identity.
    identity: Identity,

    /// Random epoch generated at startup for peer restart detection.
    /// Exchanged inside Noise handshake messages so peers can detect restarts.
    startup_epoch: [u8; 8],

    /// Instant when the node was created, for uptime reporting.
    /// Despite the name, this is captured in `Node::new`, not when the
    /// node transitions to running.
    started_at: std::time::Instant,

    // === Configuration ===
    /// Loaded configuration.
    config: Config,

    // === State ===
    /// Node operational state.
    state: NodeState,

    /// Whether this is a leaf-only node.
    is_leaf_only: bool,

    // === Spanning Tree ===
    /// Local spanning tree state.
    tree_state: TreeState,

    // === Bloom Filter ===
    /// Local Bloom filter state.
    bloom_state: BloomState,

    // === Routing ===
    /// Address -> coordinates cache (from session setup and discovery).
    coord_cache: CoordCache,
    /// Locally learned reverse-path next-hop hints.
    learned_routes: LearnedRouteTable,
    /// Recent discovery requests (dedup + reverse-path forwarding).
    /// Maps request_id → RecentRequest.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Per-destination path MTU lookup, keyed by FipsAddress (mirrors
    /// `coord_cache.entries[*].path_mtu`). Sync read-only access from
    /// the TUN reader/writer threads at TCP MSS clamp time so the
    /// SYN/SYN-ACK clamp can use the smaller of the local-egress floor
    /// and the learned per-destination path MTU.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // === Transports & Links ===
    /// Active transports (owned by Node).
    transports: HashMap<TransportId, TransportHandle>,
    /// Per-transport kernel drop tracking for congestion detection.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Active links.
    links: HashMap<LinkId, Link>,
    /// Reverse lookup: (transport_id, remote_addr) -> link_id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    // === Packet Channel ===
    /// Packet sender for transports.
    packet_tx: Option<PacketTx>,
    /// Packet receiver (for event loop).
    packet_rx: Option<PacketRx>,

    // === Connections (Handshake Phase) ===
    /// Pending connections (handshake in progress).
    /// Indexed by LinkId since we don't know the peer's identity yet.
    connections: HashMap<LinkId, PeerConnection>,

    // === Peers (Active Phase) ===
    /// Authenticated peers.
    /// Indexed by NodeAddr (verified identity).
    peers: HashMap<NodeAddr, ActivePeer>,

    // === End-to-End Sessions ===
    /// Session table for end-to-end encrypted sessions.
    /// Keyed by remote NodeAddr.
    sessions: HashMap<NodeAddr, SessionEntry>,

    // === Identity Cache ===
    /// Maps FipsAddress prefix bytes (bytes 1-15) to cached peer identity data.
    /// Enables reverse lookup from IPv6 destination to session/routing identity.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    // === Pending TUN Packets / Endpoint Data ===
    /// Packets queued while waiting for session establishment.
    /// Keyed by destination NodeAddr, bounded per-dest and total.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Endpoint data payloads queued while waiting for session establishment.
    /// Keyed by destination NodeAddr, like `pending_tun_packets`.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,

    // === Pending Discovery Lookups ===
    /// Tracks in-flight discovery lookups. Maps target NodeAddr to a
    /// `PendingLookup` record. Prevents duplicate flood queries.
    /// NOTE(review): an earlier revision of this comment described the value
    /// as a bare initiation timestamp (Unix ms); see
    /// `handlers::discovery::PendingLookup` for what is stored now.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // === Resource Limits ===
    /// Maximum connections (0 = unlimited).
    max_connections: usize,
    /// Maximum peers (0 = unlimited).
    max_peers: usize,
    /// Maximum links (0 = unlimited).
    max_links: usize,

    // === Counters ===
    /// Next link ID to allocate.
    next_link_id: u64,
    /// Next transport ID to allocate.
    next_transport_id: u32,

    // === Node Statistics ===
    /// Routing, forwarding, discovery, and error signal counters.
    stats: stats::NodeStats,

    /// Time-series history of node-level metrics (1s/1m rings).
    stats_history: stats_history::StatsHistory,

    // === TUN Interface ===
    /// TUN device state.
    tun_state: TunState,
    /// TUN interface name (for cleanup).
    tun_name: Option<String>,
    /// TUN packet sender channel.
    tun_tx: Option<TunTx>,
    /// Receiver for outbound packets from the TUN reader.
    tun_outbound_rx: Option<TunOutboundRx>,
    /// App-owned packet sink used by embedded/no-TUN integrations.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Endpoint data command receiver used by embedded/no-daemon integrations.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Endpoint data event sink used by embedded/no-daemon integrations.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Off-task FMP-encrypt + UDP-send worker pool. `None` if not yet
    /// spawned (set up in `start()` once transports are running).
    /// `Some(pool)` once available; the pool internally holds
    /// per-worker mpsc senders and round-robins jobs across them.
    /// See `node::encrypt_worker` for the rationale and layout.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Off-task FMP + FSP decrypt + delivery worker pool. Mirror of
    /// `encrypt_workers` for the receive side.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// Set of sessions that have been registered with the decrypt
    /// shard worker pool. Used by rx_loop to decide between fast-path
    /// dispatch (worker owns the session) and legacy in-place decrypt
    /// (worker doesn't have it yet). Per the data-plane restructure,
    /// the worker owns its session state directly, so there's no shared
    /// `Arc<RwLock<HashMap>>` of cipher / replay state anymore; this
    /// set only tracks **whether** the worker has been told about a
    /// given session.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Fallback channel: decrypt worker bounces non-fast-path packets
    /// (anything that's not bulk EndpointData) back here for rx_loop
    /// to handle via the legacy path. Drained by a new rx_loop arm.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sender half of the fallback channel above; both halves are created
    /// together in `Node::new`.
    /// NOTE(review): presumably cloned into the decrypt workers when they
    /// are spawned; confirm in `decrypt_worker`.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// TUN reader thread handle.
    tun_reader_handle: Option<JoinHandle<()>>,
    /// TUN writer thread handle.
    tun_writer_handle: Option<JoinHandle<()>>,
    /// Shutdown pipe: writing to this fd unblocks the TUN reader thread on macOS.
    /// On Linux, deleting the interface via netlink serves the same purpose.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // === DNS Responder ===
    /// Receiver for resolved identities from the DNS responder.
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    /// DNS responder task handle.
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // === Index-Based Session Dispatch ===
    /// Allocator for session indices.
    index_allocator: IndexAllocator,
    /// O(1) lookup: (transport_id, our_index) → NodeAddr.
    /// This maps our session index to the peer that uses it.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Pending outbound handshakes by our sender_idx.
    /// Tracks which LinkId corresponds to which session index.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // === Rate Limiting ===
    /// Rate limiter for msg1 processing (DoS protection).
    msg1_rate_limiter: HandshakeRateLimiter,
    /// Rate limiter for ICMP Packet Too Big messages.
    icmp_rate_limiter: IcmpRateLimiter,
    /// Rate limiter for routing error signals (CoordsRequired / PathBroken).
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Rate limiter for source-side CoordsRequired/PathBroken responses.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    /// Backoff for failed discovery lookups (originator-side).
    discovery_backoff: DiscoveryBackoff,
    /// Rate limiter for forwarded discovery requests (transit-side).
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // === Pending Transport Connects ===
    /// Links waiting for transport-level connection establishment before
    /// sending handshake msg1. For connection-oriented transports (TCP, Tor),
    /// the transport connect runs in the background; the tick handler polls
    /// connection_state() and initiates the handshake when connected.
    pending_connects: Vec<PendingConnect>,

    // === Connection Retry ===
    /// Retry state for peers whose outbound connections have failed.
    /// Keyed by NodeAddr. Entries are created when a handshake times out
    /// or fails, and removed on successful promotion or when max retries
    /// are exhausted.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    // === Discovery ===
    /// Optional Nostr/STUN overlay discovery coordinator for `udp:nat` peers.
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    /// mDNS / DNS-SD responder + browser for local-link peer discovery.
    /// Identity is unverified at this layer: the Noise XX handshake
    /// initiated against an mDNS-observed endpoint is what proves the
    /// peer holds the matching private key.
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    /// Same-host JSON registry under `~/.fips/instances`. Records are
    /// loopback routing hints only; peer identity is still verified by the
    /// Noise handshake.
    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
    /// NOTE(review): presumably the wall-clock ms at which the local
    /// instance registry came up; confirm where it is assigned.
    local_instance_started_at_ms: Option<u64>,
    /// NOTE(review): presumably the wall-clock ms of the most recent
    /// publish to the local instance registry; confirm where it is assigned.
    last_local_instance_publish_ms: Option<u64>,
    /// NOTE(review): presumably the wall-clock ms of the most recent scan
    /// of the local instance registry; confirm where it is assigned.
    last_local_instance_scan_ms: Option<u64>,
    /// Wall-clock ms when Nostr discovery successfully started, used to
    /// schedule the one-shot startup advert sweep after a settle delay.
    /// `None` until discovery comes up; remains `None` if discovery is
    /// disabled or failed to start.
    nostr_discovery_started_at_ms: Option<u64>,
    /// Whether the one-shot startup advert sweep has run. Set to true
    /// after the first sweep fires (under `policy: open`); thereafter
    /// only the per-tick `queue_open_discovery_retries` continues.
    startup_open_discovery_sweep_done: bool,
    /// Per-peer UDP transports adopted from NAT traversal handoff.
    bootstrap_transports: HashSet<TransportId>,
    /// Originating peer npub (bech32) for each adopted bootstrap
    /// transport, captured at `adopt_established_traversal` time.
    /// Populated alongside `bootstrap_transports`; cleared in
    /// `cleanup_bootstrap_transport_if_unused`. Used by the rx loop to
    /// route fatal-protocol-mismatch observations back to the
    /// Nostr-discovery `failure_state` for long cooldown application.
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    /// Peers that should not be used as reply-learned fallback transit for
    /// other destinations. Direct lookups to the peer are still permitted.
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    // === Periodic Parent Re-evaluation ===
    /// Timestamp of last periodic parent re-evaluation (for pacing).
    /// Uses the crate's own `crate::time::Instant`, unlike the
    /// `std::time::Instant` timestamps below.
    last_parent_reeval: Option<crate::time::Instant>,

    // === Congestion Logging ===
    /// Timestamp of last congestion detection log (rate-limited to 5s).
    last_congestion_log: Option<std::time::Instant>,

    // === Mesh Size Estimate ===
    /// Cached estimated mesh size (computed once per tick from bloom filters).
    estimated_mesh_size: Option<u64>,
    /// Timestamp of last mesh size log emission.
    last_mesh_size_log: Option<std::time::Instant>,

    // === Bloom Self-Plausibility ===
    /// Rate-limit state for the self-plausibility WARN. Fires at most
    /// once per 60s globally when our own outgoing FilterAnnounce has
    /// an FPR above `node.bloom.max_inbound_fpr`, signalling either
    /// aggregation drift or an ingress bypass.
    last_self_warn: Option<std::time::Instant>,

    // === Local Outbound Liveness ===
    /// Set when a `transport.send` returned a local-side io error
    /// (`NetworkUnreachable` / `HostUnreachable` / `AddrNotAvailable`),
    /// cleared on the next successful send. Used by
    /// `check_link_heartbeats` to compress the dead-timeout to
    /// `fast_link_dead_timeout_secs` while our outbound is observed
    /// broken, since direct kernel evidence beats waiting on receive-silence.
    last_local_send_failure_at: Option<std::time::Instant>,

    // === Display Names ===
    /// Human-readable names for configured peers (alias or short npub).
    /// Populated at startup from peer config.
    peer_aliases: HashMap<NodeAddr, String>,
    /// Scheduler weight for explicitly configured peers. Built when config
    /// changes so the packet hot path only does a NodeAddr hash lookup.
    configured_peer_send_weights: HashMap<NodeAddr, u8>,

    /// Reloadable peer ACL state from standard allow/deny files.
    peer_acl: acl::PeerAclReloader,

    // === Host Map ===
    /// Static hostname → npub mapping for DNS resolution.
    /// Built at construction from peer aliases and /etc/fips/hosts.
    host_map: Arc<HostMap>,
}
750
751impl Node {
752    /// Create a new node from configuration.
753    pub fn new(config: Config) -> Result<Self, NodeError> {
754        config.validate()?;
755        let identity = config.create_identity()?;
756        let node_addr = *identity.node_addr();
757        let is_leaf_only = config.is_leaf_only();
758
759        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
760        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
761
762        let mut startup_epoch = [0u8; 8];
763        rand::rng().fill_bytes(&mut startup_epoch);
764
765        let mut bloom_state = if is_leaf_only {
766            BloomState::leaf_only(node_addr)
767        } else {
768            BloomState::new(node_addr)
769        };
770        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
771
772        let tun_state = if config.tun.enabled {
773            TunState::Configured
774        } else {
775            TunState::Disabled
776        };
777
778        // Initialize tree state with signed self-declaration
779        let mut tree_state = TreeState::new(node_addr);
780        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
781        tree_state.set_hold_down(config.node.tree.hold_down_secs);
782        tree_state.set_flap_dampening(
783            config.node.tree.flap_threshold,
784            config.node.tree.flap_window_secs,
785            config.node.tree.flap_dampening_secs,
786        );
787        tree_state
788            .sign_declaration(&identity)
789            .expect("signing own declaration should never fail");
790
791        let coord_cache = CoordCache::new(
792            config.node.cache.coord_size,
793            config.node.cache.coord_ttl_secs * 1000,
794        );
795        let rl = &config.node.rate_limit;
796        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
797            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
798            config.node.limits.max_pending_inbound,
799        );
800
801        let max_connections = config.node.limits.max_connections;
802        let max_peers = config.node.limits.max_peers;
803        let max_links = config.node.limits.max_links;
804        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
805        let backoff_base_secs = config.node.discovery.backoff_base_secs;
806        let backoff_max_secs = config.node.discovery.backoff_max_secs;
807        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
808
809        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
810        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
811
812        Ok(Self {
813            identity,
814            startup_epoch,
815            started_at: std::time::Instant::now(),
816            config,
817            state: NodeState::Created,
818            is_leaf_only,
819            tree_state,
820            bloom_state,
821            coord_cache,
822            learned_routes: LearnedRouteTable::default(),
823            recent_requests: HashMap::new(),
824            transports: HashMap::new(),
825            transport_drops: HashMap::new(),
826            links: HashMap::new(),
827            addr_to_link: HashMap::new(),
828            packet_tx: None,
829            packet_rx: None,
830            connections: HashMap::new(),
831            peers: HashMap::new(),
832            sessions: HashMap::new(),
833            identity_cache: HashMap::new(),
834            pending_tun_packets: HashMap::new(),
835            pending_endpoint_data: HashMap::new(),
836            pending_lookups: HashMap::new(),
837            max_connections,
838            max_peers,
839            max_links,
840            next_link_id: 1,
841            next_transport_id: 1,
842            stats: stats::NodeStats::new(),
843            stats_history: stats_history::StatsHistory::new(),
844            tun_state,
845            tun_name: None,
846            tun_tx: None,
847            tun_outbound_rx: None,
848            external_packet_tx: None,
849            endpoint_command_rx: None,
850            endpoint_event_tx: None,
851            encrypt_workers: None,
852            decrypt_workers: None,
853            decrypt_registered_sessions: std::collections::HashSet::new(),
854            decrypt_fallback_tx,
855            decrypt_fallback_rx,
856            tun_reader_handle: None,
857            tun_writer_handle: None,
858            #[cfg(target_os = "macos")]
859            tun_shutdown_fd: None,
860            dns_identity_rx: None,
861            dns_task: None,
862            index_allocator: IndexAllocator::new(),
863            peers_by_index: HashMap::new(),
864            pending_outbound: HashMap::new(),
865            msg1_rate_limiter,
866            icmp_rate_limiter: IcmpRateLimiter::new(),
867            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
868            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
869                std::time::Duration::from_millis(coords_response_interval_ms),
870            ),
871            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
872            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
873                std::time::Duration::from_secs(forward_min_interval_secs),
874            ),
875            pending_connects: Vec::new(),
876            retry_pending: HashMap::new(),
877            nostr_discovery: None,
878            nostr_discovery_started_at_ms: None,
879            lan_discovery: None,
880            local_instance_registry: None,
881            local_instance_started_at_ms: None,
882            last_local_instance_publish_ms: None,
883            last_local_instance_scan_ms: None,
884            startup_open_discovery_sweep_done: false,
885            bootstrap_transports: HashSet::new(),
886            bootstrap_transport_npubs: HashMap::new(),
887            discovery_fallback_transit_blocked_peers: HashSet::new(),
888            last_parent_reeval: None,
889            last_congestion_log: None,
890            estimated_mesh_size: None,
891            last_mesh_size_log: None,
892            last_self_warn: None,
893            last_local_send_failure_at: None,
894            peer_aliases: HashMap::new(),
895            configured_peer_send_weights,
896            peer_acl,
897            host_map,
898            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
899        })
900    }
901
902    /// Create a node with a specific identity.
903    ///
904    /// This constructor validates cross-field config invariants before
905    /// constructing the node, same as [`Node::new`].
906    pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
907        config.validate()?;
908        let node_addr = *identity.node_addr();
909
910        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
911        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
912
913        let mut startup_epoch = [0u8; 8];
914        rand::rng().fill_bytes(&mut startup_epoch);
915
916        let tun_state = if config.tun.enabled {
917            TunState::Configured
918        } else {
919            TunState::Disabled
920        };
921
922        // Initialize tree state with signed self-declaration
923        let mut tree_state = TreeState::new(node_addr);
924        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
925        tree_state.set_hold_down(config.node.tree.hold_down_secs);
926        tree_state.set_flap_dampening(
927            config.node.tree.flap_threshold,
928            config.node.tree.flap_window_secs,
929            config.node.tree.flap_dampening_secs,
930        );
931        tree_state
932            .sign_declaration(&identity)
933            .expect("signing own declaration should never fail");
934
935        let mut bloom_state = BloomState::new(node_addr);
936        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
937
938        let coord_cache = CoordCache::new(
939            config.node.cache.coord_size,
940            config.node.cache.coord_ttl_secs * 1000,
941        );
942        let rl = &config.node.rate_limit;
943        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
944            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
945            config.node.limits.max_pending_inbound,
946        );
947
948        let max_connections = config.node.limits.max_connections;
949        let max_peers = config.node.limits.max_peers;
950        let max_links = config.node.limits.max_links;
951        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
952
953        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
954        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
955
956        Ok(Self {
957            identity,
958            startup_epoch,
959            started_at: std::time::Instant::now(),
960            config,
961            state: NodeState::Created,
962            is_leaf_only: false,
963            tree_state,
964            bloom_state,
965            coord_cache,
966            learned_routes: LearnedRouteTable::default(),
967            recent_requests: HashMap::new(),
968            transports: HashMap::new(),
969            transport_drops: HashMap::new(),
970            links: HashMap::new(),
971            addr_to_link: HashMap::new(),
972            packet_tx: None,
973            packet_rx: None,
974            connections: HashMap::new(),
975            peers: HashMap::new(),
976            sessions: HashMap::new(),
977            identity_cache: HashMap::new(),
978            pending_tun_packets: HashMap::new(),
979            pending_endpoint_data: HashMap::new(),
980            pending_lookups: HashMap::new(),
981            max_connections,
982            max_peers,
983            max_links,
984            next_link_id: 1,
985            next_transport_id: 1,
986            stats: stats::NodeStats::new(),
987            stats_history: stats_history::StatsHistory::new(),
988            tun_state,
989            tun_name: None,
990            tun_tx: None,
991            tun_outbound_rx: None,
992            external_packet_tx: None,
993            endpoint_command_rx: None,
994            endpoint_event_tx: None,
995            encrypt_workers: None,
996            decrypt_workers: None,
997            decrypt_registered_sessions: std::collections::HashSet::new(),
998            decrypt_fallback_tx,
999            decrypt_fallback_rx,
1000            tun_reader_handle: None,
1001            tun_writer_handle: None,
1002            #[cfg(target_os = "macos")]
1003            tun_shutdown_fd: None,
1004            dns_identity_rx: None,
1005            dns_task: None,
1006            index_allocator: IndexAllocator::new(),
1007            peers_by_index: HashMap::new(),
1008            pending_outbound: HashMap::new(),
1009            msg1_rate_limiter,
1010            icmp_rate_limiter: IcmpRateLimiter::new(),
1011            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1012            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1013                std::time::Duration::from_millis(coords_response_interval_ms),
1014            ),
1015            discovery_backoff: DiscoveryBackoff::new(),
1016            discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1017            pending_connects: Vec::new(),
1018            retry_pending: HashMap::new(),
1019            nostr_discovery: None,
1020            nostr_discovery_started_at_ms: None,
1021            lan_discovery: None,
1022            local_instance_registry: None,
1023            local_instance_started_at_ms: None,
1024            last_local_instance_publish_ms: None,
1025            last_local_instance_scan_ms: None,
1026            startup_open_discovery_sweep_done: false,
1027            bootstrap_transports: HashSet::new(),
1028            bootstrap_transport_npubs: HashMap::new(),
1029            discovery_fallback_transit_blocked_peers: HashSet::new(),
1030            last_parent_reeval: None,
1031            last_congestion_log: None,
1032            estimated_mesh_size: None,
1033            last_mesh_size_log: None,
1034            last_self_warn: None,
1035            last_local_send_failure_at: None,
1036            peer_aliases: HashMap::new(),
1037            configured_peer_send_weights,
1038            peer_acl,
1039            host_map,
1040            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1041        })
1042    }
1043
1044    /// Create a leaf-only node (simplified state).
1045    pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1046        let mut node = Self::new(config)?;
1047        node.is_leaf_only = true;
1048        node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1049        Ok(node)
1050    }
1051
1052    fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1053        let base_host_map = HostMap::from_peer_configs(config.peers());
1054        if !config.node.system_files_enabled {
1055            return (
1056                Arc::new(base_host_map.clone()),
1057                acl::PeerAclReloader::memory_only(base_host_map),
1058            );
1059        }
1060
1061        let mut host_map = base_host_map.clone();
1062        let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1063        let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1064            crate::upper::hosts::DEFAULT_HOSTS_PATH,
1065        ));
1066        host_map.merge(hosts_file);
1067        let peer_acl = acl::PeerAclReloader::with_alias_sources(
1068            std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1069            std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1070            base_host_map,
1071            hosts_path,
1072        );
1073        (Arc::new(host_map), peer_acl)
1074    }
1075
1076    fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1077        config
1078            .peers()
1079            .iter()
1080            .filter_map(|peer| {
1081                PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1082                    (
1083                        *identity.node_addr(),
1084                        encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1085                    )
1086                })
1087            })
1088            .collect()
1089    }
1090
1091    fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1092        self.configured_peer_send_weights
1093            .get(peer_addr)
1094            .copied()
1095            .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1096    }
1097
1098    /// Create transport instances from configuration.
1099    ///
1100    /// Returns a vector of TransportHandles for all configured transports.
1101    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
1102        let mut transports = Vec::new();
1103
1104        // Collect UDP configs with optional names to avoid borrow conflicts
1105        let udp_instances: Vec<_> = self
1106            .config
1107            .transports
1108            .udp
1109            .iter()
1110            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1111            .collect();
1112
1113        // Create UDP transport instances
1114        for (name, udp_config) in udp_instances {
1115            let transport_id = self.allocate_transport_id();
1116            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
1117            transports.push(TransportHandle::Udp(udp));
1118        }
1119
1120        #[cfg(feature = "sim-transport")]
1121        {
1122            let sim_instances: Vec<_> = self
1123                .config
1124                .transports
1125                .sim
1126                .iter()
1127                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1128                .collect();
1129
1130            for (name, sim_config) in sim_instances {
1131                let transport_id = self.allocate_transport_id();
1132                let sim = crate::transport::sim::SimTransport::new(
1133                    transport_id,
1134                    name,
1135                    sim_config,
1136                    packet_tx.clone(),
1137                );
1138                transports.push(TransportHandle::Sim(sim));
1139            }
1140        }
1141
1142        // Create Ethernet transport instances where raw-socket support exists.
1143        #[cfg(any(target_os = "linux", target_os = "macos"))]
1144        {
1145            let eth_instances: Vec<_> = self
1146                .config
1147                .transports
1148                .ethernet
1149                .iter()
1150                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1151                .collect();
1152            let xonly = self.identity.pubkey();
1153            for (name, eth_config) in eth_instances {
1154                let mut eth_config = eth_config;
1155                if eth_config.discovery_scope.is_none() {
1156                    eth_config.discovery_scope = self.lan_discovery_scope();
1157                }
1158                let transport_id = self.allocate_transport_id();
1159                let mut eth =
1160                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
1161                eth.set_local_pubkey(xonly);
1162                transports.push(TransportHandle::Ethernet(eth));
1163            }
1164        }
1165
1166        // Create TCP transport instances
1167        let tcp_instances: Vec<_> = self
1168            .config
1169            .transports
1170            .tcp
1171            .iter()
1172            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1173            .collect();
1174
1175        for (name, tcp_config) in tcp_instances {
1176            let transport_id = self.allocate_transport_id();
1177            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
1178            transports.push(TransportHandle::Tcp(tcp));
1179        }
1180
1181        // Create Tor transport instances
1182        let tor_instances: Vec<_> = self
1183            .config
1184            .transports
1185            .tor
1186            .iter()
1187            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1188            .collect();
1189
1190        for (name, tor_config) in tor_instances {
1191            let transport_id = self.allocate_transport_id();
1192            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
1193            transports.push(TransportHandle::Tor(tor));
1194        }
1195
1196        let webrtc_instances: Vec<_> = self
1197            .config
1198            .transports
1199            .webrtc
1200            .iter()
1201            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1202            .collect();
1203
1204        #[cfg(feature = "webrtc-transport")]
1205        {
1206            for (name, webrtc_config) in webrtc_instances {
1207                let transport_id = self.allocate_transport_id();
1208                match WebRtcTransport::new(
1209                    transport_id,
1210                    name,
1211                    webrtc_config,
1212                    packet_tx.clone(),
1213                    &self.identity,
1214                    &self.config.node.discovery.nostr,
1215                ) {
1216                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
1217                    Err(err) => {
1218                        warn!(
1219                            transport_id = %transport_id,
1220                            error = %err,
1221                            "failed to initialize WebRTC transport"
1222                        );
1223                    }
1224                }
1225            }
1226        }
1227        #[cfg(not(feature = "webrtc-transport"))]
1228        if !webrtc_instances.is_empty() {
1229            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
1230        }
1231
1232        // Create BLE transport instances
1233        #[cfg(bluer_available)]
1234        {
1235            let ble_instances: Vec<_> = self
1236                .config
1237                .transports
1238                .ble
1239                .iter()
1240                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1241                .collect();
1242
1243            #[cfg(all(bluer_available, not(test)))]
1244            for (name, ble_config) in ble_instances {
1245                let transport_id = self.allocate_transport_id();
1246                let adapter = ble_config.adapter().to_string();
1247                let mtu = ble_config.mtu();
1248                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
1249                    Ok(io) => {
1250                        let mut ble = crate::transport::ble::BleTransport::new(
1251                            transport_id,
1252                            name,
1253                            ble_config,
1254                            io,
1255                            packet_tx.clone(),
1256                        );
1257                        ble.set_local_pubkey(self.identity.pubkey().serialize());
1258                        transports.push(TransportHandle::Ble(ble));
1259                    }
1260                    Err(e) => {
1261                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
1262                    }
1263                }
1264            }
1265
1266            #[cfg(any(not(bluer_available), test))]
1267            if !ble_instances.is_empty() {
1268                #[cfg(not(test))]
1269                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
1270            }
1271        }
1272
1273        transports
1274    }
1275
1276    /// Find an operational transport that matches the given transport type name.
1277    ///
1278    /// Adopted UDP bootstrap transports are point-to-point sockets handed off
1279    /// from Nostr/STUN traversal. They must not be reused for ordinary
1280    /// `udp host:port` dials discovered through static config, mDNS, or overlay
1281    /// adverts: on macOS a `send_to` through the wrong adopted socket can fail
1282    /// with `EINVAL`, and even on platforms that allow it the packet would use
1283    /// the wrong 5-tuple/NAT mapping. Prefer configured transports and make the
1284    /// choice deterministic by lowest transport id instead of HashMap order.
1285    fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1286        self.transports
1287            .iter()
1288            .filter(|(id, handle)| {
1289                handle.transport_type().name == transport_type
1290                    && handle.is_operational()
1291                    && !self.bootstrap_transports.contains(id)
1292            })
1293            .min_by_key(|(id, _)| id.as_u32())
1294            .map(|(id, _)| *id)
1295    }
1296
1297    /// Resolve an Ethernet peer address ("interface/mac") to a transport ID
1298    /// and binary TransportAddr.
1299    ///
1300    /// Finds the Ethernet transport instance bound to the named interface
1301    /// and parses the MAC portion into a 6-byte TransportAddr.
1302    #[allow(unused_variables)]
1303    fn resolve_ethernet_addr(
1304        &self,
1305        addr_str: &str,
1306    ) -> Result<(TransportId, TransportAddr), NodeError> {
1307        #[cfg(any(target_os = "linux", target_os = "macos"))]
1308        {
1309            let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1310                NodeError::NoTransportForType(format!(
1311                    "invalid Ethernet address format '{}': expected 'interface/mac'",
1312                    addr_str
1313                ))
1314            })?;
1315
1316            // Find the Ethernet transport bound to this interface
1317            let transport_id = self
1318                .transports
1319                .iter()
1320                .find(|(_, handle)| {
1321                    handle.transport_type().name == "ethernet"
1322                        && handle.is_operational()
1323                        && handle.interface_name() == Some(iface)
1324                })
1325                .map(|(id, _)| *id)
1326                .ok_or_else(|| {
1327                    NodeError::NoTransportForType(format!(
1328                        "no operational Ethernet transport for interface '{}'",
1329                        iface
1330                    ))
1331                })?;
1332
1333            let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1334                NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1335            })?;
1336
1337            Ok((transport_id, TransportAddr::from_bytes(&mac)))
1338        }
1339        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1340        {
1341            Err(NodeError::NoTransportForType(
1342                "Ethernet transport is not supported on this platform".to_string(),
1343            ))
1344        }
1345    }
1346
1347    /// Resolve a BLE address string (`"adapter/AA:BB:CC:DD:EE:FF"`) to a
1348    /// (TransportId, TransportAddr) pair by finding the BLE transport
1349    /// instance matching the adapter name.
1350    #[cfg(bluer_available)]
1351    fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1352        let ta = TransportAddr::from_string(addr_str);
1353        let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1354            NodeError::NoTransportForType(format!(
1355                "invalid BLE address format '{}': expected 'adapter/mac'",
1356                addr_str
1357            ))
1358        })?;
1359
1360        // Find the BLE transport for this adapter
1361        let transport_id = self
1362            .transports
1363            .iter()
1364            .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1365            .map(|(id, _)| *id)
1366            .ok_or_else(|| {
1367                NodeError::NoTransportForType(format!(
1368                    "no operational BLE transport for adapter '{}'",
1369                    adapter
1370                ))
1371            })?;
1372
1373        // Validate the address format
1374        crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1375            NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1376        })?;
1377
1378        Ok((transport_id, TransportAddr::from_string(addr_str)))
1379    }
1380
1381    // === Identity Accessors ===
1382
1383    /// Get this node's identity.
1384    pub fn identity(&self) -> &Identity {
1385        &self.identity
1386    }
1387
1388    /// Get this node's NodeAddr.
1389    pub fn node_addr(&self) -> &NodeAddr {
1390        self.identity.node_addr()
1391    }
1392
1393    /// Get this node's npub.
1394    pub fn npub(&self) -> String {
1395        self.identity.npub()
1396    }
1397
1398    /// Return a human-readable display name for a NodeAddr.
1399    ///
1400    /// Lookup order:
1401    /// 1. Host map hostname (from peer aliases + /etc/fips/hosts)
1402    /// 2. Configured peer alias or short npub (from startup map)
1403    /// 3. Active peer's short npub (e.g., inbound peer not in config)
1404    /// 4. Session endpoint's short npub (end-to-end, may not be direct peer)
1405    /// 5. Truncated NodeAddr hex (unknown address)
1406    pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1407        if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1408            return hostname.to_string();
1409        }
1410        if let Some(name) = self.peer_aliases.get(addr) {
1411            return name.clone();
1412        }
1413        if let Some(peer) = self.peers.get(addr) {
1414            return peer.identity().short_npub();
1415        }
1416        if let Some(entry) = self.sessions.get(addr) {
1417            let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1418            return PeerIdentity::from_pubkey(xonly).short_npub();
1419        }
1420        addr.short_hex()
1421    }
1422
1423    /// Tear down a `peers_by_index` entry **and** keep the shard-owned
1424    /// decrypt-worker state coherent: removes the same `cache_key`
1425    /// from the registered-sessions tracking set and tells the
1426    /// assigned shard worker to drop its `OwnedSessionState` entry.
1427    ///
1428    /// Use this instead of a bare `self.peers_by_index.remove(&key)`
1429    /// at every session-lifecycle teardown site (rekey cross-connection
1430    /// swap, peer disconnect, dispatch session-rotation) so the worker
1431    /// doesn't keep stale ciphers / replay windows around. The
1432    /// follow-up `RegisterSession` for the NEW key (if any) will then
1433    /// install the fresh state on the same shard.
1434    pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1435        // Find the peer that owns this index BEFORE removing it from
1436        // the index map, so we can decide whether the deregistration
1437        // also tears down the peer's connected UDP socket.
1438        let owning_peer = self.peers_by_index.get(&cache_key).copied();
1439        self.peers_by_index.remove(&cache_key);
1440        if self.decrypt_registered_sessions.remove(&cache_key)
1441            && let Some(workers) = self.decrypt_workers.as_ref()
1442        {
1443            workers.unregister_session(cache_key);
1444        }
1445        // Tear down the per-peer connected UDP socket *only* if no
1446        // other peers_by_index entry still resolves to this peer.
1447        // Rekey drain calls into this helper with the OLD session
1448        // index while the NEW index is already installed and points
1449        // at the same peer — there the connect()-ed 5-tuple is
1450        // still valid for the new session and we must not close it.
1451        // Peer-teardown sites (CrossConnection swap, stale-index
1452        // fall-through in encrypted.rs, disconnect handler) call
1453        // here when this is the peer's last index, so the connected
1454        // socket goes away with the peer.
1455        if let Some(peer_addr) = owning_peer {
1456            let peer_has_other_index = self
1457                .peers_by_index
1458                .values()
1459                .any(|other| *other == peer_addr);
1460            if !peer_has_other_index {
1461                self.clear_connected_udp_for_peer(&peer_addr);
1462            }
1463        }
1464    }
1465
1466    /// Ensure the current FMP receive index resolves to this peer.
1467    ///
1468    /// Rekey msg1/msg2 handlers pre-register the pending index before
1469    /// cutover, but losing that registration in a debug build used to
1470    /// panic in the cutover path. Repairing the map here is safe: the
1471    /// peer has already promoted the pending session, and the decrypt
1472    /// worker registration immediately after cutover depends on the
1473    /// same `(transport_id, our_index)` key.
1474    pub(in crate::node) fn ensure_current_session_index_registered(
1475        &mut self,
1476        node_addr: &NodeAddr,
1477        context: &'static str,
1478    ) -> bool {
1479        let Some(peer) = self.peers.get(node_addr) else {
1480            return false;
1481        };
1482        let Some(transport_id) = peer.transport_id() else {
1483            warn!(
1484                peer = %self.peer_display_name(node_addr),
1485                context,
1486                "Cannot register current session index without transport id"
1487            );
1488            return false;
1489        };
1490        let Some(our_index) = peer.our_index() else {
1491            warn!(
1492                peer = %self.peer_display_name(node_addr),
1493                context,
1494                "Cannot register current session index without local index"
1495            );
1496            return false;
1497        };
1498
1499        let cache_key = (transport_id, our_index.as_u32());
1500        match self.peers_by_index.get(&cache_key).copied() {
1501            Some(existing) if existing == *node_addr => true,
1502            Some(existing) => {
1503                warn!(
1504                    peer = %self.peer_display_name(node_addr),
1505                    previous_owner = %self.peer_display_name(&existing),
1506                    transport_id = %transport_id,
1507                    our_index = %our_index,
1508                    context,
1509                    "Repairing current session index with stale owner"
1510                );
1511                self.peers_by_index.insert(cache_key, *node_addr);
1512                true
1513            }
1514            None => {
1515                warn!(
1516                    peer = %self.peer_display_name(node_addr),
1517                    transport_id = %transport_id,
1518                    our_index = %our_index,
1519                    context,
1520                    "Repairing missing current session index"
1521                );
1522                self.peers_by_index.insert(cache_key, *node_addr);
1523                true
1524            }
1525        }
1526    }
1527
1528    // === Configuration ===
1529
1530    /// Get the configuration.
1531    pub fn config(&self) -> &Config {
1532        &self.config
1533    }
1534
1535    /// Calculate the effective IPv6 MTU that can be sent over FIPS.
1536    ///
1537    /// Delegates to `upper::icmp::effective_ipv6_mtu()` with this node's
1538    /// transport MTU. Returns the maximum IPv6 packet size (including
1539    /// IPv6 header) that can be transmitted through the FIPS mesh.
1540    pub fn effective_ipv6_mtu(&self) -> u16 {
1541        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1542    }
1543
1544    /// Get the transport MTU governing the global TUN-boundary MSS clamp.
1545    ///
1546    /// Returns the **minimum** MTU across all operational transports, or
1547    /// 1280 (IPv6 minimum) as fallback. Used for initial TUN configuration
1548    /// where a specific egress transport isn't yet known: the resulting
1549    /// `effective_ipv6_mtu` (transport_mtu - 77) and `max_mss`
1550    /// (effective_mtu - 60) form a conservative ceiling that fits ANY
1551    /// configured-transport's egress, eliminating PMTU-D black holes that
1552    /// would otherwise occur when a flow's actual egress is smaller than
1553    /// the clamp ceiling assumed at TUN init.
1554    ///
1555    /// Returning the smallest (rather than the first-iterated, which used
1556    /// to vary across HashMap iteration order + async-startup race) makes
1557    /// the clamp deterministic across daemon restarts.
1558    ///
1559    /// See `ISSUE-2026-0011` for the empirical investigation.
1560    pub fn transport_mtu(&self) -> u16 {
1561        let min_operational = self
1562            .transports
1563            .values()
1564            .filter(|h| h.is_operational())
1565            .map(|h| h.mtu())
1566            .min();
1567        if let Some(mtu) = min_operational {
1568            return mtu;
1569        }
1570        // Fallback to config: try UDP first, then Ethernet
1571        if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1572            return cfg.mtu();
1573        }
1574        1280
1575    }
1576
1577    // === State ===
1578
1579    /// Get the node state.
1580    pub fn state(&self) -> NodeState {
1581        self.state
1582    }
1583
1584    /// Get the node uptime.
1585    pub fn uptime(&self) -> std::time::Duration {
1586        self.started_at.elapsed()
1587    }
1588
1589    /// Check if node is operational.
1590    pub fn is_running(&self) -> bool {
1591        self.state.is_operational()
1592    }
1593
1594    /// Check if this is a leaf-only node.
1595    pub fn is_leaf_only(&self) -> bool {
1596        self.is_leaf_only
1597    }
1598
1599    // === Tree State ===
1600
1601    /// Get the tree state.
1602    pub fn tree_state(&self) -> &TreeState {
1603        &self.tree_state
1604    }
1605
1606    /// Get mutable tree state.
1607    pub fn tree_state_mut(&mut self) -> &mut TreeState {
1608        &mut self.tree_state
1609    }
1610
1611    // === Bloom State ===
1612
1613    /// Get the Bloom filter state.
1614    pub fn bloom_state(&self) -> &BloomState {
1615        &self.bloom_state
1616    }
1617
1618    /// Get mutable Bloom filter state.
1619    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1620        &mut self.bloom_state
1621    }
1622
1623    // === Mesh Size Estimate ===
1624
1625    /// Get the cached estimated mesh size.
1626    pub fn estimated_mesh_size(&self) -> Option<u64> {
1627        self.estimated_mesh_size
1628    }
1629
1630    /// Compute and cache the estimated mesh size from bloom filters.
1631    ///
1632    /// Uses the spanning tree partition: parent's filter covers nodes reachable
1633    /// upward, children's filters cover disjoint subtrees downward. The sum
1634    /// of estimated entry counts plus one (self) approximates total network size.
1635    pub(crate) fn compute_mesh_size(&mut self) {
1636        let my_addr = *self.tree_state.my_node_addr();
1637        let parent_id = *self.tree_state.my_declaration().parent_id();
1638        let is_root = self.tree_state.is_root();
1639
1640        let max_fpr = self.config.node.bloom.max_inbound_fpr;
1641        let mut total: f64 = 1.0; // count self
1642        let mut child_count: u32 = 0;
1643        let mut has_data = false;
1644
1645        // Parent's filter: nodes reachable upward through the tree.
1646        // If any contributing filter is above the FPR cap, we refuse to
1647        // estimate rather than substitute a partial/biased aggregate —
1648        // Node.estimated_mesh_size is already Option<u64> and consumers
1649        // (control socket, fipstop, periodic debug log) handle None.
1650        if !is_root
1651            && let Some(parent) = self.peers.get(&parent_id)
1652            && let Some(filter) = parent.inbound_filter()
1653        {
1654            match filter.estimated_count(max_fpr) {
1655                Some(n) => {
1656                    total += n;
1657                    has_data = true;
1658                }
1659                None => {
1660                    self.estimated_mesh_size = None;
1661                    return;
1662                }
1663            }
1664        }
1665
1666        // Children's filters: each child's subtree is disjoint
1667        for (peer_addr, peer) in &self.peers {
1668            if peer_addr == &parent_id {
1669                continue;
1670            }
1671            if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1672                && *decl.parent_id() == my_addr
1673            {
1674                child_count += 1;
1675                if let Some(filter) = peer.inbound_filter() {
1676                    match filter.estimated_count(max_fpr) {
1677                        Some(n) => {
1678                            total += n;
1679                            has_data = true;
1680                        }
1681                        None => {
1682                            self.estimated_mesh_size = None;
1683                            return;
1684                        }
1685                    }
1686                }
1687            }
1688        }
1689
1690        if !has_data {
1691            self.estimated_mesh_size = None;
1692            return;
1693        }
1694
1695        let size = total.round() as u64;
1696        self.estimated_mesh_size = Some(size);
1697
1698        // Periodic logging (reuse MMP default interval: 30s)
1699        let now = std::time::Instant::now();
1700        let should_log = match self.last_mesh_size_log {
1701            None => true,
1702            Some(last) => {
1703                now.duration_since(last)
1704                    >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1705            }
1706        };
1707        if should_log {
1708            tracing::debug!(
1709                estimated_mesh_size = size,
1710                peers = self.peers.len(),
1711                children = child_count,
1712                "Mesh size estimate"
1713            );
1714            self.last_mesh_size_log = Some(now);
1715        }
1716    }
1717
1718    // === Coord Cache ===
1719
1720    /// Get the coordinate cache.
1721    pub fn coord_cache(&self) -> &CoordCache {
1722        &self.coord_cache
1723    }
1724
1725    /// Get mutable coordinate cache.
1726    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1727        &mut self.coord_cache
1728    }
1729
1730    // === Node Statistics ===
1731
1732    /// Get the node statistics.
1733    pub fn stats(&self) -> &stats::NodeStats {
1734        &self.stats
1735    }
1736
1737    /// Get mutable node statistics.
1738    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1739        &mut self.stats
1740    }
1741
1742    /// Get the stats history collector.
1743    pub fn stats_history(&self) -> &stats_history::StatsHistory {
1744        &self.stats_history
1745    }
1746
1747    /// Sample the current node state into the stats history ring.
1748    /// Called once per tick from the RX loop.
1749    pub(crate) fn record_stats_history(&mut self) {
1750        let fwd = &self.stats.forwarding;
1751        let peers_with_mmp: Vec<f64> = self
1752            .peers
1753            .values()
1754            .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1755            .collect();
1756        let loss_rate = if peers_with_mmp.is_empty() {
1757            0.0
1758        } else {
1759            peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1760        };
1761
1762        let snap = stats_history::Snapshot {
1763            mesh_size: self.estimated_mesh_size,
1764            tree_depth: self.tree_state.my_coords().depth() as u32,
1765            peer_count: self.peers.len() as u64,
1766            parent_switches_total: self.stats.tree.parent_switches,
1767            bytes_in_total: fwd.received_bytes,
1768            bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1769            packets_in_total: fwd.received_packets,
1770            packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1771            loss_rate,
1772            active_sessions: self.sessions.len() as u64,
1773        };
1774
1775        let now = std::time::Instant::now();
1776        let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1777            .peers
1778            .values()
1779            .map(|p| {
1780                let stats = p.link_stats();
1781                let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1782                    Some(m) => (
1783                        m.metrics.srtt_ms(),
1784                        Some(m.metrics.loss_rate()),
1785                        m.receiver.ecn_ce_count() as u64,
1786                    ),
1787                    None => (None, None, 0),
1788                };
1789                stats_history::PeerSnapshot {
1790                    node_addr: *p.node_addr(),
1791                    last_seen: now,
1792                    srtt_ms,
1793                    loss_rate,
1794                    bytes_in_total: stats.bytes_recv,
1795                    bytes_out_total: stats.bytes_sent,
1796                    packets_in_total: stats.packets_recv,
1797                    packets_out_total: stats.packets_sent,
1798                    ecn_ce_total: ecn_ce,
1799                }
1800            })
1801            .collect();
1802
1803        self.stats_history.tick(now, &snap, &peer_snaps);
1804    }
1805
1806    // === TUN Interface ===
1807
1808    /// Get the TUN state.
1809    pub fn tun_state(&self) -> TunState {
1810        self.tun_state
1811    }
1812
1813    /// Get the TUN interface name, if active.
1814    pub fn tun_name(&self) -> Option<&str> {
1815        self.tun_name.as_deref()
1816    }
1817
1818    // === Resource Limits ===
1819
1820    /// Set the maximum number of connections (handshake phase).
1821    pub fn set_max_connections(&mut self, max: usize) {
1822        self.max_connections = max;
1823    }
1824
1825    /// Set the maximum number of peers (authenticated).
1826    pub fn set_max_peers(&mut self, max: usize) {
1827        self.max_peers = max;
1828    }
1829
1830    /// Returns false when starting more outbound work would exceed a resource
1831    /// cap. A cap of `0` means uncapped.
1832    pub(crate) fn outbound_admission_check(&self) -> bool {
1833        let connection_used = self
1834            .connections
1835            .len()
1836            .saturating_add(self.pending_connects.len());
1837        let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1838        let connection_allowed =
1839            self.max_connections == 0 || connection_used < self.max_connections;
1840        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1841        peer_allowed && connection_allowed && link_allowed
1842    }
1843
1844    /// Like `outbound_admission_check`, but for racing a better path to a
1845    /// peer that is already authenticated. This may temporarily add a
1846    /// connection/link, but it does not consume a new peer slot.
1847    pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1848        let connection_used = self
1849            .connections
1850            .len()
1851            .saturating_add(self.pending_connects.len());
1852        let connection_allowed =
1853            self.max_connections == 0 || connection_used < self.max_connections;
1854        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1855        connection_allowed && link_allowed
1856    }
1857
1858    /// Set the maximum number of links.
1859    pub fn set_max_links(&mut self, max: usize) {
1860        self.max_links = max;
1861    }
1862
1863    // === Counts ===
1864
1865    /// Number of pending connections (handshake in progress).
1866    pub fn connection_count(&self) -> usize {
1867        self.connections.len()
1868    }
1869
1870    /// Number of authenticated peers.
1871    pub fn peer_count(&self) -> usize {
1872        self.peers.len()
1873    }
1874
1875    /// Number of active links.
1876    pub fn link_count(&self) -> usize {
1877        self.links.len()
1878    }
1879
1880    /// Number of active transports.
1881    pub fn transport_count(&self) -> usize {
1882        self.transports.len()
1883    }
1884
1885    // === Transport Management ===
1886
1887    /// Allocate a new transport ID.
1888    pub fn allocate_transport_id(&mut self) -> TransportId {
1889        let id = TransportId::new(self.next_transport_id);
1890        self.next_transport_id += 1;
1891        id
1892    }
1893
1894    /// Get a transport by ID.
1895    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
1896        self.transports.get(id)
1897    }
1898
1899    /// Get mutable transport by ID.
1900    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
1901        self.transports.get_mut(id)
1902    }
1903
1904    /// Iterate over transport IDs.
1905    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
1906        self.transports.keys()
1907    }
1908
1909    /// Get the packet receiver for the event loop.
1910    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
1911        self.packet_rx.as_mut()
1912    }
1913
1914    // === Link Management ===
1915
1916    /// Allocate a new link ID.
1917    pub fn allocate_link_id(&mut self) -> LinkId {
1918        let id = LinkId::new(self.next_link_id);
1919        self.next_link_id += 1;
1920        id
1921    }
1922
1923    /// Add a link.
1924    pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1925        if self.max_links > 0 && self.links.len() >= self.max_links {
1926            return Err(NodeError::MaxLinksExceeded {
1927                max: self.max_links,
1928            });
1929        }
1930        let link_id = link.link_id();
1931        let transport_id = link.transport_id();
1932        let remote_addr = link.remote_addr().clone();
1933
1934        self.links.insert(link_id, link);
1935        self.addr_to_link
1936            .insert((transport_id, remote_addr), link_id);
1937        Ok(())
1938    }
1939
1940    /// Get a link by ID.
1941    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
1942        self.links.get(link_id)
1943    }
1944
1945    /// Get a mutable link by ID.
1946    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
1947        self.links.get_mut(link_id)
1948    }
1949
1950    /// Find link ID by transport address.
1951    pub fn find_link_by_addr(
1952        &self,
1953        transport_id: TransportId,
1954        addr: &TransportAddr,
1955    ) -> Option<LinkId> {
1956        self.addr_to_link
1957            .get(&(transport_id, addr.clone()))
1958            .copied()
1959    }
1960
1961    /// Remove a link.
1962    ///
1963    /// Only removes the addr_to_link reverse lookup if it still points to this
1964    /// link. In cross-connection scenarios, a newer link may have replaced the
1965    /// entry for the same address.
1966    pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
1967        if let Some(link) = self.links.remove(link_id) {
1968            // Clean up reverse lookup only if it still maps to this link
1969            let key = (link.transport_id(), link.remote_addr().clone());
1970            if self.addr_to_link.get(&key) == Some(link_id) {
1971                self.addr_to_link.remove(&key);
1972            }
1973            Some(link)
1974        } else {
1975            None
1976        }
1977    }
1978
1979    pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
1980        if !self.bootstrap_transports.contains(&transport_id) {
1981            return;
1982        }
1983
1984        let transport_in_use = self
1985            .links
1986            .values()
1987            .any(|link| link.transport_id() == transport_id)
1988            || self
1989                .connections
1990                .values()
1991                .any(|conn| conn.transport_id() == Some(transport_id))
1992            || self
1993                .peers
1994                .values()
1995                .any(|peer| peer.transport_id() == Some(transport_id))
1996            || self
1997                .pending_connects
1998                .iter()
1999                .any(|pending| pending.transport_id == transport_id);
2000
2001        if transport_in_use {
2002            return;
2003        }
2004
2005        tracing::debug!(
2006            transport_id = %transport_id,
2007            "bootstrap transport has no remaining references; dropping"
2008        );
2009
2010        self.bootstrap_transports.remove(&transport_id);
2011        self.bootstrap_transport_npubs.remove(&transport_id);
2012        self.transport_drops.remove(&transport_id);
2013        self.transports.remove(&transport_id);
2014    }
2015
2016    /// Iterate over all links.
2017    pub fn links(&self) -> impl Iterator<Item = &Link> {
2018        self.links.values()
2019    }
2020
2021    // === Connection Management (Handshake Phase) ===
2022
2023    /// Add a pending connection.
2024    pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2025        let link_id = connection.link_id();
2026
2027        if self.connections.contains_key(&link_id) {
2028            return Err(NodeError::ConnectionAlreadyExists(link_id));
2029        }
2030
2031        if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2032            return Err(NodeError::MaxConnectionsExceeded {
2033                max: self.max_connections,
2034            });
2035        }
2036
2037        self.connections.insert(link_id, connection);
2038        Ok(())
2039    }
2040
2041    /// Get a connection by LinkId.
2042    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
2043        self.connections.get(link_id)
2044    }
2045
2046    /// Get a mutable connection by LinkId.
2047    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
2048        self.connections.get_mut(link_id)
2049    }
2050
2051    /// Remove a connection.
2052    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
2053        self.connections.remove(link_id)
2054    }
2055
2056    /// Iterate over all connections.
2057    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
2058        self.connections.values()
2059    }
2060
2061    // === Peer Management (Active Phase) ===
2062
2063    /// Get a peer by NodeAddr.
2064    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
2065        self.peers.get(node_addr)
2066    }
2067
2068    /// Get a mutable peer by NodeAddr.
2069    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
2070        self.peers.get_mut(node_addr)
2071    }
2072
2073    /// Remove a peer.
2074    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
2075        self.peers.remove(node_addr)
2076    }
2077
2078    /// Iterate over all peers.
2079    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2080        self.peers.values()
2081    }
2082
2083    /// Reference to the Nostr discovery handle if discovery is enabled.
2084    /// Used by control queries (`show_peers` per-peer Nostr-traversal
2085    /// state) to read failure-state without taking shared ownership.
2086    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2087        self.nostr_discovery.as_deref()
2088    }
2089
2090    /// Iterate over all peer node IDs.
2091    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2092        self.peers.keys()
2093    }
2094
2095    /// Iterate over peers that can send traffic.
2096    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2097        self.peers.values().filter(|p| p.can_send())
2098    }
2099
2100    /// Number of peers that can send traffic.
2101    pub fn sendable_peer_count(&self) -> usize {
2102        self.peers.values().filter(|p| p.can_send()).count()
2103    }
2104
2105    pub(crate) fn set_discovery_fallback_transit_allowed(
2106        &mut self,
2107        peer_addr: NodeAddr,
2108        allowed: bool,
2109    ) {
2110        if allowed {
2111            self.discovery_fallback_transit_blocked_peers
2112                .remove(&peer_addr);
2113        } else {
2114            self.discovery_fallback_transit_blocked_peers
2115                .insert(peer_addr);
2116        }
2117    }
2118
2119    pub(crate) fn configured_discovery_fallback_transit(
2120        &self,
2121        peer_addr: &NodeAddr,
2122    ) -> Option<bool> {
2123        self.configured_peer(peer_addr)
2124            .map(|peer| peer.discovery_fallback_transit)
2125    }
2126
2127    pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2128        self.config.peers().iter().find(|peer| {
2129            PeerIdentity::from_npub(&peer.npub)
2130                .ok()
2131                .is_some_and(|identity| identity.node_addr() == peer_addr)
2132        })
2133    }
2134
2135    pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2136        if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2137            return retry_state.peer_config.discovery_fallback_transit;
2138        }
2139
2140        if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2141            return allowed;
2142        }
2143
2144        self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2145    }
2146
2147    // === End-to-End Sessions ===
2148
2149    /// Get a session by remote NodeAddr.
2150    /// Disable the discovery forward rate limiter (for tests).
2151    #[cfg(test)]
2152    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2153        self.discovery_forward_limiter
2154            .set_interval(std::time::Duration::ZERO);
2155    }
2156
2157    #[cfg(test)]
2158    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2159        self.sessions.get(remote)
2160    }
2161
2162    /// Get a mutable session by remote NodeAddr.
2163    #[cfg(test)]
2164    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2165        self.sessions.get_mut(remote)
2166    }
2167
2168    /// Remove a session.
2169    #[cfg(test)]
2170    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2171        self.sessions.remove(remote)
2172    }
2173
2174    /// Read the path_mtu_lookup entry for a destination FipsAddress.
2175    #[cfg(test)]
2176    pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2177        self.path_mtu_lookup
2178            .read()
2179            .ok()
2180            .and_then(|map| map.get(fips_addr).copied())
2181    }
2182
2183    /// Write a path_mtu_lookup entry directly (for tests that pre-seed the map).
2184    #[cfg(test)]
2185    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2186        if let Ok(mut map) = self.path_mtu_lookup.write() {
2187            map.insert(fips_addr, mtu);
2188        }
2189    }
2190
2191    /// Number of end-to-end sessions.
2192    pub fn session_count(&self) -> usize {
2193        self.sessions.len()
2194    }
2195
2196    /// Iterate over all session entries (for control queries).
2197    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2198        self.sessions.iter()
2199    }
2200
2201    // === Identity Cache ===
2202
2203    /// Register a node in the identity cache for FipsAddress → NodeAddr lookup.
2204    pub(crate) fn register_identity(
2205        &mut self,
2206        node_addr: NodeAddr,
2207        pubkey: secp256k1::PublicKey,
2208    ) -> bool {
2209        let mut prefix = [0u8; 15];
2210        prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2211        if let Some(entry) = self.identity_cache.get(&prefix)
2212            && entry.node_addr == node_addr
2213            && entry.pubkey == pubkey
2214        {
2215            // Endpoint sends pass the same PeerIdentity on every packet. Once
2216            // validated, avoid re-deriving NodeAddr from the public key in the
2217            // data path; that hash showed up in macOS sender profiles.
2218            return true;
2219        }
2220
2221        let (xonly, _) = pubkey.x_only_public_key();
2222        let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2223        if derived_node_addr != node_addr {
2224            debug!(
2225                claimed_node_addr = %node_addr,
2226                derived_node_addr = %derived_node_addr,
2227                "Rejected identity cache entry with mismatched public key"
2228            );
2229            return false;
2230        }
2231
2232        let now_ms = Self::now_ms();
2233        if let Some(entry) = self.identity_cache.get_mut(&prefix)
2234            && entry.node_addr == node_addr
2235        {
2236            entry.pubkey = pubkey;
2237            entry.last_seen_ms = now_ms;
2238            return true;
2239        }
2240
2241        let npub = encode_npub(&xonly);
2242        self.identity_cache.insert(
2243            prefix,
2244            IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2245        );
2246        // LRU eviction
2247        let max = self.config.node.cache.identity_size;
2248        if self.identity_cache.len() > max
2249            && let Some(oldest_key) = self
2250                .identity_cache
2251                .iter()
2252                .min_by_key(|(_, entry)| entry.last_seen_ms)
2253                .map(|(k, _)| *k)
2254        {
2255            self.identity_cache.remove(&oldest_key);
2256        }
2257        true
2258    }
2259
2260    /// Look up a destination by FipsAddress prefix (bytes 1-15 of the IPv6 address).
2261    pub(crate) fn lookup_by_fips_prefix(
2262        &mut self,
2263        prefix: &[u8; 15],
2264    ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2265        if let Some(entry) = self.identity_cache.get_mut(prefix) {
2266            entry.last_seen_ms = Self::now_ms(); // LRU touch
2267            Some((entry.node_addr, entry.pubkey))
2268        } else {
2269            None
2270        }
2271    }
2272
2273    /// Check if a node's identity is in the cache (without LRU touch).
2274    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2275        let mut prefix = [0u8; 15];
2276        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2277        self.identity_cache.contains_key(&prefix)
2278    }
2279
2280    /// Number of identity cache entries.
2281    pub fn identity_cache_len(&self) -> usize {
2282        self.identity_cache.len()
2283    }
2284
2285    /// Iterate over identity cache entries.
2286    ///
2287    /// Returns `(NodeAddr, PublicKey, last_seen_ms)` for each cached identity.
2288    /// Used by the `show_identity_cache` control query.
2289    pub fn identity_cache_iter(
2290        &self,
2291    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2292        self.identity_cache
2293            .values()
2294            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2295    }
2296
2297    /// Configured maximum identity cache size.
2298    pub fn identity_cache_max(&self) -> usize {
2299        self.config.node.cache.identity_size
2300    }
2301
2302    /// Number of pending discovery lookups.
2303    pub fn pending_lookup_count(&self) -> usize {
2304        self.pending_lookups.len()
2305    }
2306
2307    /// Iterate over pending discovery lookups for diagnostics.
2308    pub fn pending_lookups_iter(
2309        &self,
2310    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2311        self.pending_lookups.iter()
2312    }
2313
2314    /// Number of recent discovery requests tracked.
2315    pub fn recent_request_count(&self) -> usize {
2316        self.recent_requests.len()
2317    }
2318
2319    /// Count of destinations with queued TUN packets awaiting session setup.
2320    pub fn pending_tun_destinations(&self) -> usize {
2321        self.pending_tun_packets.len()
2322    }
2323
2324    /// Total TUN packets queued across all destinations.
2325    pub fn pending_tun_total_packets(&self) -> usize {
2326        self.pending_tun_packets.values().map(|q| q.len()).sum()
2327    }
2328
2329    /// Iterate over retry state for diagnostics.
2330    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2331        self.retry_pending.iter()
2332    }
2333
2334    // === Routing ===
2335
2336    /// Check if a peer is a tree neighbor (parent or child in the spanning tree).
2337    ///
2338    /// Returns true if the peer is our current tree parent, or if the peer
2339    /// has declared us as their parent (making them our child).
2340    pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2341        // Peer is our parent
2342        if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2343            return true;
2344        }
2345        // Peer is our child (their declaration names us as parent)
2346        if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2347            && decl.parent_id() == self.node_addr()
2348        {
2349            return true;
2350        }
2351        false
2352    }
2353
2354    /// Find next hop for a destination node address.
2355    ///
2356    /// Routing priority:
2357    /// 1. Destination is self → `None` (local delivery)
2358    /// 2. Destination is a direct peer → that peer
2359    /// 3. Reply-learned routes in `reply_learned` mode. These are locally
2360    ///    observed reverse paths, selected with weighted multipath plus
2361    ///    periodic coordinate/tree exploration.
2362    /// 4. Bloom filter candidates with cached dest coords → among peers whose
2363    ///    bloom filter contains the destination, pick the one that minimizes
2364    ///    tree distance to the destination, with
2365    ///    `(link_cost, tree_distance_to_dest, node_addr)` tie-breaking.
2366    ///    The self-distance check ensures only peers strictly closer to the
2367    ///    destination than us are considered (prevents routing loops).
2368    /// 5. Greedy tree routing fallback (requires cached dest coords)
2369    /// 6. No route → `None`
2370    ///
2371    /// Both the bloom filter and tree routing paths require cached destination
2372    /// coordinates (checked in `coord_cache`). Without coordinates, the node
2373    /// cannot make loop-free forwarding decisions. The caller should signal
2374    /// `CoordsRequired` back to the source when `None` is returned for a
2375    /// non-local destination.
2376    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
2377        // 1. Local delivery
2378        if dest_node_addr == self.node_addr() {
2379            return None;
2380        }
2381
2382        // 2. Healthy direct peer. Stale direct links remain usable, but in
2383        // reply-learned mode they must not hide a live mesh route learned from
2384        // recent traffic or discovery.
2385        let direct_peer_can_send = self
2386            .peers
2387            .get(dest_node_addr)
2388            .is_some_and(|peer| peer.can_send());
2389        if let Some(peer) = self.peers.get(dest_node_addr)
2390            && peer.is_healthy()
2391        {
2392            return Some(peer);
2393        }
2394
2395        let now_ms = Self::now_ms();
2396
2397        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
2398            Some(
2399                self.peers
2400                    .iter()
2401                    .filter(|(_, peer)| peer.can_send())
2402                    .map(|(addr, _)| *addr)
2403                    .collect::<HashSet<_>>(),
2404            )
2405        } else {
2406            None
2407        };
2408
2409        // 3. Optional reply-learned routing. These entries are not peer
2410        // claims; they are local observations of which peer carried traffic
2411        // or a verified lookup response back from the destination. Most
2412        // packets use weighted multipath over learned routes, but periodic
2413        // fallback exploration lets coord/bloom/tree routes discover better
2414        // candidates.
2415        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
2416            self.learned_routes.should_explore_fallback(
2417                dest_node_addr,
2418                now_ms,
2419                self.config.node.routing.learned_fallback_explore_interval,
2420                |addr| sendable.contains(addr),
2421            )
2422        });
2423        if let Some(sendable) = &sendable_learned_peers
2424            && !explore_fallback
2425            && let Some(next_hop_addr) =
2426                self.learned_routes
2427                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2428        {
2429            return self.peers.get(&next_hop_addr);
2430        }
2431
2432        // Look up cached destination coordinates (required by both bloom and tree paths).
2433        let Some(dest_coords) = self
2434            .coord_cache
2435            .get_and_touch(dest_node_addr, now_ms)
2436            .cloned()
2437        else {
2438            if let Some(sendable) = &sendable_learned_peers
2439                && let Some(next_hop_addr) =
2440                    self.learned_routes
2441                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2442            {
2443                return self.peers.get(&next_hop_addr);
2444            }
2445            if direct_peer_can_send {
2446                return self.peers.get(dest_node_addr);
2447            }
2448            return None;
2449        };
2450
2451        // 4. Bloom filter candidates — requires dest_coords for loop-free selection.
2452        //    If no candidate is strictly closer, fall through to tree routing.
2453        let coordinate_route_addr = {
2454            let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
2455            if !candidates.is_empty() {
2456                self.select_best_candidate(&candidates, &dest_coords)
2457                    .map(|peer| *peer.node_addr())
2458            } else {
2459                None
2460            }
2461        };
2462        if let Some(next_hop_addr) = coordinate_route_addr {
2463            return self.peers.get(&next_hop_addr);
2464        }
2465
2466        // 5. Greedy tree routing fallback
2467        let tree_route_addr = self
2468            .tree_state
2469            .find_next_hop(&dest_coords)
2470            .filter(|next_hop_id| {
2471                self.peers
2472                    .get(next_hop_id)
2473                    .is_some_and(|peer| peer.can_send())
2474            });
2475        if let Some(next_hop_addr) = tree_route_addr {
2476            return self.peers.get(&next_hop_addr);
2477        }
2478        if explore_fallback {
2479            return sendable_learned_peers.as_ref().and_then(|sendable| {
2480                self.learned_routes
2481                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2482                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
2483            });
2484        }
2485
2486        if let Some(sendable) = &sendable_learned_peers
2487            && let Some(next_hop_addr) =
2488                self.learned_routes
2489                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2490        {
2491            return self.peers.get(&next_hop_addr);
2492        }
2493
2494        if direct_peer_can_send {
2495            return self.peers.get(dest_node_addr);
2496        }
2497
2498        None
2499    }
2500
2501    pub(in crate::node) fn learn_reverse_route(
2502        &mut self,
2503        destination: NodeAddr,
2504        next_hop: NodeAddr,
2505    ) {
2506        if self.config.node.routing.mode != RoutingMode::ReplyLearned
2507            || destination == *self.node_addr()
2508        {
2509            return;
2510        }
2511        let now_ms = Self::now_ms();
2512        self.learned_routes.learn(
2513            destination,
2514            next_hop,
2515            now_ms,
2516            self.config.node.routing.learned_ttl_secs,
2517            self.config.node.routing.max_learned_routes_per_dest,
2518        );
2519    }
2520
2521    pub(in crate::node) fn record_route_failure(
2522        &mut self,
2523        destination: NodeAddr,
2524        next_hop: NodeAddr,
2525    ) {
2526        if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2527            return;
2528        }
2529        self.learned_routes.record_failure(&destination, &next_hop);
2530    }
2531
2532    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2533        self.learned_routes.snapshot(now_ms)
2534    }
2535
2536    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2537        self.learned_routes.purge_expired(now_ms);
2538    }
2539
2540    /// Select the best peer from a set of bloom filter candidates.
2541    ///
2542    /// Uses distance from each candidate's tree coordinates to the destination
2543    /// as the primary metric (after link_cost). Only selects peers that are
2544    /// strictly closer to the destination than we are (self-distance check
2545    /// prevents routing loops).
2546    ///
2547    /// Ordering: `(link_cost, distance_to_dest, node_addr)`.
2548    fn select_best_candidate<'a>(
2549        &'a self,
2550        candidates: &[&'a ActivePeer],
2551        dest_coords: &crate::tree::TreeCoordinate,
2552    ) -> Option<&'a ActivePeer> {
2553        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2554
2555        let mut best: Option<(&ActivePeer, f64, usize)> = None;
2556
2557        for &candidate in candidates {
2558            if !candidate.can_send() {
2559                continue;
2560            }
2561
2562            let cost = candidate.link_cost();
2563
2564            let dist = self
2565                .tree_state
2566                .peer_coords(candidate.node_addr())
2567                .map(|pc| pc.distance_to(dest_coords))
2568                .unwrap_or(usize::MAX);
2569
2570            // Self-distance check: only consider peers strictly closer
2571            // to the destination than we are (prevents routing loops)
2572            if dist >= my_distance {
2573                continue;
2574            }
2575
2576            let dominated = match &best {
2577                None => true,
2578                Some((_, best_cost, best_dist)) => {
2579                    cost < *best_cost
2580                        || (cost == *best_cost && dist < *best_dist)
2581                        || (cost == *best_cost
2582                            && dist == *best_dist
2583                            && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2584                }
2585            };
2586
2587            if dominated {
2588                best = Some((candidate, cost, dist));
2589            }
2590        }
2591
2592        best.map(|(peer, _, _)| peer)
2593    }
2594
2595    /// Check if a destination is in any peer's bloom filter.
2596    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2597        self.peers.values().filter(|p| p.may_reach(dest)).collect()
2598    }
2599
2600    /// Get the TUN packet sender channel.
2601    ///
2602    /// Returns None if TUN is not active or the node hasn't been started.
2603    pub fn tun_tx(&self) -> Option<&TunTx> {
2604        self.tun_tx.as_ref()
2605    }
2606
2607    /// Attach app-owned packet I/O for embedded operation without a system TUN.
2608    ///
2609    /// This must be called before [`Node::start`] and requires `tun.enabled =
2610    /// false`. Outbound packets sent to the returned sender are processed by the
2611    /// normal session pipeline. Inbound packets delivered by FIPS sessions are
2612    /// sent to the returned receiver with source attribution.
2613    pub fn attach_external_packet_io(
2614        &mut self,
2615        capacity: usize,
2616    ) -> Result<ExternalPacketIo, NodeError> {
2617        if self.state != NodeState::Created {
2618            return Err(NodeError::Config(ConfigError::Validation(
2619                "external packet I/O must be attached before node start".to_string(),
2620            )));
2621        }
2622        if self.config.tun.enabled {
2623            return Err(NodeError::Config(ConfigError::Validation(
2624                "external packet I/O requires tun.enabled=false".to_string(),
2625            )));
2626        }
2627
2628        let capacity = capacity.max(1);
2629        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2630        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2631        self.tun_outbound_rx = Some(outbound_rx);
2632        self.external_packet_tx = Some(inbound_tx);
2633
2634        Ok(ExternalPacketIo {
2635            outbound_tx,
2636            inbound_rx,
2637        })
2638    }
2639
2640    /// Attach app-owned endpoint data I/O for embedded operation.
2641    ///
2642    /// Commands sent to the returned sender are processed by the node RX loop.
2643    /// Incoming endpoint data is emitted as source-attributed events.
2644    pub(crate) fn attach_endpoint_data_io(
2645        &mut self,
2646        capacity: usize,
2647    ) -> Result<EndpointDataIo, NodeError> {
2648        if self.state != NodeState::Created {
2649            return Err(NodeError::Config(ConfigError::Validation(
2650                "endpoint data I/O must be attached before node start".to_string(),
2651            )));
2652        }
2653
2654        let command_capacity = endpoint_data_command_capacity(capacity);
2655        let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2656        // Inbound endpoint-data events use an unbounded channel — see
2657        // `EndpointDataIo::event_rx` docs for the rationale (kills the
2658        // per-packet semaphore + the cross-task relay task that used to
2659        // sit on top of this channel).
2660        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2661        self.endpoint_command_rx = Some(command_rx);
2662        self.endpoint_event_tx = Some(event_tx.clone());
2663
2664        Ok(EndpointDataIo {
2665            command_tx,
2666            event_rx,
2667            event_tx,
2668        })
2669    }
2670
2671    pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2672        let mut prefix = [0u8; 15];
2673        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2674        self.identity_cache
2675            .get(&prefix)
2676            .filter(|entry| &entry.node_addr == addr)
2677            .map(|entry| entry.pubkey)
2678    }
2679
2680    pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2681        let mut prefix = [0u8; 15];
2682        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2683        self.identity_cache
2684            .get(&prefix)
2685            .filter(|entry| &entry.node_addr == addr)
2686            .map(|entry| entry.npub.clone())
2687    }
2688
2689    pub(in crate::node) fn deliver_external_ipv6_packet(
2690        &self,
2691        src_addr: &NodeAddr,
2692        packet: Vec<u8>,
2693    ) {
2694        let Some(external_packet_tx) = &self.external_packet_tx else {
2695            return;
2696        };
2697        if packet.len() < 40 {
2698            return;
2699        }
2700        let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2701            return;
2702        };
2703        let delivered = NodeDeliveredPacket {
2704            source_node_addr: *src_addr,
2705            source_npub: self.npub_for_node_addr(src_addr),
2706            destination,
2707            packet,
2708        };
2709        if let Err(error) = external_packet_tx.try_send(delivered) {
2710            debug!(error = %error, "Failed to deliver packet to external app sink");
2711        }
2712    }
2713
2714    // === Sending ===
2715
2716    /// Encrypt and send a link-layer message to an authenticated peer.
2717    ///
2718    /// The plaintext should include the message type byte followed by the
2719    /// message-specific payload (e.g., `[0x50, reason]` for Disconnect).
2720    ///
2721    /// The send path prepends a 4-byte session-relative timestamp (inner
2722    /// header) before encryption. The full 16-byte outer header is used
2723    /// as AAD for the AEAD construction.
2724    ///
2725    /// This is the standard path for sending any link-layer control message
2726    /// to a peer over their encrypted Noise session.
2727    pub(super) async fn send_encrypted_link_message(
2728        &mut self,
2729        node_addr: &NodeAddr,
2730        plaintext: &[u8],
2731    ) -> Result<(), NodeError> {
2732        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2733            .await
2734    }
2735
2736    /// Update the local-outbound-broken signal from a `transport.send`
2737    /// outcome. Sets `last_local_send_failure_at` on local-side io
2738    /// errors (NetworkUnreachable / HostUnreachable / AddrNotAvailable);
2739    /// clears it on success. The reaper consults this in
2740    /// `check_link_heartbeats` to switch to `fast_link_dead_timeout_secs`.
2741    pub(in crate::node) fn note_local_send_outcome(
2742        &mut self,
2743        result: &Result<usize, TransportError>,
2744    ) {
2745        match result {
2746            Ok(_) => {
2747                if self.last_local_send_failure_at.is_some() {
2748                    self.last_local_send_failure_at = None;
2749                }
2750            }
2751            Err(TransportError::Io(e))
2752                if matches!(
2753                    e.kind(),
2754                    std::io::ErrorKind::NetworkUnreachable
2755                        | std::io::ErrorKind::HostUnreachable
2756                        | std::io::ErrorKind::AddrNotAvailable
2757                ) =>
2758            {
2759                self.last_local_send_failure_at = Some(std::time::Instant::now());
2760            }
2761            Err(_) => {}
2762        }
2763    }
2764
2765    /// Returns the wall-clock instant of the most recent observed local
2766    /// outbound failure, if any. Used by the link-dead reaper.
2767    pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
2768        self.last_local_send_failure_at
2769    }
2770
    /// Like `send_encrypted_link_message` but allows setting the FMP CE flag.
    ///
    /// Used by the forwarding path to relay congestion signals hop-by-hop.
    ///
    /// # Arguments
    ///
    /// * `node_addr` — peer to send to; must be present in `self.peers`.
    /// * `plaintext` — message type byte followed by the message payload.
    /// * `ce_flag` — when `true`, `FLAG_CE` is set in the outer header flags.
    ///
    /// # Send paths
    ///
    /// * **Worker path** — taken when an encrypt-worker pool exists, the
    ///   transport is UDP, the session can hand out a send-cipher clone, and a
    ///   socket plus destination address could be obtained. The counter is
    ///   reserved, stats are recorded with the predicted wire size, the job is
    ///   dispatched, and the function returns `Ok(())` without waiting for the
    ///   send outcome.
    /// * **Inline path** — everything else: encrypt here, send through the
    ///   transport, then record stats with the byte count the transport
    ///   reported.
    ///
    /// # Errors
    ///
    /// * `NodeError::PeerNotFound` — no peer entry for `node_addr`.
    /// * `NodeError::TransportNotFound` — the peer's transport id is unknown.
    /// * `NodeError::MtuExceeded` — the transport rejected the packet as too
    ///   large (inline path only).
    /// * `NodeError::SendFailed` — the peer lacks `their_index`, a transport
    ///   id, a current address or a Noise session; the transport connection is
    ///   not in the `Connected` state; counter reservation or encryption
    ///   failed; or the transport returned any other error.
    pub(super) async fn send_encrypted_link_message_with_ce(
        &mut self,
        node_addr: &NodeAddr,
        plaintext: &[u8],
        ce_flag: bool,
    ) -> Result<(), NodeError> {
        let peer = self
            .peers
            .get_mut(node_addr)
            .ok_or(NodeError::PeerNotFound(*node_addr))?;

        // Gather everything needed from the peer up front; each missing piece
        // is reported as a distinct `SendFailed` reason.
        let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
            node_addr: *node_addr,
            reason: "no their_index".into(),
        })?;
        let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
            node_addr: *node_addr,
            reason: "no transport_id".into(),
        })?;
        let remote_addr = peer
            .current_addr()
            .cloned()
            .ok_or_else(|| NodeError::SendFailed {
                node_addr: *node_addr,
                reason: "no current_addr".into(),
            })?;
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        let connected_socket = peer.connected_udp();

        // Prepend 4-byte session-relative timestamp (inner header)
        let timestamp_ms = peer.session_elapsed_ms();

        // MMP: read spin bit value before entering session borrow
        let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
        let mut flags = if sp_flag { FLAG_SP } else { 0 };
        if ce_flag {
            flags |= FLAG_CE;
        }
        if peer.current_k_bit() {
            flags |= FLAG_KEY_EPOCH;
        }

        let session = peer
            .noise_session_mut()
            .ok_or_else(|| NodeError::SendFailed {
                node_addr: *node_addr,
                reason: "no noise session".into(),
            })?;

        // Build 16-byte outer header upfront. The inner-plaintext
        // layout is `[ts:4 LE][plaintext...]`, so its length is exactly
        // `INNER_TS_LEN + plaintext.len()` — no need to build the Vec
        // just to measure it. The worker path uses this length to size
        // the wire buffer directly; the legacy path below still
        // materialises a separate `inner_plaintext` Vec for the inline
        // encrypt-and-send call.
        const INNER_TS_LEN: usize = 4;
        let counter = session.current_send_counter();
        let inner_len = INNER_TS_LEN + plaintext.len();
        // NOTE(review): `as u16` silently truncates when `inner_len` exceeds
        // `u16::MAX`; nothing in this function bounds `plaintext.len()`.
        // Confirm that callers guarantee the limit.
        let payload_len = inner_len as u16;
        let header = build_established_header(their_index, counter, flags, payload_len);

        // **UDP send fast path.** The encrypt-worker pool is always
        // spawned at lifecycle start (workers = num_cpus) in
        // production, so this branch is taken for every authentic
        // send on every UDP-transported established session. The
        // AEAD work + sendmsg syscall run on a dedicated OS thread;
        // the rx_loop only builds the wire buffer + reserves the
        // counter inline.
        //
        // Other transport kinds (BLE, TCP, sim, ethernet) fall
        // through to the inline encrypt + transport.send path
        // below — those don't have raw-fd / sendmmsg / UDP_GSO
        // benefits to expose through the worker pool, so the simpler
        // synchronous send is the right shape for them.
        //
        // The `encrypt_workers.is_some()` check below is true in
        // production (lifecycle::start spawns the pool); it stays
        // checked rather than `expect()`-ed because unit tests
        // construct `Node` without calling `start()`.
        let transport_for_send = self
            .transports
            .get(&transport_id)
            .ok_or(NodeError::TransportNotFound(transport_id))?;
        // Refuse to send unless the transport reports `Connected` for this
        // address. When it reports no connection at all, start one
        // (best-effort, result ignored) and still fail this send.
        match transport_for_send.connection_state(&remote_addr) {
            ConnectionState::Connected => {}
            other => {
                if matches!(other, ConnectionState::None) {
                    let _ = transport_for_send.connect(&remote_addr).await;
                }
                return Err(NodeError::SendFailed {
                    node_addr: *node_addr,
                    reason: format!("transport connection not ready: {:?}", other),
                });
            }
        }
        let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
        if let Some(workers) = self.encrypt_workers.as_ref().cloned()
            && is_udp
            && let Some(cipher_clone) = session.send_cipher_clone()
        {
            {
                // Reserve the counter on the session so subsequent
                // sends don't reuse it. `current_send_counter` only
                // peeks; we advance via `take_send_counter`.
                let reserved_counter =
                    session
                        .take_send_counter()
                        .map_err(|e| NodeError::SendFailed {
                            node_addr: *node_addr,
                            reason: format!("counter reservation failed: {}", e),
                        })?;
                debug_assert_eq!(reserved_counter, counter);
                // Re-derive the header with the now-locked-in counter
                // value (same value, but the call sequence is more
                // explicit).
                let header =
                    build_established_header(their_index, reserved_counter, flags, payload_len);
                let transport = transport_for_send;
                // Snapshot the per-peer connected UDP socket before
                // resolving the fallback address. On the established
                // steady-state path this socket already carries the
                // kernel peer address, so re-parsing the configured
                // transport address and touching the DNS cache on every
                // packet is pure overhead on the sender hot path.
                //
                // `send_target` is `None` when the transport has no async
                // socket or no destination address could be determined; in
                // that case control falls through to the inline path below,
                // with the counter already reserved above.
                let send_target = {
                    if let TransportHandle::Udp(udp) = transport {
                        let socket_addr = {
                            #[cfg(any(target_os = "linux", target_os = "macos"))]
                            {
                                match connected_socket.as_ref() {
                                    Some(socket) => Some(socket.peer_addr()),
                                    None => udp.resolve_for_off_task(&remote_addr).await.ok(),
                                }
                            }
                            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
                            {
                                udp.resolve_for_off_task(&remote_addr).await.ok()
                            }
                        };
                        match (udp.async_socket(), socket_addr) {
                            (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
                            _ => None,
                        }
                    } else {
                        None
                    }
                };
                if let Some((socket, socket_addr)) = send_target {
                    // Build the wire buffer **directly** from
                    // `plaintext` with a single allocation:
                    //   `[16 header][4 ts][plaintext...]` with
                    // +16 trailing capacity for the AEAD tag.
                    // The worker seals `wire_buf[16..]` in
                    // place and appends the tag — no second
                    // alloc, no second memcpy.
                    //
                    // Previous design built `inner_plaintext`
                    // via `prepend_inner_header` (1 alloc + 1
                    // copy) and then let the worker memcpy
                    // header + plaintext into a fresh Vec
                    // (another alloc + copy). At ~100 kpps the
                    // saved alloc/copy is ~150 MB/sec of memory
                    // bandwidth on the hot rx_loop + worker.
                    let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
                    let mut wire_buf = Vec::with_capacity(wire_capacity);
                    wire_buf.extend_from_slice(&header);
                    wire_buf.extend_from_slice(&timestamp_ms.to_le_bytes());
                    wire_buf.extend_from_slice(plaintext);
                    let predicted_bytes = wire_capacity;
                    // Stats / MMP update inline — predicted size
                    // is exact for ChaCha20-Poly1305 (tag is
                    // constant 16 bytes). When `connected_socket` is
                    // `Some`, the worker sends on it without a
                    // destination sockaddr — the kernel skips the
                    // per-packet sockaddr + route + neighbor resolve.
                    //
                    // Stats are recorded before the worker sends, so they
                    // describe dispatched packets, not confirmed sends.
                    if let Some(peer) = self.peers.get_mut(node_addr) {
                        peer.link_stats_mut().record_sent(predicted_bytes);
                        if let Some(mmp) = peer.mmp_mut() {
                            mmp.sender
                                .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
                        }
                    }
                    let scheduling_weight = self.send_weight_for_peer(node_addr);
                    workers.dispatch(self::encrypt_worker::FmpSendJob {
                        cipher: cipher_clone,
                        counter: reserved_counter,
                        wire_buf,
                        fsp_seal: None,
                        socket,
                        dest_addr: socket_addr,
                        #[cfg(any(target_os = "linux", target_os = "macos"))]
                        connected_socket,
                        // Set only when the first plaintext byte is the
                        // `EndpointData` message type.
                        drop_on_backpressure: plaintext
                            .first()
                            .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
                        scheduling_weight,
                        queued_at: crate::perf_profile::stamp(),
                    });
                    // The send outcome is not observed here, so
                    // `note_local_send_outcome` is not called on this path.
                    return Ok(());
                }
            }
        }

        // Inline (legacy) path: encrypt + send on the rx_loop.
        // Build the inner plaintext lazily here — a successful worker
        // dispatch returns before this point, so the prepend_inner_header
        // alloc is avoided in the fast path.
        let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
        // Encrypt with AAD binding to the outer header
        let ciphertext = {
            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
            session
                .encrypt_with_aad(&inner_plaintext, &header)
                .map_err(|e| NodeError::SendFailed {
                    node_addr: *node_addr,
                    reason: format!("encryption failed: {}", e),
                })?
        };

        let wire_packet = build_encrypted(&header, &ciphertext);

        // Look the transport up again and send; the timer guard covers only
        // the send call.
        let send_result = {
            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
            let transport = self
                .transports
                .get(&transport_id)
                .ok_or(NodeError::TransportNotFound(transport_id))?;
            transport.send(&remote_addr, &wire_packet).await
        };
        // Feed the outcome into the local-outbound-broken signal before the
        // error is converted and propagated.
        self.note_local_send_outcome(&send_result);
        let bytes_sent = send_result.map_err(|e| match e {
            TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
                node_addr: *node_addr,
                packet_size,
                mtu,
            },
            other => NodeError::SendFailed {
                node_addr: *node_addr,
                reason: format!("transport send: {}", other),
            },
        })?;

        // Update send statistics (re-borrowing the peer after the send)
        if let Some(peer) = self.peers.get_mut(node_addr) {
            peer.link_stats_mut().record_sent(bytes_sent);
            // MMP: record sent frame for sender report generation
            if let Some(mmp) = peer.mmp_mut() {
                mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
            }
        }

        Ok(())
    }
3029}
3030
3031impl fmt::Debug for Node {
3032    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3033        f.debug_struct("Node")
3034            .field("node_addr", self.node_addr())
3035            .field("state", &self.state)
3036            .field("is_leaf_only", &self.is_leaf_only)
3037            .field("connections", &self.connection_count())
3038            .field("peers", &self.peer_count())
3039            .field("links", &self.link_count())
3040            .field("transports", &self.transport_count())
3041            .finish()
3042    }
3043}