Skip to main content

fips_core/node/
mod.rs

1//! FIPS Node Entity
2//!
3//! Top-level structure representing a running FIPS instance. The Node
4//! holds all state required for mesh routing: identity, tree state,
5//! Bloom filters, coordinate caches, transports, links, and peers.
6
7mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32    ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33    build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::RoutingMode;
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45#[cfg(feature = "webrtc-transport")]
46use crate::transport::webrtc::WebRtcTransport;
47use crate::transport::{
48    Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError, TransportHandle, TransportId,
49};
50use crate::tree::TreeState;
51use crate::upper::hosts::HostMap;
52use crate::upper::icmp_rate_limit::IcmpRateLimiter;
53use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
54use crate::utils::index::IndexAllocator;
55use crate::{
56    Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
57    SessionMessageType, encode_npub,
58};
59use rand::Rng;
60use std::collections::{HashMap, HashSet, VecDeque};
61use std::fmt;
62use std::sync::Arc;
63use std::thread::JoinHandle;
64use thiserror::Error;
65use tracing::{debug, warn};
66
/// Half-range of the symmetric jitter applied to per-session rekey timers.
///
/// Each FMP/FSP session draws an offset uniformly from
/// `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]` seconds at construction and
/// after each cutover. This preserves the configured mean interval while
/// reducing dual-initiation bursts in symmetric-start meshes.
///
/// Typed as a signed `i64` because the drawn offset spans a negative range
/// (a timer may fire earlier than the configured interval).
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
74
/// Errors related to node operations.
///
/// `ConfigError`, `IdentityError`, and `TunError` convert into this type via
/// `#[from]`, so `?` propagates them directly from node code. The `Display`
/// text of every variant comes from its `thiserror` `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum NodeError {
    // --- Lifecycle state violations ---
    #[error("node not started")]
    NotStarted,

    #[error("node already started")]
    AlreadyStarted,

    #[error("node already stopped")]
    AlreadyStopped,

    // --- Lookup / uniqueness failures for transports, links, connections, peers ---
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    // Peer variants format the NodeAddr with `{:?}` (Debug), not Display.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    // --- Peer identity, discovery, and access control ---
    /// A configured peer npub could not be accepted; `reason` explains why.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    #[error("discovery error: {0}")]
    Discovery(String),

    #[error("access denied: {0}")]
    AccessDenied(String),

    // --- Resource limits (`max` is the configured ceiling that was hit) ---
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    // --- Handshake, session, and promotion ---
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    #[error("no session available for link {0}")]
    NoSession(LinkId),

    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    // --- Data-plane send failures ---
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A forwarded packet of `packet_size` was larger than the `mtu` of the
    /// path toward `node_addr`.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    // --- Wrapped subsystem errors (auto-converted by `?` via `#[from]`) ---
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    // --- Free-form failures carrying a descriptive message ---
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    #[error("transport error: {0}")]
    TransportError(String),

    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
166
/// Source-attributed packet delivered by a node running without a system TUN.
///
/// Instances are received by the embedding application on
/// `ExternalPacketIo::inbound_rx`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// FIPS node address that originated the packet.
    pub source_node_addr: NodeAddr,
    /// Source Nostr public key when the node has learned it.
    /// `None` means the sender's npub is not (yet) known to this node.
    pub source_npub: Option<String>,
    /// Destination FIPS address from the IPv6 packet.
    pub destination: FipsAddress,
    /// Full IPv6 packet after FIPS session decapsulation.
    pub packet: Vec<u8>,
}
179
/// Cached identity data for a remote node.
///
/// Stored as the value type of `Node::identity_cache`, which is keyed by the
/// FipsAddress prefix bytes so an IPv6 destination can be mapped back to the
/// peer's routing/session identity.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// The remote node's FIPS routing address.
    node_addr: NodeAddr,
    /// The remote node's secp256k1 public key.
    pubkey: secp256k1::PublicKey,
    /// Bech32 `npub` encoding of the remote node's key.
    npub: String,
    /// When this entry was last refreshed.
    /// NOTE(review): presumably Unix milliseconds like the other `_ms`
    /// timestamps in this module — confirm at the write site.
    last_seen_ms: u64,
}
187
188impl IdentityCacheEntry {
189    fn new(
190        node_addr: NodeAddr,
191        pubkey: secp256k1::PublicKey,
192        npub: String,
193        last_seen_ms: u64,
194    ) -> Self {
195        Self {
196            node_addr,
197            pubkey,
198            npub,
199            last_seen_ms,
200        }
201    }
202}
203
/// App-owned packet channels for embedding FIPS without a system TUN.
///
/// The application writes raw outbound IPv6 packets to `outbound_tx` and
/// reads decapsulated, source-attributed inbound packets from `inbound_rx`.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Send outbound IPv6 packets into the node.
    /// NOTE(review): presumably the sending half paired with the node's
    /// `tun_outbound_rx` — confirm where the channel is created.
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receive inbound IPv6 packets delivered by FIPS sessions.
    /// Presumably the receiving half of the node's `external_packet_tx`
    /// (same `NodeDeliveredPacket` item type).
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
212
/// App-owned endpoint data channels for embedding FIPS without a daemon.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Send endpoint data commands into the node RX loop.
    ///
    /// Bounded with a generous default so normal sender bursts do not
    /// stall on semaphore acquisition. macOS pacing happens at the UDP
    /// egress thread where the real Wi-Fi/interface bottleneck is visible;
    /// constraining this app queue instead caused the inner TCP flow to
    /// collapse under iperf. `FIPS_ENDPOINT_DATA_QUEUE_CAP` overrides the
    /// default for benches (see `endpoint_data_command_capacity`).
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Receive endpoint data delivered by FIPS sessions.
    ///
    /// Unbounded so the rx_loop's send on inbound packet delivery is a
    /// wait-free push (no semaphore acquire), and so we can drop the
    /// per-packet cross-task relay that previously sat between the node
    /// task and the `FipsEndpoint::recv()` consumer. Backpressure is
    /// naturally bounded — the rx_loop both produces here and runs the
    /// same runtime that schedules the consumer, so a stalled consumer
    /// stalls production too.
    ///
    /// NOTE(review): `UnboundedSender::send` never waits and the channel
    /// has no capacity limit of its own. On a multi-threaded runtime the
    /// producer can keep running while the consumer stops draining, so
    /// this queue can grow without bound. Confirm the "naturally bounded"
    /// argument above holds for the runtime flavor actually in use.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Clone of the event_tx exposed for in-process loopback (e.g.
    /// `FipsEndpoint::send` to self_npub). Lets the endpoint inject an
    /// event into the same queue without going through the encrypt /
    /// decrypt path, while keeping every consumer reading from a single
    /// channel.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
242
/// Resolve the capacity of the endpoint data command queue.
///
/// If `FIPS_ENDPOINT_DATA_QUEUE_CAP` is set and parses (after trimming
/// surrounding whitespace) as a non-zero `usize`, that value wins
/// unconditionally. An absent, non-UTF-8, malformed, or zero value is
/// silently ignored.
///
/// Otherwise the caller's `requested` capacity is used, raised to a floor of
/// 32 768 slots. The result is therefore never zero.
/// NOTE(review): presumably this feeds tokio's bounded `mpsc::channel`,
/// which panics on a zero capacity.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    const MIN_DEFAULT_CAPACITY: usize = 32_768;

    let env_override = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&cap| cap != 0);

    match env_override {
        Some(cap) => cap,
        None => requested.max(MIN_DEFAULT_CAPACITY),
    }
}
253
/// Commands accepted by the node endpoint data service.
///
/// Delivered to the node over `EndpointDataIo::command_tx`. Variants that
/// carry a `response_tx` oneshot expect the node to answer on it.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send with an explicit response channel — used by callers that
    /// care whether the local-stack handoff succeeded (e.g.
    /// `blocking_send` waits for the runtime to accept the send).
    Send {
        /// Destination peer identity.
        remote: PeerIdentity,
        /// Application payload bytes.
        payload: Vec<u8>,
        /// NOTE(review): presumably the enqueue time, used for queue-latency
        /// accounting — confirm at the consumer.
        queued_at: Option<std::time::Instant>,
        /// Receives the outcome of the local handoff.
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// **Fire-and-forget** variant of `Send` — no oneshot allocation,
    /// no per-packet result channel. Used by the data-plane fast path
    /// (`FipsEndpoint::send`) where the caller already discards the
    /// result. Saves one oneshot::channel() allocation per outbound
    /// packet on the application's send hot path.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        queued_at: Option<std::time::Instant>,
    },
    /// Request a snapshot of authenticated peer state; the node replies with
    /// one `NodeEndpointPeer` per peer on `response_tx`.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request the current Nostr relay statuses; the node replies with a
    /// `NodeEndpointRelayStatus` list on `response_tx`.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Update the advert and DM relay lists (presumably relay URLs — confirm
    /// at the handler); the result is reported on `response_tx`.
    UpdateRelays {
        advert_relays: Vec<String>,
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Replace the runtime peer list. Newly added auto-connect peers get
    /// `initiate_peer_connection` immediately; removed peers are dropped
    /// from the retry queue (the regular liveness timeout reaps any active
    /// session). Existing entries are kept and their `addresses` field is
    /// refreshed so the next retry sees the latest hints.
    UpdatePeers {
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
297
/// Reports what changed in response to `UpdatePeers`.
///
/// Each field is a count of peer entries in that category.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Peers present in the new list but not previously configured.
    pub(crate) added: usize,
    /// Previously configured peers absent from the new list.
    pub(crate) removed: usize,
    /// Existing peers whose configuration changed.
    /// NOTE(review): presumably entries whose `addresses` were refreshed — confirm.
    pub(crate) updated: usize,
    /// Existing peers whose configuration was left as-is.
    pub(crate) unchanged: usize,
}
306
/// Endpoint data events emitted by the node session receive path.
///
/// Consumed from `EndpointDataIo::event_rx`.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// Application data received from a remote node.
    Data {
        /// FIPS node address of the sender.
        source_node_addr: NodeAddr,
        /// Sender's npub, when the node knows it.
        source_npub: Option<String>,
        /// Decrypted application payload bytes.
        payload: Vec<u8>,
        /// NOTE(review): presumably the enqueue time, for latency accounting.
        queued_at: Option<std::time::Instant>,
    },
}
317
/// Authenticated peer state exposed to embedded endpoint callers.
///
/// Returned in response to `NodeEndpointCommand::PeerSnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// Peer's npub.
    pub(crate) npub: String,
    /// Remote transport address rendered as text, if known.
    pub(crate) transport_addr: Option<String>,
    /// Transport kind name (e.g. which transport carries the link), if known.
    pub(crate) transport_type: Option<String>,
    /// Numeric link identifier.
    /// NOTE(review): presumably the raw value of the peer's `LinkId`.
    pub(crate) link_id: u64,
    /// Round-trip time estimate in milliseconds, if one has been measured.
    /// NOTE(review): "srtt" presumably means smoothed RTT — confirm at the source.
    pub(crate) srtt_ms: Option<u64>,
    // Traffic counters for this peer.
    pub(crate) packets_sent: u64,
    pub(crate) packets_recv: u64,
    pub(crate) bytes_sent: u64,
    pub(crate) bytes_recv: u64,
}
331
/// Live Nostr relay state exposed to embedded endpoint callers.
///
/// Returned in response to `NodeEndpointCommand::RelaySnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Human-readable status string. The set of possible values is defined
    /// by the producer, which is not in this section of the file.
    pub(crate) status: String,
}
338
/// Node operational state.
///
/// The predicates on this type only permit starting from `Created` or
/// `Stopped` and stopping from `Running`. `Display` renders the lowercase
/// state name.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Created but not started.
    Created,
    /// Starting up (initializing transports).
    Starting,
    /// Fully operational.
    Running,
    /// Shutting down.
    Stopping,
    /// Stopped.
    Stopped,
}

impl NodeState {
    /// Check if node is operational. Only `Running` counts.
    pub fn is_operational(&self) -> bool {
        matches!(self, NodeState::Running)
    }

    /// Check if node can be started. True for `Created` and `Stopped`.
    pub fn can_start(&self) -> bool {
        matches!(self, NodeState::Created | NodeState::Stopped)
    }

    /// Check if node can be stopped. True only while `Running`.
    pub fn can_stop(&self) -> bool {
        matches!(self, NodeState::Running)
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase state name, e.g. `running`.
    ///
    /// Uses [`fmt::Formatter::pad`] rather than `write!(f, "{}", ..)` so the
    /// caller's width, fill, alignment, and precision flags are honoured.
    /// For example, `format!("{:<8}", state)` pads as expected. With no
    /// flags the output is byte-identical to the bare name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        };
        f.pad(name)
    }
}
383
/// Recent request tracking for dedup and reverse-path forwarding.
///
/// When a LookupRequest is forwarded through a node, the node stores the
/// request_id and which peer sent it. When the corresponding LookupResponse
/// arrives, it's forwarded back to that peer (reverse-path forwarding).
/// The `response_forwarded` flag prevents response routing loops.
///
/// Entries live in `Node::recent_requests`, keyed by request_id; expiry is
/// checked against `timestamp_ms` via `is_expired`.
#[derive(Clone, Debug)]
pub(crate) struct RecentRequest {
    /// The peer who sent this request to us.
    pub(crate) from_peer: NodeAddr,
    /// When we received this request (Unix milliseconds).
    pub(crate) timestamp_ms: u64,
    /// Whether we've already forwarded a response for this request.
    /// Prevents response routing loops when convergent request paths
    /// create bidirectional entries in recent_requests.
    pub(crate) response_forwarded: bool,
}
401
402impl RecentRequest {
403    pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
404        Self {
405            from_peer,
406            timestamp_ms,
407            response_forwarded: false,
408        }
409    }
410
411    /// Check if this entry has expired (older than expiry_ms).
412    pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
413        current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
414    }
415}
416
/// Key for addr_to_link reverse lookup.
///
/// A `(transport, remote address)` pair. `Node::addr_to_link` maps it to the
/// `LinkId` so packets can be dispatched before the peer is authenticated.
type AddrKey = (TransportId, TransportAddr);
419
/// Per-transport kernel drop tracking for congestion detection.
///
/// Sampled every tick (1s). The `dropping` flag indicates whether new
/// kernel drops were observed since the previous sample.
///
/// `Default` starts with a zero baseline and `dropping == false`. Stored per
/// transport in `Node::transport_drops`.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Previous `recv_drops` sample (cumulative counter).
    prev_drops: u64,
    /// True if drops increased since the last sample.
    dropping: bool,
}
431
/// State for a link waiting for transport-level connection establishment.
///
/// For connection-oriented transports (TCP, Tor), the transport connect runs
/// asynchronously. This struct holds the data needed to complete the handshake
/// once the connection is ready.
///
/// Queued in `Node::pending_connects`, which the tick handler polls.
struct PendingConnect {
    /// The link that was created for this connection.
    link_id: LinkId,
    /// Which transport is being used.
    transport_id: TransportId,
    /// The remote address being connected to.
    remote_addr: TransportAddr,
    /// The peer identity (for handshake initiation).
    peer_identity: PeerIdentity,
}
447
/// A running FIPS node instance.
///
/// This is the top-level container holding all node state.
///
/// ## Peer Lifecycle
///
/// Peers go through two phases:
/// 1. **Connection phase** (`connections`): Handshake in progress, indexed by LinkId
/// 2. **Active phase** (`peers`): Authenticated, indexed by NodeAddr
///
/// The `addr_to_link` map enables dispatching incoming packets to the right
/// connection before authentication completes.
// Discovery lookup constants moved to config: node.discovery.attempt_timeouts_secs, node.discovery.ttl
pub struct Node {
    // === Identity ===
    /// This node's cryptographic identity.
    identity: Identity,

    /// Random epoch generated at startup for peer restart detection.
    /// Exchanged inside Noise handshake messages so peers can detect restarts.
    startup_epoch: [u8; 8],

    /// Instant when the node was created, for uptime reporting.
    started_at: std::time::Instant,

    // === Configuration ===
    /// Loaded configuration.
    config: Config,

    // === State ===
    /// Node operational state.
    state: NodeState,

    /// Whether this is a leaf-only node.
    is_leaf_only: bool,

    // === Spanning Tree ===
    /// Local spanning tree state.
    tree_state: TreeState,

    // === Bloom Filter ===
    /// Local Bloom filter state.
    bloom_state: BloomState,

    // === Routing ===
    /// Address -> coordinates cache (from session setup and discovery).
    coord_cache: CoordCache,
    /// Locally learned reverse-path next-hop hints.
    learned_routes: LearnedRouteTable,
    /// Recent discovery requests (dedup + reverse-path forwarding).
    /// Maps request_id → RecentRequest.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Per-destination path MTU lookup, keyed by FipsAddress (mirrors
    /// `coord_cache.entries[*].path_mtu`). Sync read-only access from
    /// the TUN reader/writer threads at TCP MSS clamp time so the
    /// SYN/SYN-ACK clamp can use the smaller of the local-egress floor
    /// and the learned per-destination path MTU.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // === Transports & Links ===
    /// Active transports (owned by Node).
    transports: HashMap<TransportId, TransportHandle>,
    /// Per-transport kernel drop tracking for congestion detection.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Active links.
    links: HashMap<LinkId, Link>,
    /// Reverse lookup: (transport_id, remote_addr) -> link_id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    // === Packet Channel ===
    /// Packet sender for transports.
    packet_tx: Option<PacketTx>,
    /// Packet receiver (for event loop).
    packet_rx: Option<PacketRx>,

    // === Connections (Handshake Phase) ===
    /// Pending connections (handshake in progress).
    /// Indexed by LinkId since we don't know the peer's identity yet.
    connections: HashMap<LinkId, PeerConnection>,

    // === Peers (Active Phase) ===
    /// Authenticated peers.
    /// Indexed by NodeAddr (verified identity).
    peers: HashMap<NodeAddr, ActivePeer>,

    // === End-to-End Sessions ===
    /// Session table for end-to-end encrypted sessions.
    /// Keyed by remote NodeAddr.
    sessions: HashMap<NodeAddr, SessionEntry>,

    // === Identity Cache ===
    /// Maps FipsAddress prefix bytes (bytes 1-15) to cached peer identity data.
    /// Enables reverse lookup from IPv6 destination to session/routing identity.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    // === Pending TUN Packets ===
    /// Packets queued while waiting for session establishment.
    /// Keyed by destination NodeAddr, bounded per-dest and total.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Endpoint data payloads queued while waiting for session establishment.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,

    // === Pending Discovery Lookups ===
    /// Tracks in-flight discovery lookups, keyed by target NodeAddr, with the
    /// per-lookup `PendingLookup` state as the value. Prevents duplicate
    /// flood queries.
    /// NOTE(review): an earlier version of this comment said the value was
    /// the initiation timestamp (Unix ms). The value is now a
    /// `handlers::discovery::PendingLookup` struct; check that type for
    /// the exact fields.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // === Resource Limits ===
    /// Maximum connections (0 = unlimited).
    max_connections: usize,
    /// Maximum peers (0 = unlimited).
    max_peers: usize,
    /// Maximum links (0 = unlimited).
    max_links: usize,

    // === Counters ===
    /// Next link ID to allocate.
    next_link_id: u64,
    /// Next transport ID to allocate.
    next_transport_id: u32,

    // === Node Statistics ===
    /// Routing, forwarding, discovery, and error signal counters.
    stats: stats::NodeStats,

    /// Time-series history of node-level metrics (1s/1m rings).
    stats_history: stats_history::StatsHistory,

    // === TUN Interface ===
    /// TUN device state.
    tun_state: TunState,
    /// TUN interface name (for cleanup).
    tun_name: Option<String>,
    /// TUN packet sender channel.
    tun_tx: Option<TunTx>,
    /// Receiver for outbound packets from the TUN reader.
    tun_outbound_rx: Option<TunOutboundRx>,

    // --- Embedded (no-TUN / no-daemon) I/O ---
    /// App-owned packet sink used by embedded/no-TUN integrations.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Endpoint data command receiver used by embedded/no-daemon integrations.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Endpoint data event sink used by embedded/no-daemon integrations.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,

    // --- Data-plane worker pools ---
    /// Off-task FMP-encrypt + UDP-send worker pool. `None` if not yet
    /// spawned (set up in `start()` once transports are running).
    /// `Some(pool)` once available; the pool internally holds
    /// per-worker mpsc senders and round-robins jobs across them.
    /// See `node::encrypt_worker` for the rationale and layout.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Off-task FMP + FSP decrypt + delivery worker pool. Mirror of
    /// `encrypt_workers` for the receive side.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// Set of sessions that have been registered with the decrypt
    /// shard worker pool. Used by rx_loop to decide between fast-path
    /// dispatch (worker owns the session) and legacy in-place decrypt
    /// (worker doesn't have it yet). Per the data-plane restructure,
    /// the worker owns its session state directly — there's no shared
    /// `Arc<RwLock<HashMap>>` of cipher / replay state anymore, only
    /// this set tracks **whether** the worker has been told about a
    /// given session.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Fallback channel: decrypt worker bounces non-fast-path packets
    /// (anything that's not bulk EndpointData) back here for rx_loop
    /// to handle via the legacy path. Drained by a dedicated rx_loop arm.
    /// Held as `Option` so the receiver can be taken out of the struct.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sending half of the decrypt fallback channel, created together with
    /// `decrypt_fallback_rx` in `Node::new`. Presumably cloned into decrypt
    /// workers so they can bounce packets back.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,

    // --- TUN threads ---
    /// TUN reader thread handle.
    tun_reader_handle: Option<JoinHandle<()>>,
    /// TUN writer thread handle.
    tun_writer_handle: Option<JoinHandle<()>>,
    /// Shutdown pipe: writing to this fd unblocks the TUN reader thread on macOS.
    /// On Linux, deleting the interface via netlink serves the same purpose.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // === DNS Responder ===
    /// Receiver for resolved identities from the DNS responder.
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    /// DNS responder task handle.
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // === Index-Based Session Dispatch ===
    /// Allocator for session indices.
    index_allocator: IndexAllocator,
    /// O(1) lookup: (transport_id, our_index) → NodeAddr.
    /// This maps our session index to the peer that uses it.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Pending outbound handshakes by our sender_idx.
    /// Tracks which LinkId corresponds to which session index.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // === Rate Limiting ===
    /// Rate limiter for msg1 processing (DoS protection).
    msg1_rate_limiter: HandshakeRateLimiter,
    /// Rate limiter for ICMP Packet Too Big messages.
    icmp_rate_limiter: IcmpRateLimiter,
    /// Rate limiter for routing error signals (CoordsRequired / PathBroken).
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Rate limiter for source-side CoordsRequired/PathBroken responses.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    /// Backoff for failed discovery lookups (originator-side).
    discovery_backoff: DiscoveryBackoff,
    /// Rate limiter for forwarded discovery requests (transit-side).
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // === Pending Transport Connects ===
    /// Links waiting for transport-level connection establishment before
    /// sending handshake msg1. For connection-oriented transports (TCP, Tor),
    /// the transport connect runs in the background; the tick handler polls
    /// connection_state() and initiates the handshake when connected.
    pending_connects: Vec<PendingConnect>,

    // === Connection Retry ===
    /// Retry state for peers whose outbound connections have failed.
    /// Keyed by NodeAddr. Entries are created when a handshake times out
    /// or fails, and removed on successful promotion or when max retries
    /// are exhausted.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    // === Overlay Discovery & NAT Bootstrap ===
    /// Optional Nostr/STUN overlay discovery coordinator for `udp:nat` peers.
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    /// mDNS / DNS-SD responder + browser for local-link peer discovery.
    /// Identity is unverified at this layer — the Noise XX handshake
    /// initiated against an mDNS-observed endpoint is what proves the
    /// peer holds the matching private key.
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    /// Wall-clock ms when Nostr discovery successfully started, used to
    /// schedule the one-shot startup advert sweep after a settle delay.
    /// `None` until discovery comes up; remains `None` if discovery is
    /// disabled or failed to start.
    nostr_discovery_started_at_ms: Option<u64>,
    /// Whether the one-shot startup advert sweep has run. Set to true
    /// after the first sweep fires (under `policy: open`); thereafter
    /// only the per-tick `queue_open_discovery_retries` continues.
    startup_open_discovery_sweep_done: bool,
    /// Per-peer UDP transports adopted from NAT traversal handoff.
    bootstrap_transports: HashSet<TransportId>,
    /// Originating peer npub (bech32) for each adopted bootstrap
    /// transport, captured at `adopt_established_traversal` time.
    /// Populated alongside `bootstrap_transports`; cleared in
    /// `cleanup_bootstrap_transport_if_unused`. Used by the rx loop to
    /// route fatal-protocol-mismatch observations back to the
    /// Nostr-discovery `failure_state` for long cooldown application.
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    /// Peers that should not be used as reply-learned fallback transit for
    /// other destinations. Direct lookups to the peer are still permitted.
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    // === Periodic Parent Re-evaluation ===
    /// Timestamp of last periodic parent re-evaluation (for pacing).
    /// Note: uses `crate::time::Instant`, unlike the `std::time::Instant`
    /// fields below.
    last_parent_reeval: Option<crate::time::Instant>,

    // === Congestion Logging ===
    /// Timestamp of last congestion detection log (rate-limited to 5s).
    last_congestion_log: Option<std::time::Instant>,

    // === Mesh Size Estimate ===
    /// Cached estimated mesh size (computed once per tick from bloom filters).
    estimated_mesh_size: Option<u64>,
    /// Timestamp of last mesh size log emission.
    last_mesh_size_log: Option<std::time::Instant>,

    // === Bloom Self-Plausibility ===
    /// Rate-limit state for the self-plausibility WARN. Fires at most
    /// once per 60s globally when our own outgoing FilterAnnounce has
    /// an FPR above `node.bloom.max_inbound_fpr`, signalling either
    /// aggregation drift or an ingress bypass.
    last_self_warn: Option<std::time::Instant>,

    // === Local Outbound Liveness ===
    /// Set when a `transport.send` returned a local-side io error
    /// (`NetworkUnreachable` / `HostUnreachable` / `AddrNotAvailable`),
    /// cleared on the next successful send. Used by
    /// `check_link_heartbeats` to compress the dead-timeout to
    /// `fast_link_dead_timeout_secs` while our outbound is observed
    /// broken — direct kernel evidence beats waiting on receive-silence.
    last_local_send_failure_at: Option<std::time::Instant>,

    // === Display Names ===
    /// Human-readable names for configured peers (alias or short npub).
    /// Populated at startup from peer config.
    peer_aliases: HashMap<NodeAddr, String>,

    // === Access Control ===
    /// Reloadable peer ACL state from standard allow/deny files.
    peer_acl: acl::PeerAclReloader,

    // === Host Map ===
    /// Static hostname → npub mapping for DNS resolution.
    /// Built at construction from peer aliases and /etc/fips/hosts.
    host_map: Arc<HostMap>,
}
739
740impl Node {
741    /// Create a new node from configuration.
742    pub fn new(config: Config) -> Result<Self, NodeError> {
743        config.validate()?;
744        let identity = config.create_identity()?;
745        let node_addr = *identity.node_addr();
746        let is_leaf_only = config.is_leaf_only();
747
748        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
749        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
750
751        let mut startup_epoch = [0u8; 8];
752        rand::rng().fill_bytes(&mut startup_epoch);
753
754        let mut bloom_state = if is_leaf_only {
755            BloomState::leaf_only(node_addr)
756        } else {
757            BloomState::new(node_addr)
758        };
759        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
760
761        let tun_state = if config.tun.enabled {
762            TunState::Configured
763        } else {
764            TunState::Disabled
765        };
766
767        // Initialize tree state with signed self-declaration
768        let mut tree_state = TreeState::new(node_addr);
769        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
770        tree_state.set_hold_down(config.node.tree.hold_down_secs);
771        tree_state.set_flap_dampening(
772            config.node.tree.flap_threshold,
773            config.node.tree.flap_window_secs,
774            config.node.tree.flap_dampening_secs,
775        );
776        tree_state
777            .sign_declaration(&identity)
778            .expect("signing own declaration should never fail");
779
780        let coord_cache = CoordCache::new(
781            config.node.cache.coord_size,
782            config.node.cache.coord_ttl_secs * 1000,
783        );
784        let rl = &config.node.rate_limit;
785        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
786            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
787            config.node.limits.max_pending_inbound,
788        );
789
790        let max_connections = config.node.limits.max_connections;
791        let max_peers = config.node.limits.max_peers;
792        let max_links = config.node.limits.max_links;
793        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
794        let backoff_base_secs = config.node.discovery.backoff_base_secs;
795        let backoff_max_secs = config.node.discovery.backoff_max_secs;
796        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
797
798        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
799
800        Ok(Self {
801            identity,
802            startup_epoch,
803            started_at: std::time::Instant::now(),
804            config,
805            state: NodeState::Created,
806            is_leaf_only,
807            tree_state,
808            bloom_state,
809            coord_cache,
810            learned_routes: LearnedRouteTable::default(),
811            recent_requests: HashMap::new(),
812            transports: HashMap::new(),
813            transport_drops: HashMap::new(),
814            links: HashMap::new(),
815            addr_to_link: HashMap::new(),
816            packet_tx: None,
817            packet_rx: None,
818            connections: HashMap::new(),
819            peers: HashMap::new(),
820            sessions: HashMap::new(),
821            identity_cache: HashMap::new(),
822            pending_tun_packets: HashMap::new(),
823            pending_endpoint_data: HashMap::new(),
824            pending_lookups: HashMap::new(),
825            max_connections,
826            max_peers,
827            max_links,
828            next_link_id: 1,
829            next_transport_id: 1,
830            stats: stats::NodeStats::new(),
831            stats_history: stats_history::StatsHistory::new(),
832            tun_state,
833            tun_name: None,
834            tun_tx: None,
835            tun_outbound_rx: None,
836            external_packet_tx: None,
837            endpoint_command_rx: None,
838            endpoint_event_tx: None,
839            encrypt_workers: None,
840            decrypt_workers: None,
841            decrypt_registered_sessions: std::collections::HashSet::new(),
842            decrypt_fallback_tx,
843            decrypt_fallback_rx,
844            tun_reader_handle: None,
845            tun_writer_handle: None,
846            #[cfg(target_os = "macos")]
847            tun_shutdown_fd: None,
848            dns_identity_rx: None,
849            dns_task: None,
850            index_allocator: IndexAllocator::new(),
851            peers_by_index: HashMap::new(),
852            pending_outbound: HashMap::new(),
853            msg1_rate_limiter,
854            icmp_rate_limiter: IcmpRateLimiter::new(),
855            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
856            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
857                std::time::Duration::from_millis(coords_response_interval_ms),
858            ),
859            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
860            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
861                std::time::Duration::from_secs(forward_min_interval_secs),
862            ),
863            pending_connects: Vec::new(),
864            retry_pending: HashMap::new(),
865            nostr_discovery: None,
866            nostr_discovery_started_at_ms: None,
867            lan_discovery: None,
868            startup_open_discovery_sweep_done: false,
869            bootstrap_transports: HashSet::new(),
870            bootstrap_transport_npubs: HashMap::new(),
871            discovery_fallback_transit_blocked_peers: HashSet::new(),
872            last_parent_reeval: None,
873            last_congestion_log: None,
874            estimated_mesh_size: None,
875            last_mesh_size_log: None,
876            last_self_warn: None,
877            last_local_send_failure_at: None,
878            peer_aliases: HashMap::new(),
879            peer_acl,
880            host_map,
881            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
882        })
883    }
884
885    /// Create a node with a specific identity.
886    ///
887    /// This constructor validates cross-field config invariants before
888    /// constructing the node, same as [`Node::new`].
889    pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
890        config.validate()?;
891        let node_addr = *identity.node_addr();
892
893        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
894        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
895
896        let mut startup_epoch = [0u8; 8];
897        rand::rng().fill_bytes(&mut startup_epoch);
898
899        let tun_state = if config.tun.enabled {
900            TunState::Configured
901        } else {
902            TunState::Disabled
903        };
904
905        // Initialize tree state with signed self-declaration
906        let mut tree_state = TreeState::new(node_addr);
907        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
908        tree_state.set_hold_down(config.node.tree.hold_down_secs);
909        tree_state.set_flap_dampening(
910            config.node.tree.flap_threshold,
911            config.node.tree.flap_window_secs,
912            config.node.tree.flap_dampening_secs,
913        );
914        tree_state
915            .sign_declaration(&identity)
916            .expect("signing own declaration should never fail");
917
918        let mut bloom_state = BloomState::new(node_addr);
919        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
920
921        let coord_cache = CoordCache::new(
922            config.node.cache.coord_size,
923            config.node.cache.coord_ttl_secs * 1000,
924        );
925        let rl = &config.node.rate_limit;
926        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
927            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
928            config.node.limits.max_pending_inbound,
929        );
930
931        let max_connections = config.node.limits.max_connections;
932        let max_peers = config.node.limits.max_peers;
933        let max_links = config.node.limits.max_links;
934        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
935
936        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
937
938        Ok(Self {
939            identity,
940            startup_epoch,
941            started_at: std::time::Instant::now(),
942            config,
943            state: NodeState::Created,
944            is_leaf_only: false,
945            tree_state,
946            bloom_state,
947            coord_cache,
948            learned_routes: LearnedRouteTable::default(),
949            recent_requests: HashMap::new(),
950            transports: HashMap::new(),
951            transport_drops: HashMap::new(),
952            links: HashMap::new(),
953            addr_to_link: HashMap::new(),
954            packet_tx: None,
955            packet_rx: None,
956            connections: HashMap::new(),
957            peers: HashMap::new(),
958            sessions: HashMap::new(),
959            identity_cache: HashMap::new(),
960            pending_tun_packets: HashMap::new(),
961            pending_endpoint_data: HashMap::new(),
962            pending_lookups: HashMap::new(),
963            max_connections,
964            max_peers,
965            max_links,
966            next_link_id: 1,
967            next_transport_id: 1,
968            stats: stats::NodeStats::new(),
969            stats_history: stats_history::StatsHistory::new(),
970            tun_state,
971            tun_name: None,
972            tun_tx: None,
973            tun_outbound_rx: None,
974            external_packet_tx: None,
975            endpoint_command_rx: None,
976            endpoint_event_tx: None,
977            encrypt_workers: None,
978            decrypt_workers: None,
979            decrypt_registered_sessions: std::collections::HashSet::new(),
980            decrypt_fallback_tx,
981            decrypt_fallback_rx,
982            tun_reader_handle: None,
983            tun_writer_handle: None,
984            #[cfg(target_os = "macos")]
985            tun_shutdown_fd: None,
986            dns_identity_rx: None,
987            dns_task: None,
988            index_allocator: IndexAllocator::new(),
989            peers_by_index: HashMap::new(),
990            pending_outbound: HashMap::new(),
991            msg1_rate_limiter,
992            icmp_rate_limiter: IcmpRateLimiter::new(),
993            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
994            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
995                std::time::Duration::from_millis(coords_response_interval_ms),
996            ),
997            discovery_backoff: DiscoveryBackoff::new(),
998            discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
999            pending_connects: Vec::new(),
1000            retry_pending: HashMap::new(),
1001            nostr_discovery: None,
1002            nostr_discovery_started_at_ms: None,
1003            lan_discovery: None,
1004            startup_open_discovery_sweep_done: false,
1005            bootstrap_transports: HashSet::new(),
1006            bootstrap_transport_npubs: HashMap::new(),
1007            discovery_fallback_transit_blocked_peers: HashSet::new(),
1008            last_parent_reeval: None,
1009            last_congestion_log: None,
1010            estimated_mesh_size: None,
1011            last_mesh_size_log: None,
1012            last_self_warn: None,
1013            last_local_send_failure_at: None,
1014            peer_aliases: HashMap::new(),
1015            peer_acl,
1016            host_map,
1017            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1018        })
1019    }
1020
1021    /// Create a leaf-only node (simplified state).
1022    pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1023        let mut node = Self::new(config)?;
1024        node.is_leaf_only = true;
1025        node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1026        Ok(node)
1027    }
1028
1029    fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1030        let base_host_map = HostMap::from_peer_configs(config.peers());
1031        if !config.node.system_files_enabled {
1032            return (
1033                Arc::new(base_host_map.clone()),
1034                acl::PeerAclReloader::memory_only(base_host_map),
1035            );
1036        }
1037
1038        let mut host_map = base_host_map.clone();
1039        let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1040        let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1041            crate::upper::hosts::DEFAULT_HOSTS_PATH,
1042        ));
1043        host_map.merge(hosts_file);
1044        let peer_acl = acl::PeerAclReloader::with_alias_sources(
1045            std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1046            std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1047            base_host_map,
1048            hosts_path,
1049        );
1050        (Arc::new(host_map), peer_acl)
1051    }
1052
    /// Create transport instances from configuration.
    ///
    /// Returns a vector of TransportHandles for all configured transports.
    ///
    /// Transport IDs come from `allocate_transport_id()` in a fixed order:
    /// UDP, sim (feature `sim-transport`), Ethernet (Linux/macOS), TCP, Tor,
    /// WebRTC (feature `webrtc-transport`), then BLE (`cfg(bluer_available)`).
    /// The IDs a transport receives therefore depend on this statement order.
    /// A WebRTC or BLE instance that fails to initialize is logged and
    /// skipped. The ID allocated for it is not released here.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Collect UDP configs with optional names to avoid borrow conflicts
        // (`allocate_transport_id` below needs `&mut self` while iterating).
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        // Create UDP transport instances
        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        // Simulated transports exist only with the `sim-transport` feature.
        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        // Create Ethernet transport instances where raw-socket support exists.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            // Every Ethernet instance is given this node's public key.
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let mut eth_config = eth_config;
                // Instances without an explicit discovery scope inherit the
                // node-wide LAN discovery scope.
                if eth_config.discovery_scope.is_none() {
                    eth_config.discovery_scope = self.lan_discovery_scope();
                }
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        // Create TCP transport instances
        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        // Create Tor transport instances
        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // WebRTC configs are collected unconditionally. Builds without the
        // `webrtc-transport` feature still read them to warn below.
        let webrtc_instances: Vec<_> = self
            .config
            .transports
            .webrtc
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        #[cfg(feature = "webrtc-transport")]
        {
            for (name, webrtc_config) in webrtc_instances {
                let transport_id = self.allocate_transport_id();
                // WebRTC construction is fallible. A failure is logged and
                // this instance is skipped instead of aborting the whole set.
                match WebRtcTransport::new(
                    transport_id,
                    name,
                    webrtc_config,
                    packet_tx.clone(),
                    &self.identity,
                    &self.config.node.discovery.nostr,
                ) {
                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
                    Err(err) => {
                        warn!(
                            transport_id = %transport_id,
                            error = %err,
                            "failed to initialize WebRTC transport"
                        );
                    }
                }
            }
        }
        #[cfg(not(feature = "webrtc-transport"))]
        if !webrtc_instances.is_empty() {
            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
        }

        // Create BLE transport instances
        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Real BLE adapters are opened only outside test builds.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // NOTE(review): this whole block is already gated on
            // `cfg(bluer_available)`, so `any(not(bluer_available), test)`
            // reduces to `test`, and the inner `cfg(not(test))` warning is
            // never compiled. In practice this branch only consumes
            // `ble_instances` in test builds. No "lacks BlueZ" warning is
            // emitted from this function in any build.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1230
1231    /// Find an operational transport that matches the given transport type name.
1232    ///
1233    /// Adopted UDP bootstrap transports are point-to-point sockets handed off
1234    /// from Nostr/STUN traversal. They must not be reused for ordinary
1235    /// `udp host:port` dials discovered through static config, mDNS, or overlay
1236    /// adverts: on macOS a `send_to` through the wrong adopted socket can fail
1237    /// with `EINVAL`, and even on platforms that allow it the packet would use
1238    /// the wrong 5-tuple/NAT mapping. Prefer configured transports and make the
1239    /// choice deterministic by lowest transport id instead of HashMap order.
1240    fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1241        self.transports
1242            .iter()
1243            .filter(|(id, handle)| {
1244                handle.transport_type().name == transport_type
1245                    && handle.is_operational()
1246                    && !self.bootstrap_transports.contains(id)
1247            })
1248            .min_by_key(|(id, _)| id.as_u32())
1249            .map(|(id, _)| *id)
1250    }
1251
1252    /// Resolve an Ethernet peer address ("interface/mac") to a transport ID
1253    /// and binary TransportAddr.
1254    ///
1255    /// Finds the Ethernet transport instance bound to the named interface
1256    /// and parses the MAC portion into a 6-byte TransportAddr.
1257    #[allow(unused_variables)]
1258    fn resolve_ethernet_addr(
1259        &self,
1260        addr_str: &str,
1261    ) -> Result<(TransportId, TransportAddr), NodeError> {
1262        #[cfg(any(target_os = "linux", target_os = "macos"))]
1263        {
1264            let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1265                NodeError::NoTransportForType(format!(
1266                    "invalid Ethernet address format '{}': expected 'interface/mac'",
1267                    addr_str
1268                ))
1269            })?;
1270
1271            // Find the Ethernet transport bound to this interface
1272            let transport_id = self
1273                .transports
1274                .iter()
1275                .find(|(_, handle)| {
1276                    handle.transport_type().name == "ethernet"
1277                        && handle.is_operational()
1278                        && handle.interface_name() == Some(iface)
1279                })
1280                .map(|(id, _)| *id)
1281                .ok_or_else(|| {
1282                    NodeError::NoTransportForType(format!(
1283                        "no operational Ethernet transport for interface '{}'",
1284                        iface
1285                    ))
1286                })?;
1287
1288            let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1289                NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1290            })?;
1291
1292            Ok((transport_id, TransportAddr::from_bytes(&mac)))
1293        }
1294        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1295        {
1296            Err(NodeError::NoTransportForType(
1297                "Ethernet transport is not supported on this platform".to_string(),
1298            ))
1299        }
1300    }
1301
1302    /// Resolve a BLE address string (`"adapter/AA:BB:CC:DD:EE:FF"`) to a
1303    /// (TransportId, TransportAddr) pair by finding the BLE transport
1304    /// instance matching the adapter name.
1305    #[cfg(bluer_available)]
1306    fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1307        let ta = TransportAddr::from_string(addr_str);
1308        let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1309            NodeError::NoTransportForType(format!(
1310                "invalid BLE address format '{}': expected 'adapter/mac'",
1311                addr_str
1312            ))
1313        })?;
1314
1315        // Find the BLE transport for this adapter
1316        let transport_id = self
1317            .transports
1318            .iter()
1319            .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1320            .map(|(id, _)| *id)
1321            .ok_or_else(|| {
1322                NodeError::NoTransportForType(format!(
1323                    "no operational BLE transport for adapter '{}'",
1324                    adapter
1325                ))
1326            })?;
1327
1328        // Validate the address format
1329        crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1330            NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1331        })?;
1332
1333        Ok((transport_id, TransportAddr::from_string(addr_str)))
1334    }
1335
1336    // === Identity Accessors ===
1337
1338    /// Get this node's identity.
1339    pub fn identity(&self) -> &Identity {
1340        &self.identity
1341    }
1342
1343    /// Get this node's NodeAddr.
1344    pub fn node_addr(&self) -> &NodeAddr {
1345        self.identity.node_addr()
1346    }
1347
1348    /// Get this node's npub.
1349    pub fn npub(&self) -> String {
1350        self.identity.npub()
1351    }
1352
1353    /// Return a human-readable display name for a NodeAddr.
1354    ///
1355    /// Lookup order:
1356    /// 1. Host map hostname (from peer aliases + /etc/fips/hosts)
1357    /// 2. Configured peer alias or short npub (from startup map)
1358    /// 3. Active peer's short npub (e.g., inbound peer not in config)
1359    /// 4. Session endpoint's short npub (end-to-end, may not be direct peer)
1360    /// 5. Truncated NodeAddr hex (unknown address)
1361    pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1362        if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1363            return hostname.to_string();
1364        }
1365        if let Some(name) = self.peer_aliases.get(addr) {
1366            return name.clone();
1367        }
1368        if let Some(peer) = self.peers.get(addr) {
1369            return peer.identity().short_npub();
1370        }
1371        if let Some(entry) = self.sessions.get(addr) {
1372            let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1373            return PeerIdentity::from_pubkey(xonly).short_npub();
1374        }
1375        addr.short_hex()
1376    }
1377
1378    /// Tear down a `peers_by_index` entry **and** keep the shard-owned
1379    /// decrypt-worker state coherent: removes the same `cache_key`
1380    /// from the registered-sessions tracking set and tells the
1381    /// assigned shard worker to drop its `OwnedSessionState` entry.
1382    ///
1383    /// Use this instead of a bare `self.peers_by_index.remove(&key)`
1384    /// at every session-lifecycle teardown site (rekey cross-connection
1385    /// swap, peer disconnect, dispatch session-rotation) so the worker
1386    /// doesn't keep stale ciphers / replay windows around. The
1387    /// follow-up `RegisterSession` for the NEW key (if any) will then
1388    /// install the fresh state on the same shard.
1389    pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1390        // Find the peer that owns this index BEFORE removing it from
1391        // the index map, so we can decide whether the deregistration
1392        // also tears down the peer's connected UDP socket.
1393        let owning_peer = self.peers_by_index.get(&cache_key).copied();
1394        self.peers_by_index.remove(&cache_key);
1395        if self.decrypt_registered_sessions.remove(&cache_key)
1396            && let Some(workers) = self.decrypt_workers.as_ref()
1397        {
1398            workers.unregister_session(cache_key);
1399        }
1400        // Tear down the per-peer connected UDP socket *only* if no
1401        // other peers_by_index entry still resolves to this peer.
1402        // Rekey drain calls into this helper with the OLD session
1403        // index while the NEW index is already installed and points
1404        // at the same peer — there the connect()-ed 5-tuple is
1405        // still valid for the new session and we must not close it.
1406        // Peer-teardown sites (CrossConnection swap, stale-index
1407        // fall-through in encrypted.rs, disconnect handler) call
1408        // here when this is the peer's last index, so the connected
1409        // socket goes away with the peer.
1410        if let Some(peer_addr) = owning_peer {
1411            let peer_has_other_index = self
1412                .peers_by_index
1413                .values()
1414                .any(|other| *other == peer_addr);
1415            if !peer_has_other_index {
1416                self.clear_connected_udp_for_peer(&peer_addr);
1417            }
1418        }
1419    }
1420
1421    /// Ensure the current FMP receive index resolves to this peer.
1422    ///
1423    /// Rekey msg1/msg2 handlers pre-register the pending index before
1424    /// cutover, but losing that registration in a debug build used to
1425    /// panic in the cutover path. Repairing the map here is safe: the
1426    /// peer has already promoted the pending session, and the decrypt
1427    /// worker registration immediately after cutover depends on the
1428    /// same `(transport_id, our_index)` key.
1429    pub(in crate::node) fn ensure_current_session_index_registered(
1430        &mut self,
1431        node_addr: &NodeAddr,
1432        context: &'static str,
1433    ) -> bool {
1434        let Some(peer) = self.peers.get(node_addr) else {
1435            return false;
1436        };
1437        let Some(transport_id) = peer.transport_id() else {
1438            warn!(
1439                peer = %self.peer_display_name(node_addr),
1440                context,
1441                "Cannot register current session index without transport id"
1442            );
1443            return false;
1444        };
1445        let Some(our_index) = peer.our_index() else {
1446            warn!(
1447                peer = %self.peer_display_name(node_addr),
1448                context,
1449                "Cannot register current session index without local index"
1450            );
1451            return false;
1452        };
1453
1454        let cache_key = (transport_id, our_index.as_u32());
1455        match self.peers_by_index.get(&cache_key).copied() {
1456            Some(existing) if existing == *node_addr => true,
1457            Some(existing) => {
1458                warn!(
1459                    peer = %self.peer_display_name(node_addr),
1460                    previous_owner = %self.peer_display_name(&existing),
1461                    transport_id = %transport_id,
1462                    our_index = %our_index,
1463                    context,
1464                    "Repairing current session index with stale owner"
1465                );
1466                self.peers_by_index.insert(cache_key, *node_addr);
1467                true
1468            }
1469            None => {
1470                warn!(
1471                    peer = %self.peer_display_name(node_addr),
1472                    transport_id = %transport_id,
1473                    our_index = %our_index,
1474                    context,
1475                    "Repairing missing current session index"
1476                );
1477                self.peers_by_index.insert(cache_key, *node_addr);
1478                true
1479            }
1480        }
1481    }
1482
1483    // === Configuration ===
1484
1485    /// Get the configuration.
1486    pub fn config(&self) -> &Config {
1487        &self.config
1488    }
1489
1490    /// Calculate the effective IPv6 MTU that can be sent over FIPS.
1491    ///
1492    /// Delegates to `upper::icmp::effective_ipv6_mtu()` with this node's
1493    /// transport MTU. Returns the maximum IPv6 packet size (including
1494    /// IPv6 header) that can be transmitted through the FIPS mesh.
1495    pub fn effective_ipv6_mtu(&self) -> u16 {
1496        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1497    }
1498
1499    /// Get the transport MTU governing the global TUN-boundary MSS clamp.
1500    ///
1501    /// Returns the **minimum** MTU across all operational transports, or
1502    /// 1280 (IPv6 minimum) as fallback. Used for initial TUN configuration
1503    /// where a specific egress transport isn't yet known: the resulting
1504    /// `effective_ipv6_mtu` (transport_mtu - 77) and `max_mss`
1505    /// (effective_mtu - 60) form a conservative ceiling that fits ANY
1506    /// configured-transport's egress, eliminating PMTU-D black holes that
1507    /// would otherwise occur when a flow's actual egress is smaller than
1508    /// the clamp ceiling assumed at TUN init.
1509    ///
1510    /// Returning the smallest (rather than the first-iterated, which used
1511    /// to vary across HashMap iteration order + async-startup race) makes
1512    /// the clamp deterministic across daemon restarts.
1513    ///
1514    /// See `ISSUE-2026-0011` for the empirical investigation.
1515    pub fn transport_mtu(&self) -> u16 {
1516        let min_operational = self
1517            .transports
1518            .values()
1519            .filter(|h| h.is_operational())
1520            .map(|h| h.mtu())
1521            .min();
1522        if let Some(mtu) = min_operational {
1523            return mtu;
1524        }
1525        // Fallback to config: try UDP first, then Ethernet
1526        if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1527            return cfg.mtu();
1528        }
1529        1280
1530    }
1531
1532    // === State ===
1533
1534    /// Get the node state.
1535    pub fn state(&self) -> NodeState {
1536        self.state
1537    }
1538
1539    /// Get the node uptime.
1540    pub fn uptime(&self) -> std::time::Duration {
1541        self.started_at.elapsed()
1542    }
1543
1544    /// Check if node is operational.
1545    pub fn is_running(&self) -> bool {
1546        self.state.is_operational()
1547    }
1548
1549    /// Check if this is a leaf-only node.
1550    pub fn is_leaf_only(&self) -> bool {
1551        self.is_leaf_only
1552    }
1553
1554    // === Tree State ===
1555
1556    /// Get the tree state.
1557    pub fn tree_state(&self) -> &TreeState {
1558        &self.tree_state
1559    }
1560
1561    /// Get mutable tree state.
1562    pub fn tree_state_mut(&mut self) -> &mut TreeState {
1563        &mut self.tree_state
1564    }
1565
1566    // === Bloom State ===
1567
1568    /// Get the Bloom filter state.
1569    pub fn bloom_state(&self) -> &BloomState {
1570        &self.bloom_state
1571    }
1572
1573    /// Get mutable Bloom filter state.
1574    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1575        &mut self.bloom_state
1576    }
1577
1578    // === Mesh Size Estimate ===
1579
1580    /// Get the cached estimated mesh size.
1581    pub fn estimated_mesh_size(&self) -> Option<u64> {
1582        self.estimated_mesh_size
1583    }
1584
1585    /// Compute and cache the estimated mesh size from bloom filters.
1586    ///
1587    /// Uses the spanning tree partition: parent's filter covers nodes reachable
1588    /// upward, children's filters cover disjoint subtrees downward. The sum
1589    /// of estimated entry counts plus one (self) approximates total network size.
1590    pub(crate) fn compute_mesh_size(&mut self) {
1591        let my_addr = *self.tree_state.my_node_addr();
1592        let parent_id = *self.tree_state.my_declaration().parent_id();
1593        let is_root = self.tree_state.is_root();
1594
1595        let max_fpr = self.config.node.bloom.max_inbound_fpr;
1596        let mut total: f64 = 1.0; // count self
1597        let mut child_count: u32 = 0;
1598        let mut has_data = false;
1599
1600        // Parent's filter: nodes reachable upward through the tree.
1601        // If any contributing filter is above the FPR cap, we refuse to
1602        // estimate rather than substitute a partial/biased aggregate —
1603        // Node.estimated_mesh_size is already Option<u64> and consumers
1604        // (control socket, fipstop, periodic debug log) handle None.
1605        if !is_root
1606            && let Some(parent) = self.peers.get(&parent_id)
1607            && let Some(filter) = parent.inbound_filter()
1608        {
1609            match filter.estimated_count(max_fpr) {
1610                Some(n) => {
1611                    total += n;
1612                    has_data = true;
1613                }
1614                None => {
1615                    self.estimated_mesh_size = None;
1616                    return;
1617                }
1618            }
1619        }
1620
1621        // Children's filters: each child's subtree is disjoint
1622        for (peer_addr, peer) in &self.peers {
1623            if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1624                && *decl.parent_id() == my_addr
1625            {
1626                child_count += 1;
1627                if let Some(filter) = peer.inbound_filter() {
1628                    match filter.estimated_count(max_fpr) {
1629                        Some(n) => {
1630                            total += n;
1631                            has_data = true;
1632                        }
1633                        None => {
1634                            self.estimated_mesh_size = None;
1635                            return;
1636                        }
1637                    }
1638                }
1639            }
1640        }
1641
1642        if !has_data {
1643            self.estimated_mesh_size = None;
1644            return;
1645        }
1646
1647        let size = total.round() as u64;
1648        self.estimated_mesh_size = Some(size);
1649
1650        // Periodic logging (reuse MMP default interval: 30s)
1651        let now = std::time::Instant::now();
1652        let should_log = match self.last_mesh_size_log {
1653            None => true,
1654            Some(last) => {
1655                now.duration_since(last)
1656                    >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1657            }
1658        };
1659        if should_log {
1660            tracing::debug!(
1661                estimated_mesh_size = size,
1662                peers = self.peers.len(),
1663                children = child_count,
1664                "Mesh size estimate"
1665            );
1666            self.last_mesh_size_log = Some(now);
1667        }
1668    }
1669
1670    // === Coord Cache ===
1671
1672    /// Get the coordinate cache.
1673    pub fn coord_cache(&self) -> &CoordCache {
1674        &self.coord_cache
1675    }
1676
1677    /// Get mutable coordinate cache.
1678    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1679        &mut self.coord_cache
1680    }
1681
1682    // === Node Statistics ===
1683
1684    /// Get the node statistics.
1685    pub fn stats(&self) -> &stats::NodeStats {
1686        &self.stats
1687    }
1688
1689    /// Get mutable node statistics.
1690    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1691        &mut self.stats
1692    }
1693
1694    /// Get the stats history collector.
1695    pub fn stats_history(&self) -> &stats_history::StatsHistory {
1696        &self.stats_history
1697    }
1698
1699    /// Sample the current node state into the stats history ring.
1700    /// Called once per tick from the RX loop.
1701    pub(crate) fn record_stats_history(&mut self) {
1702        let fwd = &self.stats.forwarding;
1703        let peers_with_mmp: Vec<f64> = self
1704            .peers
1705            .values()
1706            .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1707            .collect();
1708        let loss_rate = if peers_with_mmp.is_empty() {
1709            0.0
1710        } else {
1711            peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1712        };
1713
1714        let snap = stats_history::Snapshot {
1715            mesh_size: self.estimated_mesh_size,
1716            tree_depth: self.tree_state.my_coords().depth() as u32,
1717            peer_count: self.peers.len() as u64,
1718            parent_switches_total: self.stats.tree.parent_switches,
1719            bytes_in_total: fwd.received_bytes,
1720            bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1721            packets_in_total: fwd.received_packets,
1722            packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1723            loss_rate,
1724            active_sessions: self.sessions.len() as u64,
1725        };
1726
1727        let now = std::time::Instant::now();
1728        let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1729            .peers
1730            .values()
1731            .map(|p| {
1732                let stats = p.link_stats();
1733                let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1734                    Some(m) => (
1735                        m.metrics.srtt_ms(),
1736                        Some(m.metrics.loss_rate()),
1737                        m.receiver.ecn_ce_count() as u64,
1738                    ),
1739                    None => (None, None, 0),
1740                };
1741                stats_history::PeerSnapshot {
1742                    node_addr: *p.node_addr(),
1743                    last_seen: now,
1744                    srtt_ms,
1745                    loss_rate,
1746                    bytes_in_total: stats.bytes_recv,
1747                    bytes_out_total: stats.bytes_sent,
1748                    packets_in_total: stats.packets_recv,
1749                    packets_out_total: stats.packets_sent,
1750                    ecn_ce_total: ecn_ce,
1751                }
1752            })
1753            .collect();
1754
1755        self.stats_history.tick(now, &snap, &peer_snaps);
1756    }
1757
1758    // === TUN Interface ===
1759
1760    /// Get the TUN state.
1761    pub fn tun_state(&self) -> TunState {
1762        self.tun_state
1763    }
1764
1765    /// Get the TUN interface name, if active.
1766    pub fn tun_name(&self) -> Option<&str> {
1767        self.tun_name.as_deref()
1768    }
1769
1770    // === Resource Limits ===
1771
1772    /// Set the maximum number of connections (handshake phase).
1773    pub fn set_max_connections(&mut self, max: usize) {
1774        self.max_connections = max;
1775    }
1776
1777    /// Set the maximum number of peers (authenticated).
1778    pub fn set_max_peers(&mut self, max: usize) {
1779        self.max_peers = max;
1780    }
1781
1782    /// Set the maximum number of links.
1783    pub fn set_max_links(&mut self, max: usize) {
1784        self.max_links = max;
1785    }
1786
1787    // === Counts ===
1788
1789    /// Number of pending connections (handshake in progress).
1790    pub fn connection_count(&self) -> usize {
1791        self.connections.len()
1792    }
1793
1794    /// Number of authenticated peers.
1795    pub fn peer_count(&self) -> usize {
1796        self.peers.len()
1797    }
1798
1799    /// Number of active links.
1800    pub fn link_count(&self) -> usize {
1801        self.links.len()
1802    }
1803
1804    /// Number of active transports.
1805    pub fn transport_count(&self) -> usize {
1806        self.transports.len()
1807    }
1808
1809    // === Transport Management ===
1810
1811    /// Allocate a new transport ID.
1812    pub fn allocate_transport_id(&mut self) -> TransportId {
1813        let id = TransportId::new(self.next_transport_id);
1814        self.next_transport_id += 1;
1815        id
1816    }
1817
1818    /// Get a transport by ID.
1819    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
1820        self.transports.get(id)
1821    }
1822
1823    /// Get mutable transport by ID.
1824    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
1825        self.transports.get_mut(id)
1826    }
1827
1828    /// Iterate over transport IDs.
1829    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
1830        self.transports.keys()
1831    }
1832
1833    /// Get the packet receiver for the event loop.
1834    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
1835        self.packet_rx.as_mut()
1836    }
1837
1838    // === Link Management ===
1839
1840    /// Allocate a new link ID.
1841    pub fn allocate_link_id(&mut self) -> LinkId {
1842        let id = LinkId::new(self.next_link_id);
1843        self.next_link_id += 1;
1844        id
1845    }
1846
1847    /// Add a link.
1848    pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1849        if self.max_links > 0 && self.links.len() >= self.max_links {
1850            return Err(NodeError::MaxLinksExceeded {
1851                max: self.max_links,
1852            });
1853        }
1854        let link_id = link.link_id();
1855        let transport_id = link.transport_id();
1856        let remote_addr = link.remote_addr().clone();
1857
1858        self.links.insert(link_id, link);
1859        self.addr_to_link
1860            .insert((transport_id, remote_addr), link_id);
1861        Ok(())
1862    }
1863
1864    /// Get a link by ID.
1865    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
1866        self.links.get(link_id)
1867    }
1868
1869    /// Get a mutable link by ID.
1870    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
1871        self.links.get_mut(link_id)
1872    }
1873
1874    /// Find link ID by transport address.
1875    pub fn find_link_by_addr(
1876        &self,
1877        transport_id: TransportId,
1878        addr: &TransportAddr,
1879    ) -> Option<LinkId> {
1880        self.addr_to_link
1881            .get(&(transport_id, addr.clone()))
1882            .copied()
1883    }
1884
1885    /// Remove a link.
1886    ///
1887    /// Only removes the addr_to_link reverse lookup if it still points to this
1888    /// link. In cross-connection scenarios, a newer link may have replaced the
1889    /// entry for the same address.
1890    pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
1891        if let Some(link) = self.links.remove(link_id) {
1892            // Clean up reverse lookup only if it still maps to this link
1893            let key = (link.transport_id(), link.remote_addr().clone());
1894            if self.addr_to_link.get(&key) == Some(link_id) {
1895                self.addr_to_link.remove(&key);
1896            }
1897            Some(link)
1898        } else {
1899            None
1900        }
1901    }
1902
1903    pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
1904        if !self.bootstrap_transports.contains(&transport_id) {
1905            return;
1906        }
1907
1908        let transport_in_use = self
1909            .links
1910            .values()
1911            .any(|link| link.transport_id() == transport_id)
1912            || self
1913                .connections
1914                .values()
1915                .any(|conn| conn.transport_id() == Some(transport_id))
1916            || self
1917                .peers
1918                .values()
1919                .any(|peer| peer.transport_id() == Some(transport_id))
1920            || self
1921                .pending_connects
1922                .iter()
1923                .any(|pending| pending.transport_id == transport_id);
1924
1925        if transport_in_use {
1926            return;
1927        }
1928
1929        tracing::debug!(
1930            transport_id = %transport_id,
1931            "bootstrap transport has no remaining references; dropping"
1932        );
1933
1934        self.bootstrap_transports.remove(&transport_id);
1935        self.bootstrap_transport_npubs.remove(&transport_id);
1936        self.transport_drops.remove(&transport_id);
1937        self.transports.remove(&transport_id);
1938    }
1939
1940    /// Iterate over all links.
1941    pub fn links(&self) -> impl Iterator<Item = &Link> {
1942        self.links.values()
1943    }
1944
1945    // === Connection Management (Handshake Phase) ===
1946
1947    /// Add a pending connection.
1948    pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
1949        let link_id = connection.link_id();
1950
1951        if self.connections.contains_key(&link_id) {
1952            return Err(NodeError::ConnectionAlreadyExists(link_id));
1953        }
1954
1955        if self.max_connections > 0 && self.connections.len() >= self.max_connections {
1956            return Err(NodeError::MaxConnectionsExceeded {
1957                max: self.max_connections,
1958            });
1959        }
1960
1961        self.connections.insert(link_id, connection);
1962        Ok(())
1963    }
1964
1965    /// Get a connection by LinkId.
1966    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
1967        self.connections.get(link_id)
1968    }
1969
1970    /// Get a mutable connection by LinkId.
1971    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
1972        self.connections.get_mut(link_id)
1973    }
1974
1975    /// Remove a connection.
1976    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
1977        self.connections.remove(link_id)
1978    }
1979
1980    /// Iterate over all connections.
1981    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
1982        self.connections.values()
1983    }
1984
1985    // === Peer Management (Active Phase) ===
1986
1987    /// Get a peer by NodeAddr.
1988    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
1989        self.peers.get(node_addr)
1990    }
1991
1992    /// Get a mutable peer by NodeAddr.
1993    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
1994        self.peers.get_mut(node_addr)
1995    }
1996
1997    /// Remove a peer.
1998    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
1999        self.peers.remove(node_addr)
2000    }
2001
2002    /// Iterate over all peers.
2003    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2004        self.peers.values()
2005    }
2006
2007    /// Reference to the Nostr discovery handle if discovery is enabled.
2008    /// Used by control queries (`show_peers` per-peer Nostr-traversal
2009    /// state) to read failure-state without taking shared ownership.
2010    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2011        self.nostr_discovery.as_deref()
2012    }
2013
2014    /// Iterate over all peer node IDs.
2015    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2016        self.peers.keys()
2017    }
2018
2019    /// Iterate over peers that can send traffic.
2020    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2021        self.peers.values().filter(|p| p.can_send())
2022    }
2023
2024    /// Number of peers that can send traffic.
2025    pub fn sendable_peer_count(&self) -> usize {
2026        self.peers.values().filter(|p| p.can_send()).count()
2027    }
2028
2029    pub(crate) fn set_discovery_fallback_transit_allowed(
2030        &mut self,
2031        peer_addr: NodeAddr,
2032        allowed: bool,
2033    ) {
2034        if allowed {
2035            self.discovery_fallback_transit_blocked_peers
2036                .remove(&peer_addr);
2037        } else {
2038            self.discovery_fallback_transit_blocked_peers
2039                .insert(peer_addr);
2040        }
2041    }
2042
2043    pub(crate) fn configured_discovery_fallback_transit(
2044        &self,
2045        peer_addr: &NodeAddr,
2046    ) -> Option<bool> {
2047        self.config.peers().iter().find_map(|peer| {
2048            PeerIdentity::from_npub(&peer.npub)
2049                .ok()
2050                .filter(|identity| identity.node_addr() == peer_addr)
2051                .map(|_| peer.discovery_fallback_transit)
2052        })
2053    }
2054
2055    pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2056        if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2057            return retry_state.peer_config.discovery_fallback_transit;
2058        }
2059
2060        if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2061            return allowed;
2062        }
2063
2064        self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2065    }
2066
2067    // === End-to-End Sessions ===
2068
2069    /// Get a session by remote NodeAddr.
2070    /// Disable the discovery forward rate limiter (for tests).
2071    #[cfg(test)]
2072    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2073        self.discovery_forward_limiter
2074            .set_interval(std::time::Duration::ZERO);
2075    }
2076
2077    #[cfg(test)]
2078    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2079        self.sessions.get(remote)
2080    }
2081
2082    /// Get a mutable session by remote NodeAddr.
2083    #[cfg(test)]
2084    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2085        self.sessions.get_mut(remote)
2086    }
2087
2088    /// Remove a session.
2089    #[cfg(test)]
2090    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2091        self.sessions.remove(remote)
2092    }
2093
2094    /// Read the path_mtu_lookup entry for a destination FipsAddress.
2095    #[cfg(test)]
2096    pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2097        self.path_mtu_lookup
2098            .read()
2099            .ok()
2100            .and_then(|map| map.get(fips_addr).copied())
2101    }
2102
2103    /// Write a path_mtu_lookup entry directly (for tests that pre-seed the map).
2104    #[cfg(test)]
2105    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2106        if let Ok(mut map) = self.path_mtu_lookup.write() {
2107            map.insert(fips_addr, mtu);
2108        }
2109    }
2110
2111    /// Number of end-to-end sessions.
2112    pub fn session_count(&self) -> usize {
2113        self.sessions.len()
2114    }
2115
2116    /// Iterate over all session entries (for control queries).
2117    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2118        self.sessions.iter()
2119    }
2120
2121    // === Identity Cache ===
2122
2123    /// Register a node in the identity cache for FipsAddress → NodeAddr lookup.
2124    pub(crate) fn register_identity(
2125        &mut self,
2126        node_addr: NodeAddr,
2127        pubkey: secp256k1::PublicKey,
2128    ) -> bool {
2129        let mut prefix = [0u8; 15];
2130        prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2131        if let Some(entry) = self.identity_cache.get(&prefix)
2132            && entry.node_addr == node_addr
2133            && entry.pubkey == pubkey
2134        {
2135            // Endpoint sends pass the same PeerIdentity on every packet. Once
2136            // validated, avoid re-deriving NodeAddr from the public key in the
2137            // data path; that hash showed up in macOS sender profiles.
2138            return true;
2139        }
2140
2141        let (xonly, _) = pubkey.x_only_public_key();
2142        let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2143        if derived_node_addr != node_addr {
2144            debug!(
2145                claimed_node_addr = %node_addr,
2146                derived_node_addr = %derived_node_addr,
2147                "Rejected identity cache entry with mismatched public key"
2148            );
2149            return false;
2150        }
2151
2152        let now_ms = Self::now_ms();
2153        if let Some(entry) = self.identity_cache.get_mut(&prefix)
2154            && entry.node_addr == node_addr
2155        {
2156            entry.pubkey = pubkey;
2157            entry.last_seen_ms = now_ms;
2158            return true;
2159        }
2160
2161        let npub = encode_npub(&xonly);
2162        self.identity_cache.insert(
2163            prefix,
2164            IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2165        );
2166        // LRU eviction
2167        let max = self.config.node.cache.identity_size;
2168        if self.identity_cache.len() > max
2169            && let Some(oldest_key) = self
2170                .identity_cache
2171                .iter()
2172                .min_by_key(|(_, entry)| entry.last_seen_ms)
2173                .map(|(k, _)| *k)
2174        {
2175            self.identity_cache.remove(&oldest_key);
2176        }
2177        true
2178    }
2179
2180    /// Look up a destination by FipsAddress prefix (bytes 1-15 of the IPv6 address).
2181    pub(crate) fn lookup_by_fips_prefix(
2182        &mut self,
2183        prefix: &[u8; 15],
2184    ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2185        if let Some(entry) = self.identity_cache.get_mut(prefix) {
2186            entry.last_seen_ms = Self::now_ms(); // LRU touch
2187            Some((entry.node_addr, entry.pubkey))
2188        } else {
2189            None
2190        }
2191    }
2192
2193    /// Check if a node's identity is in the cache (without LRU touch).
2194    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2195        let mut prefix = [0u8; 15];
2196        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2197        self.identity_cache.contains_key(&prefix)
2198    }
2199
2200    /// Number of identity cache entries.
2201    pub fn identity_cache_len(&self) -> usize {
2202        self.identity_cache.len()
2203    }
2204
2205    /// Iterate over identity cache entries.
2206    ///
2207    /// Returns `(NodeAddr, PublicKey, last_seen_ms)` for each cached identity.
2208    /// Used by the `show_identity_cache` control query.
2209    pub fn identity_cache_iter(
2210        &self,
2211    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2212        self.identity_cache
2213            .values()
2214            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2215    }
2216
2217    /// Configured maximum identity cache size.
2218    pub fn identity_cache_max(&self) -> usize {
2219        self.config.node.cache.identity_size
2220    }
2221
2222    /// Number of pending discovery lookups.
2223    pub fn pending_lookup_count(&self) -> usize {
2224        self.pending_lookups.len()
2225    }
2226
2227    /// Iterate over pending discovery lookups for diagnostics.
2228    pub fn pending_lookups_iter(
2229        &self,
2230    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2231        self.pending_lookups.iter()
2232    }
2233
2234    /// Number of recent discovery requests tracked.
2235    pub fn recent_request_count(&self) -> usize {
2236        self.recent_requests.len()
2237    }
2238
2239    /// Count of destinations with queued TUN packets awaiting session setup.
2240    pub fn pending_tun_destinations(&self) -> usize {
2241        self.pending_tun_packets.len()
2242    }
2243
2244    /// Total TUN packets queued across all destinations.
2245    pub fn pending_tun_total_packets(&self) -> usize {
2246        self.pending_tun_packets.values().map(|q| q.len()).sum()
2247    }
2248
2249    /// Iterate over retry state for diagnostics.
2250    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2251        self.retry_pending.iter()
2252    }
2253
2254    // === Routing ===
2255
2256    /// Check if a peer is a tree neighbor (parent or child in the spanning tree).
2257    ///
2258    /// Returns true if the peer is our current tree parent, or if the peer
2259    /// has declared us as their parent (making them our child).
2260    pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2261        // Peer is our parent
2262        if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2263            return true;
2264        }
2265        // Peer is our child (their declaration names us as parent)
2266        if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2267            && decl.parent_id() == self.node_addr()
2268        {
2269            return true;
2270        }
2271        false
2272    }
2273
2274    /// Find next hop for a destination node address.
2275    ///
2276    /// Routing priority:
2277    /// 1. Destination is self → `None` (local delivery)
2278    /// 2. Destination is a direct peer → that peer
2279    /// 3. Reply-learned routes in `reply_learned` mode. These are locally
2280    ///    observed reverse paths, selected with weighted multipath plus
2281    ///    periodic coordinate/tree exploration.
2282    /// 4. Bloom filter candidates with cached dest coords → among peers whose
2283    ///    bloom filter contains the destination, pick the one that minimizes
2284    ///    tree distance to the destination, with
2285    ///    `(link_cost, tree_distance_to_dest, node_addr)` tie-breaking.
2286    ///    The self-distance check ensures only peers strictly closer to the
2287    ///    destination than us are considered (prevents routing loops).
2288    /// 5. Greedy tree routing fallback (requires cached dest coords)
2289    /// 6. No route → `None`
2290    ///
2291    /// Both the bloom filter and tree routing paths require cached destination
2292    /// coordinates (checked in `coord_cache`). Without coordinates, the node
2293    /// cannot make loop-free forwarding decisions. The caller should signal
2294    /// `CoordsRequired` back to the source when `None` is returned for a
2295    /// non-local destination.
2296    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
2297        // 1. Local delivery
2298        if dest_node_addr == self.node_addr() {
2299            return None;
2300        }
2301
2302        // 2. Healthy direct peer. Stale direct links remain usable, but in
2303        // reply-learned mode they must not hide a live mesh route learned from
2304        // recent traffic or discovery.
2305        let direct_peer_can_send = self
2306            .peers
2307            .get(dest_node_addr)
2308            .is_some_and(|peer| peer.can_send());
2309        if let Some(peer) = self.peers.get(dest_node_addr)
2310            && peer.is_healthy()
2311        {
2312            return Some(peer);
2313        }
2314
2315        let now_ms = Self::now_ms();
2316
2317        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
2318            Some(
2319                self.peers
2320                    .iter()
2321                    .filter(|(_, peer)| peer.can_send())
2322                    .map(|(addr, _)| *addr)
2323                    .collect::<HashSet<_>>(),
2324            )
2325        } else {
2326            None
2327        };
2328
2329        // 3. Optional reply-learned routing. These entries are not peer
2330        // claims; they are local observations of which peer carried traffic
2331        // or a verified lookup response back from the destination. Most
2332        // packets use weighted multipath over learned routes, but periodic
2333        // fallback exploration lets coord/bloom/tree routes discover better
2334        // candidates.
2335        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
2336            self.learned_routes.should_explore_fallback(
2337                dest_node_addr,
2338                now_ms,
2339                self.config.node.routing.learned_fallback_explore_interval,
2340                |addr| sendable.contains(addr),
2341            )
2342        });
2343        if let Some(sendable) = &sendable_learned_peers
2344            && !explore_fallback
2345            && let Some(next_hop_addr) =
2346                self.learned_routes
2347                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2348        {
2349            return self.peers.get(&next_hop_addr);
2350        }
2351
2352        // Look up cached destination coordinates (required by both bloom and tree paths).
2353        let Some(dest_coords) = self
2354            .coord_cache
2355            .get_and_touch(dest_node_addr, now_ms)
2356            .cloned()
2357        else {
2358            if let Some(sendable) = &sendable_learned_peers
2359                && let Some(next_hop_addr) =
2360                    self.learned_routes
2361                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2362            {
2363                return self.peers.get(&next_hop_addr);
2364            }
2365            if direct_peer_can_send {
2366                return self.peers.get(dest_node_addr);
2367            }
2368            return None;
2369        };
2370
2371        // 4. Bloom filter candidates — requires dest_coords for loop-free selection.
2372        //    If no candidate is strictly closer, fall through to tree routing.
2373        let coordinate_route_addr = {
2374            let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
2375            if !candidates.is_empty() {
2376                self.select_best_candidate(&candidates, &dest_coords)
2377                    .map(|peer| *peer.node_addr())
2378            } else {
2379                None
2380            }
2381        };
2382        if let Some(next_hop_addr) = coordinate_route_addr {
2383            return self.peers.get(&next_hop_addr);
2384        }
2385
2386        // 5. Greedy tree routing fallback
2387        let tree_route_addr = self
2388            .tree_state
2389            .find_next_hop(&dest_coords)
2390            .filter(|next_hop_id| {
2391                self.peers
2392                    .get(next_hop_id)
2393                    .is_some_and(|peer| peer.can_send())
2394            });
2395        if let Some(next_hop_addr) = tree_route_addr {
2396            return self.peers.get(&next_hop_addr);
2397        }
2398        if explore_fallback {
2399            return sendable_learned_peers.as_ref().and_then(|sendable| {
2400                self.learned_routes
2401                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2402                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
2403            });
2404        }
2405
2406        if let Some(sendable) = &sendable_learned_peers
2407            && let Some(next_hop_addr) =
2408                self.learned_routes
2409                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2410        {
2411            return self.peers.get(&next_hop_addr);
2412        }
2413
2414        if direct_peer_can_send {
2415            return self.peers.get(dest_node_addr);
2416        }
2417
2418        None
2419    }
2420
2421    pub(in crate::node) fn learn_reverse_route(
2422        &mut self,
2423        destination: NodeAddr,
2424        next_hop: NodeAddr,
2425    ) {
2426        if self.config.node.routing.mode != RoutingMode::ReplyLearned
2427            || destination == *self.node_addr()
2428        {
2429            return;
2430        }
2431        let now_ms = Self::now_ms();
2432        self.learned_routes.learn(
2433            destination,
2434            next_hop,
2435            now_ms,
2436            self.config.node.routing.learned_ttl_secs,
2437            self.config.node.routing.max_learned_routes_per_dest,
2438        );
2439    }
2440
2441    pub(in crate::node) fn record_route_failure(
2442        &mut self,
2443        destination: NodeAddr,
2444        next_hop: NodeAddr,
2445    ) {
2446        if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2447            return;
2448        }
2449        self.learned_routes.record_failure(&destination, &next_hop);
2450    }
2451
2452    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2453        self.learned_routes.snapshot(now_ms)
2454    }
2455
2456    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2457        self.learned_routes.purge_expired(now_ms);
2458    }
2459
2460    /// Select the best peer from a set of bloom filter candidates.
2461    ///
2462    /// Uses distance from each candidate's tree coordinates to the destination
2463    /// as the primary metric (after link_cost). Only selects peers that are
2464    /// strictly closer to the destination than we are (self-distance check
2465    /// prevents routing loops).
2466    ///
2467    /// Ordering: `(link_cost, distance_to_dest, node_addr)`.
2468    fn select_best_candidate<'a>(
2469        &'a self,
2470        candidates: &[&'a ActivePeer],
2471        dest_coords: &crate::tree::TreeCoordinate,
2472    ) -> Option<&'a ActivePeer> {
2473        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2474
2475        let mut best: Option<(&ActivePeer, f64, usize)> = None;
2476
2477        for &candidate in candidates {
2478            if !candidate.can_send() {
2479                continue;
2480            }
2481
2482            let cost = candidate.link_cost();
2483
2484            let dist = self
2485                .tree_state
2486                .peer_coords(candidate.node_addr())
2487                .map(|pc| pc.distance_to(dest_coords))
2488                .unwrap_or(usize::MAX);
2489
2490            // Self-distance check: only consider peers strictly closer
2491            // to the destination than we are (prevents routing loops)
2492            if dist >= my_distance {
2493                continue;
2494            }
2495
2496            let dominated = match &best {
2497                None => true,
2498                Some((_, best_cost, best_dist)) => {
2499                    cost < *best_cost
2500                        || (cost == *best_cost && dist < *best_dist)
2501                        || (cost == *best_cost
2502                            && dist == *best_dist
2503                            && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2504                }
2505            };
2506
2507            if dominated {
2508                best = Some((candidate, cost, dist));
2509            }
2510        }
2511
2512        best.map(|(peer, _, _)| peer)
2513    }
2514
2515    /// Check if a destination is in any peer's bloom filter.
2516    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2517        self.peers.values().filter(|p| p.may_reach(dest)).collect()
2518    }
2519
2520    /// Get the TUN packet sender channel.
2521    ///
2522    /// Returns None if TUN is not active or the node hasn't been started.
2523    pub fn tun_tx(&self) -> Option<&TunTx> {
2524        self.tun_tx.as_ref()
2525    }
2526
2527    /// Attach app-owned packet I/O for embedded operation without a system TUN.
2528    ///
2529    /// This must be called before [`Node::start`] and requires `tun.enabled =
2530    /// false`. Outbound packets sent to the returned sender are processed by the
2531    /// normal session pipeline. Inbound packets delivered by FIPS sessions are
2532    /// sent to the returned receiver with source attribution.
2533    pub fn attach_external_packet_io(
2534        &mut self,
2535        capacity: usize,
2536    ) -> Result<ExternalPacketIo, NodeError> {
2537        if self.state != NodeState::Created {
2538            return Err(NodeError::Config(ConfigError::Validation(
2539                "external packet I/O must be attached before node start".to_string(),
2540            )));
2541        }
2542        if self.config.tun.enabled {
2543            return Err(NodeError::Config(ConfigError::Validation(
2544                "external packet I/O requires tun.enabled=false".to_string(),
2545            )));
2546        }
2547
2548        let capacity = capacity.max(1);
2549        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2550        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2551        self.tun_outbound_rx = Some(outbound_rx);
2552        self.external_packet_tx = Some(inbound_tx);
2553
2554        Ok(ExternalPacketIo {
2555            outbound_tx,
2556            inbound_rx,
2557        })
2558    }
2559
2560    /// Attach app-owned endpoint data I/O for embedded operation.
2561    ///
2562    /// Commands sent to the returned sender are processed by the node RX loop.
2563    /// Incoming endpoint data is emitted as source-attributed events.
2564    pub(crate) fn attach_endpoint_data_io(
2565        &mut self,
2566        capacity: usize,
2567    ) -> Result<EndpointDataIo, NodeError> {
2568        if self.state != NodeState::Created {
2569            return Err(NodeError::Config(ConfigError::Validation(
2570                "endpoint data I/O must be attached before node start".to_string(),
2571            )));
2572        }
2573
2574        let command_capacity = endpoint_data_command_capacity(capacity);
2575        let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2576        // Inbound endpoint-data events use an unbounded channel — see
2577        // `EndpointDataIo::event_rx` docs for the rationale (kills the
2578        // per-packet semaphore + the cross-task relay task that used to
2579        // sit on top of this channel).
2580        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2581        self.endpoint_command_rx = Some(command_rx);
2582        self.endpoint_event_tx = Some(event_tx.clone());
2583
2584        Ok(EndpointDataIo {
2585            command_tx,
2586            event_rx,
2587            event_tx,
2588        })
2589    }
2590
2591    pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2592        let mut prefix = [0u8; 15];
2593        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2594        self.identity_cache
2595            .get(&prefix)
2596            .filter(|entry| &entry.node_addr == addr)
2597            .map(|entry| entry.pubkey)
2598    }
2599
2600    pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2601        let mut prefix = [0u8; 15];
2602        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2603        self.identity_cache
2604            .get(&prefix)
2605            .filter(|entry| &entry.node_addr == addr)
2606            .map(|entry| entry.npub.clone())
2607    }
2608
2609    pub(in crate::node) fn deliver_external_ipv6_packet(
2610        &self,
2611        src_addr: &NodeAddr,
2612        packet: Vec<u8>,
2613    ) {
2614        let Some(external_packet_tx) = &self.external_packet_tx else {
2615            return;
2616        };
2617        if packet.len() < 40 {
2618            return;
2619        }
2620        let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2621            return;
2622        };
2623        let delivered = NodeDeliveredPacket {
2624            source_node_addr: *src_addr,
2625            source_npub: self.npub_for_node_addr(src_addr),
2626            destination,
2627            packet,
2628        };
2629        if let Err(error) = external_packet_tx.try_send(delivered) {
2630            debug!(error = %error, "Failed to deliver packet to external app sink");
2631        }
2632    }
2633
2634    // === Sending ===
2635
2636    /// Encrypt and send a link-layer message to an authenticated peer.
2637    ///
2638    /// The plaintext should include the message type byte followed by the
2639    /// message-specific payload (e.g., `[0x50, reason]` for Disconnect).
2640    ///
2641    /// The send path prepends a 4-byte session-relative timestamp (inner
2642    /// header) before encryption. The full 16-byte outer header is used
2643    /// as AAD for the AEAD construction.
2644    ///
2645    /// This is the standard path for sending any link-layer control message
2646    /// to a peer over their encrypted Noise session.
2647    pub(super) async fn send_encrypted_link_message(
2648        &mut self,
2649        node_addr: &NodeAddr,
2650        plaintext: &[u8],
2651    ) -> Result<(), NodeError> {
2652        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2653            .await
2654    }
2655
2656    /// Update the local-outbound-broken signal from a `transport.send`
2657    /// outcome. Sets `last_local_send_failure_at` on local-side io
2658    /// errors (NetworkUnreachable / HostUnreachable / AddrNotAvailable);
2659    /// clears it on success. The reaper consults this in
2660    /// `check_link_heartbeats` to switch to `fast_link_dead_timeout_secs`.
2661    pub(in crate::node) fn note_local_send_outcome(
2662        &mut self,
2663        result: &Result<usize, TransportError>,
2664    ) {
2665        match result {
2666            Ok(_) => {
2667                if self.last_local_send_failure_at.is_some() {
2668                    self.last_local_send_failure_at = None;
2669                }
2670            }
2671            Err(TransportError::Io(e))
2672                if matches!(
2673                    e.kind(),
2674                    std::io::ErrorKind::NetworkUnreachable
2675                        | std::io::ErrorKind::HostUnreachable
2676                        | std::io::ErrorKind::AddrNotAvailable
2677                ) =>
2678            {
2679                self.last_local_send_failure_at = Some(std::time::Instant::now());
2680            }
2681            Err(_) => {}
2682        }
2683    }
2684
2685    /// Returns the wall-clock instant of the most recent observed local
2686    /// outbound failure, if any. Used by the link-dead reaper.
2687    pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
2688        self.last_local_send_failure_at
2689    }
2690
2691    /// Like `send_encrypted_link_message` but allows setting the FMP CE flag.
2692    ///
2693    /// Used by the forwarding path to relay congestion signals hop-by-hop.
2694    pub(super) async fn send_encrypted_link_message_with_ce(
2695        &mut self,
2696        node_addr: &NodeAddr,
2697        plaintext: &[u8],
2698        ce_flag: bool,
2699    ) -> Result<(), NodeError> {
2700        let peer = self
2701            .peers
2702            .get_mut(node_addr)
2703            .ok_or(NodeError::PeerNotFound(*node_addr))?;
2704
2705        let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2706            node_addr: *node_addr,
2707            reason: "no their_index".into(),
2708        })?;
2709        let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2710            node_addr: *node_addr,
2711            reason: "no transport_id".into(),
2712        })?;
2713        let remote_addr = peer
2714            .current_addr()
2715            .cloned()
2716            .ok_or_else(|| NodeError::SendFailed {
2717                node_addr: *node_addr,
2718                reason: "no current_addr".into(),
2719            })?;
2720        #[cfg(any(target_os = "linux", target_os = "macos"))]
2721        let connected_socket = peer.connected_udp();
2722
2723        // Prepend 4-byte session-relative timestamp (inner header)
2724        let timestamp_ms = peer.session_elapsed_ms();
2725
2726        // MMP: read spin bit value before entering session borrow
2727        let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2728        let mut flags = if sp_flag { FLAG_SP } else { 0 };
2729        if ce_flag {
2730            flags |= FLAG_CE;
2731        }
2732        if peer.current_k_bit() {
2733            flags |= FLAG_KEY_EPOCH;
2734        }
2735
2736        let session = peer
2737            .noise_session_mut()
2738            .ok_or_else(|| NodeError::SendFailed {
2739                node_addr: *node_addr,
2740                reason: "no noise session".into(),
2741            })?;
2742
2743        // Build 16-byte outer header upfront. The inner-plaintext
2744        // layout is `[ts:4 LE][plaintext...]`, so its length is exactly
2745        // `INNER_TS_LEN + plaintext.len()` — no need to build the Vec
2746        // just to measure it. The worker path uses this length to size
2747        // the wire buffer directly; the legacy path below still
2748        // materialises a separate `inner_plaintext` Vec for the inline
2749        // encrypt-and-send call.
2750        const INNER_TS_LEN: usize = 4;
2751        let counter = session.current_send_counter();
2752        let inner_len = INNER_TS_LEN + plaintext.len();
2753        let payload_len = inner_len as u16;
2754        let header = build_established_header(their_index, counter, flags, payload_len);
2755
2756        // **UDP send fast path.** The encrypt-worker pool is always
2757        // spawned at lifecycle start (workers = num_cpus) in
2758        // production, so this branch is taken for every authentic
2759        // send on every UDP-transported established session. The
2760        // AEAD work + sendmsg syscall run on a dedicated OS thread;
2761        // the rx_loop only builds the wire buffer + reserves the
2762        // counter inline.
2763        //
2764        // Other transport kinds (BLE, TCP, sim, ethernet) fall
2765        // through to the inline encrypt + transport.send path
2766        // below — those don't have raw-fd / sendmmsg / UDP_GSO
2767        // benefits to expose through the worker pool, so the simpler
2768        // synchronous send is the right shape for them.
2769        //
2770        // The `encrypt_workers.is_some()` check below is true in
2771        // production (lifecycle::start spawns the pool); it stays
2772        // checked rather than `expect()`-ed because unit tests
2773        // construct `Node` without calling `start()`.
2774        let transport_for_send = self
2775            .transports
2776            .get(&transport_id)
2777            .ok_or(NodeError::TransportNotFound(transport_id))?;
2778        let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2779        if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2780            && is_udp
2781            && let Some(cipher_clone) = session.send_cipher_clone()
2782        {
2783            {
2784                // Reserve the counter on the session so subsequent
2785                // sends don't reuse it. `current_send_counter` only
2786                // peeks; we advance via `take_send_counter`.
2787                let reserved_counter =
2788                    session
2789                        .take_send_counter()
2790                        .map_err(|e| NodeError::SendFailed {
2791                            node_addr: *node_addr,
2792                            reason: format!("counter reservation failed: {}", e),
2793                        })?;
2794                debug_assert_eq!(reserved_counter, counter);
2795                // Re-derive the header with the now-locked-in counter
2796                // value (same value, but the call sequence is more
2797                // explicit).
2798                let header =
2799                    build_established_header(their_index, reserved_counter, flags, payload_len);
2800                let transport = transport_for_send;
2801                // Snapshot the per-peer connected UDP socket before
2802                // resolving the fallback address. On the established
2803                // steady-state path this socket already carries the
2804                // kernel peer address, so re-parsing the configured
2805                // transport address and touching the DNS cache on every
2806                // packet is pure overhead on the sender hot path.
2807                let send_target = {
2808                    if let TransportHandle::Udp(udp) = transport {
2809                        let socket_addr = {
2810                            #[cfg(any(target_os = "linux", target_os = "macos"))]
2811                            {
2812                                match connected_socket.as_ref() {
2813                                    Some(socket) => Some(socket.peer_addr()),
2814                                    None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2815                                }
2816                            }
2817                            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2818                            {
2819                                udp.resolve_for_off_task(&remote_addr).await.ok()
2820                            }
2821                        };
2822                        match (udp.async_socket(), socket_addr) {
2823                            (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
2824                            _ => None,
2825                        }
2826                    } else {
2827                        None
2828                    }
2829                };
2830                if let Some((socket, socket_addr)) = send_target {
2831                    // Build the wire buffer **directly** from
2832                    // `plaintext` with a single allocation:
2833                    //   `[16 header][4 ts][plaintext...]` with
2834                    // +16 trailing capacity for the AEAD tag.
2835                    // The worker seals `wire_buf[16..]` in
2836                    // place and appends the tag — no second
2837                    // alloc, no second memcpy.
2838                    //
2839                    // Previous design built `inner_plaintext`
2840                    // via `prepend_inner_header` (1 alloc + 1
2841                    // copy) and then let the worker memcpy
2842                    // header + plaintext into a fresh Vec
2843                    // (another alloc + copy). At ~100 kpps the
2844                    // saved alloc/copy is ~150 MB/sec of memory
2845                    // bandwidth on the hot rx_loop + worker.
2846                    let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
2847                    let mut wire_buf = Vec::with_capacity(wire_capacity);
2848                    wire_buf.extend_from_slice(&header);
2849                    wire_buf.extend_from_slice(&timestamp_ms.to_le_bytes());
2850                    wire_buf.extend_from_slice(plaintext);
2851                    let predicted_bytes = wire_capacity;
2852                    // Stats / MMP update inline — predicted size
2853                    // is exact for ChaCha20-Poly1305 (tag is
2854                    // constant 16 bytes). When `connected_socket` is
2855                    // `Some`, the worker sends on it without a
2856                    // destination sockaddr — the kernel skips the
2857                    // per-packet sockaddr + route + neighbor resolve.
2858                    if let Some(peer) = self.peers.get_mut(node_addr) {
2859                        peer.link_stats_mut().record_sent(predicted_bytes);
2860                        if let Some(mmp) = peer.mmp_mut() {
2861                            mmp.sender
2862                                .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
2863                        }
2864                    }
2865                    workers.dispatch(self::encrypt_worker::FmpSendJob {
2866                        cipher: cipher_clone,
2867                        counter: reserved_counter,
2868                        wire_buf,
2869                        fsp_seal: None,
2870                        socket,
2871                        dest_addr: socket_addr,
2872                        #[cfg(any(target_os = "linux", target_os = "macos"))]
2873                        connected_socket,
2874                        drop_on_backpressure: plaintext
2875                            .first()
2876                            .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
2877                        queued_at: crate::perf_profile::stamp(),
2878                    });
2879                    return Ok(());
2880                }
2881            }
2882        }
2883
2884        // Inline (legacy) path: encrypt + send on the rx_loop.
2885        // Build the inner plaintext lazily here — the worker path
2886        // above never reaches this point, so the prepend_inner_header
2887        // alloc is avoided in the fast path.
2888        let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
2889        // Encrypt with AAD binding to the outer header
2890        let ciphertext = {
2891            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
2892            session
2893                .encrypt_with_aad(&inner_plaintext, &header)
2894                .map_err(|e| NodeError::SendFailed {
2895                    node_addr: *node_addr,
2896                    reason: format!("encryption failed: {}", e),
2897                })?
2898        };
2899
2900        let wire_packet = build_encrypted(&header, &ciphertext);
2901
2902        // Re-borrow peer for stats update after sending
2903        let send_result = {
2904            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
2905            let transport = self
2906                .transports
2907                .get(&transport_id)
2908                .ok_or(NodeError::TransportNotFound(transport_id))?;
2909            transport.send(&remote_addr, &wire_packet).await
2910        };
2911        self.note_local_send_outcome(&send_result);
2912        let bytes_sent = send_result.map_err(|e| match e {
2913            TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
2914                node_addr: *node_addr,
2915                packet_size,
2916                mtu,
2917            },
2918            other => NodeError::SendFailed {
2919                node_addr: *node_addr,
2920                reason: format!("transport send: {}", other),
2921            },
2922        })?;
2923
2924        // Update send statistics
2925        if let Some(peer) = self.peers.get_mut(node_addr) {
2926            peer.link_stats_mut().record_sent(bytes_sent);
2927            // MMP: record sent frame for sender report generation
2928            if let Some(mmp) = peer.mmp_mut() {
2929                mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
2930            }
2931        }
2932
2933        Ok(())
2934    }
2935}
2936
2937impl fmt::Debug for Node {
2938    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2939        f.debug_struct("Node")
2940            .field("node_addr", self.node_addr())
2941            .field("state", &self.state)
2942            .field("is_leaf_only", &self.is_leaf_only)
2943            .field("connections", &self.connection_count())
2944            .field("peers", &self.peer_count())
2945            .field("links", &self.link_count())
2946            .field("transports", &self.transport_count())
2947            .finish()
2948    }
2949}