1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
40use crate::peer::{ActivePeer, PeerConnection};
41#[cfg(any(target_os = "linux", target_os = "macos"))]
42use crate::transport::ethernet::EthernetTransport;
43use crate::transport::tcp::TcpTransport;
44use crate::transport::tor::TorTransport;
45use crate::transport::udp::UdpTransport;
46#[cfg(feature = "webrtc-transport")]
47use crate::transport::webrtc::WebRtcTransport;
48use crate::transport::{
49 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
50 TransportHandle, TransportId,
51};
52use crate::tree::TreeState;
53use crate::upper::hosts::HostMap;
54use crate::upper::icmp_rate_limit::IcmpRateLimiter;
55use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
56use crate::utils::index::IndexAllocator;
57use crate::{
58 Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
59 PeerIdentity, encode_npub,
60};
61use rand::Rng;
62use std::collections::{HashMap, HashSet, VecDeque};
63use std::fmt;
64use std::sync::Arc;
65use std::thread::JoinHandle;
66use thiserror::Error;
67use tracing::{debug, warn};
68
69const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
70const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20_000;
71const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 16;
72const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.08;
73const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.02;
74const ROUTING_FALLBACK_MIN_COST_ADVANTAGE: f64 = 0.25;
75
76fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
77 if !plaintext
78 .first()
79 .is_some_and(|ty| *ty == LinkMessageType::SessionDatagram.to_byte())
80 {
81 return false;
82 }
83 let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
84 return false;
85 };
86 FspCommonPrefix::parse(fsp_payload)
87 .is_some_and(|prefix| prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted())
88}
89
90pub(crate) const REKEY_JITTER_SECS: i64 = 15;
97
98#[derive(Debug, Error)]
100pub enum NodeError {
101 #[error("node not started")]
102 NotStarted,
103
104 #[error("node already started")]
105 AlreadyStarted,
106
107 #[error("node already stopped")]
108 AlreadyStopped,
109
110 #[error("transport not found: {0}")]
111 TransportNotFound(TransportId),
112
113 #[error("no transport available for type: {0}")]
114 NoTransportForType(String),
115
116 #[error("link not found: {0}")]
117 LinkNotFound(LinkId),
118
119 #[error("connection not found: {0}")]
120 ConnectionNotFound(LinkId),
121
122 #[error("peer not found: {0:?}")]
123 PeerNotFound(NodeAddr),
124
125 #[error("peer already exists: {0:?}")]
126 PeerAlreadyExists(NodeAddr),
127
128 #[error("connection already exists for link: {0}")]
129 ConnectionAlreadyExists(LinkId),
130
131 #[error("invalid peer npub '{npub}': {reason}")]
132 InvalidPeerNpub { npub: String, reason: String },
133
134 #[error("discovery error: {0}")]
135 Discovery(String),
136
137 #[error("access denied: {0}")]
138 AccessDenied(String),
139
140 #[error("max connections exceeded: {max}")]
141 MaxConnectionsExceeded { max: usize },
142
143 #[error("max peers exceeded: {max}")]
144 MaxPeersExceeded { max: usize },
145
146 #[error("max links exceeded: {max}")]
147 MaxLinksExceeded { max: usize },
148
149 #[error("handshake incomplete for link {0}")]
150 HandshakeIncomplete(LinkId),
151
152 #[error("no session available for link {0}")]
153 NoSession(LinkId),
154
155 #[error("promotion failed for link {link_id}: {reason}")]
156 PromotionFailed { link_id: LinkId, reason: String },
157
158 #[error("send failed to {node_addr}: {reason}")]
159 SendFailed { node_addr: NodeAddr, reason: String },
160
161 #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
162 MtuExceeded {
163 node_addr: NodeAddr,
164 packet_size: usize,
165 mtu: u16,
166 },
167
168 #[error("config error: {0}")]
169 Config(#[from] ConfigError),
170
171 #[error("identity error: {0}")]
172 Identity(#[from] IdentityError),
173
174 #[error("TUN error: {0}")]
175 Tun(#[from] TunError),
176
177 #[error("index allocation failed: {0}")]
178 IndexAllocationFailed(String),
179
180 #[error("handshake failed: {0}")]
181 HandshakeFailed(String),
182
183 #[error("transport error: {0}")]
184 TransportError(String),
185
186 #[error("local route unavailable: {0}")]
187 LocalRouteUnavailable(String),
188
189 #[error("bootstrap handoff failed: {0}")]
190 BootstrapHandoff(String),
191}
192
193impl NodeError {
194 pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
195 if error.is_local_route_unavailable() {
196 Self::LocalRouteUnavailable(error.to_string())
197 } else {
198 Self::TransportError(error.to_string())
199 }
200 }
201
202 pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
203 matches!(self, Self::LocalRouteUnavailable(_))
204 }
205}
206
207#[derive(Debug, Clone, PartialEq, Eq)]
209pub struct NodeDeliveredPacket {
210 pub source_node_addr: NodeAddr,
212 pub source_npub: Option<String>,
214 pub destination: FipsAddress,
216 pub packet: Vec<u8>,
218}
219
220#[derive(Debug, Clone)]
221struct IdentityCacheEntry {
222 node_addr: NodeAddr,
223 pubkey: secp256k1::PublicKey,
224 npub: String,
225 last_seen_ms: u64,
226}
227
228impl IdentityCacheEntry {
229 fn new(
230 node_addr: NodeAddr,
231 pubkey: secp256k1::PublicKey,
232 npub: String,
233 last_seen_ms: u64,
234 ) -> Self {
235 Self {
236 node_addr,
237 pubkey,
238 npub,
239 last_seen_ms,
240 }
241 }
242}
243
244#[derive(Debug)]
246pub struct ExternalPacketIo {
247 pub outbound_tx: crate::upper::tun::TunOutboundTx,
249 pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
251}
252
253#[derive(Debug)]
255pub(crate) struct EndpointDataIo {
256 pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
265 pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
275 pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
281}
282
283fn endpoint_data_command_capacity(requested: usize) -> usize {
284 if let Ok(raw) = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
285 && let Ok(value) = raw.trim().parse::<usize>()
286 && value > 0
287 {
288 return value;
289 }
290
291 requested.max(1).max(32_768)
292}
293
294#[derive(Debug)]
296pub(crate) enum NodeEndpointCommand {
297 Send {
301 remote: PeerIdentity,
302 payload: Vec<u8>,
303 queued_at: Option<std::time::Instant>,
304 response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
305 },
306 SendOneway {
312 remote: PeerIdentity,
313 payload: Vec<u8>,
314 queued_at: Option<std::time::Instant>,
315 },
316 PeerSnapshot {
317 response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
318 },
319 RelaySnapshot {
320 response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
321 },
322 UpdateRelays {
323 advert_relays: Vec<String>,
324 dm_relays: Vec<String>,
325 response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
326 },
327 UpdatePeers {
333 peers: Vec<crate::config::PeerConfig>,
334 response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
335 },
336}
337
338#[derive(Debug, Clone, Default, PartialEq, Eq)]
340pub(crate) struct UpdatePeersOutcome {
341 pub(crate) added: usize,
342 pub(crate) removed: usize,
343 pub(crate) updated: usize,
344 pub(crate) unchanged: usize,
345}
346
347#[derive(Debug)]
349pub(crate) enum NodeEndpointEvent {
350 Data {
351 source_node_addr: NodeAddr,
352 source_npub: Option<String>,
353 payload: Vec<u8>,
354 queued_at: Option<std::time::Instant>,
355 },
356}
357
358#[derive(Debug, Clone, PartialEq, Eq)]
360pub(crate) struct NodeEndpointPeer {
361 pub(crate) npub: String,
362 pub(crate) connected: bool,
363 pub(crate) transport_addr: Option<String>,
364 pub(crate) transport_type: Option<String>,
365 pub(crate) link_id: u64,
366 pub(crate) srtt_ms: Option<u64>,
367 pub(crate) packets_sent: u64,
368 pub(crate) packets_recv: u64,
369 pub(crate) bytes_sent: u64,
370 pub(crate) bytes_recv: u64,
371 pub(crate) direct_probe_pending: bool,
372 pub(crate) direct_probe_after_ms: Option<u64>,
373}
374
375#[derive(Debug, Clone, PartialEq, Eq)]
377pub(crate) struct NodeEndpointRelayStatus {
378 pub(crate) url: String,
379 pub(crate) status: String,
380}
381
382#[derive(Clone, Copy, Debug, PartialEq, Eq)]
384pub enum NodeState {
385 Created,
387 Starting,
389 Running,
391 Stopping,
393 Stopped,
395}
396
397impl NodeState {
398 pub fn is_operational(&self) -> bool {
400 matches!(self, NodeState::Running)
401 }
402
403 pub fn can_start(&self) -> bool {
405 matches!(self, NodeState::Created | NodeState::Stopped)
406 }
407
408 pub fn can_stop(&self) -> bool {
410 matches!(self, NodeState::Running)
411 }
412}
413
414impl fmt::Display for NodeState {
415 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
416 let s = match self {
417 NodeState::Created => "created",
418 NodeState::Starting => "starting",
419 NodeState::Running => "running",
420 NodeState::Stopping => "stopping",
421 NodeState::Stopped => "stopped",
422 };
423 write!(f, "{}", s)
424 }
425}
426
427#[derive(Clone, Debug)]
434pub(crate) struct RecentRequest {
435 pub(crate) from_peer: NodeAddr,
437 pub(crate) timestamp_ms: u64,
439 pub(crate) response_forwarded: bool,
443}
444
445impl RecentRequest {
446 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
447 Self {
448 from_peer,
449 timestamp_ms,
450 response_forwarded: false,
451 }
452 }
453
454 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
456 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
457 }
458}
459
460type AddrKey = (TransportId, TransportAddr);
462
463#[derive(Debug, Default)]
468struct TransportDropState {
469 prev_drops: u64,
471 dropping: bool,
473}
474
475struct PendingConnect {
481 link_id: LinkId,
483 transport_id: TransportId,
485 remote_addr: TransportAddr,
487 peer_identity: PeerIdentity,
489}
490
491pub struct Node {
505 identity: Identity,
508
509 startup_epoch: [u8; 8],
512
513 started_at: std::time::Instant,
515
516 config: Config,
519
520 state: NodeState,
523
524 is_leaf_only: bool,
526
527 tree_state: TreeState,
530
531 bloom_state: BloomState,
534
535 coord_cache: CoordCache,
538 learned_routes: LearnedRouteTable,
540 session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
543 recent_requests: HashMap<u64, RecentRequest>,
546 path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,
552
553 transports: HashMap<TransportId, TransportHandle>,
556 transport_drops: HashMap<TransportId, TransportDropState>,
558 links: HashMap<LinkId, Link>,
560 addr_to_link: HashMap<AddrKey, LinkId>,
562
563 packet_tx: Option<PacketTx>,
566 packet_rx: Option<PacketRx>,
568
569 connections: HashMap<LinkId, PeerConnection>,
573
574 peers: HashMap<NodeAddr, ActivePeer>,
578
579 sessions: HashMap<NodeAddr, SessionEntry>,
583
584 identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,
588
589 pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
593 pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
595 pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,
599
600 max_connections: usize,
603 max_peers: usize,
605 max_links: usize,
607
608 next_link_id: u64,
611 next_transport_id: u32,
613
614 stats: stats::NodeStats,
617
618 stats_history: stats_history::StatsHistory,
620
621 tun_state: TunState,
624 tun_name: Option<String>,
626 tun_tx: Option<TunTx>,
628 tun_outbound_rx: Option<TunOutboundRx>,
630 external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
632 endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
634 endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
636 encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
642 decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
645 decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
654 decrypt_fallback_rx:
658 Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
659 decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
660 tun_reader_handle: Option<JoinHandle<()>>,
662 tun_writer_handle: Option<JoinHandle<()>>,
664 #[cfg(target_os = "macos")]
667 tun_shutdown_fd: Option<std::os::unix::io::RawFd>,
668
669 dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
672 dns_task: Option<tokio::task::JoinHandle<()>>,
674
675 index_allocator: IndexAllocator,
678 peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
681 pending_outbound: HashMap<(TransportId, u32), LinkId>,
684
685 msg1_rate_limiter: HandshakeRateLimiter,
688 icmp_rate_limiter: IcmpRateLimiter,
690 routing_error_rate_limiter: RoutingErrorRateLimiter,
692 coords_response_rate_limiter: RoutingErrorRateLimiter,
694 discovery_backoff: DiscoveryBackoff,
696 discovery_forward_limiter: DiscoveryForwardRateLimiter,
698
699 pending_connects: Vec<PendingConnect>,
705
706 retry_pending: HashMap<NodeAddr, retry::RetryState>,
712
713 nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
715 lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
720 local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
724 local_instance_started_at_ms: Option<u64>,
725 last_local_instance_publish_ms: Option<u64>,
726 last_local_instance_scan_ms: Option<u64>,
727 nostr_discovery_started_at_ms: Option<u64>,
732 startup_open_discovery_sweep_done: bool,
736 bootstrap_transports: HashSet<TransportId>,
738 bootstrap_transport_npubs: HashMap<TransportId, String>,
745 discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,
748
749 last_parent_reeval: Option<crate::time::Instant>,
752
753 last_congestion_log: Option<std::time::Instant>,
756
757 estimated_mesh_size: Option<u64>,
760 last_mesh_size_log: Option<std::time::Instant>,
762
763 last_self_warn: Option<std::time::Instant>,
769
770 last_local_send_failure_at: Option<std::time::Instant>,
778 last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,
783
784 peer_aliases: HashMap<NodeAddr, String>,
788 configured_peer_send_weights: HashMap<NodeAddr, u8>,
791
792 peer_acl: acl::PeerAclReloader,
794
795 host_map: Arc<HostMap>,
799}
800
801impl Node {
802 pub fn new(config: Config) -> Result<Self, NodeError> {
804 config.validate()?;
805 let identity = config.create_identity()?;
806 let node_addr = *identity.node_addr();
807 let is_leaf_only = config.is_leaf_only();
808
809 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
810 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
811
812 let mut startup_epoch = [0u8; 8];
813 rand::rng().fill_bytes(&mut startup_epoch);
814
815 let mut bloom_state = if is_leaf_only {
816 BloomState::leaf_only(node_addr)
817 } else {
818 BloomState::new(node_addr)
819 };
820 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
821
822 let tun_state = if config.tun.enabled {
823 TunState::Configured
824 } else {
825 TunState::Disabled
826 };
827
828 let mut tree_state = TreeState::new(node_addr);
830 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
831 tree_state.set_hold_down(config.node.tree.hold_down_secs);
832 tree_state.set_flap_dampening(
833 config.node.tree.flap_threshold,
834 config.node.tree.flap_window_secs,
835 config.node.tree.flap_dampening_secs,
836 );
837 tree_state
838 .sign_declaration(&identity)
839 .expect("signing own declaration should never fail");
840
841 let coord_cache = CoordCache::new(
842 config.node.cache.coord_size,
843 config.node.cache.coord_ttl_secs * 1000,
844 );
845 let rl = &config.node.rate_limit;
846 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
847 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
848 config.node.limits.max_pending_inbound,
849 );
850
851 let max_connections = config.node.limits.max_connections;
852 let max_peers = config.node.limits.max_peers;
853 let max_links = config.node.limits.max_links;
854 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
855 let backoff_base_secs = config.node.discovery.backoff_base_secs;
856 let backoff_max_secs = config.node.discovery.backoff_max_secs;
857 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
858
859 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
860 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
861
862 Ok(Self {
863 identity,
864 startup_epoch,
865 started_at: std::time::Instant::now(),
866 config,
867 state: NodeState::Created,
868 is_leaf_only,
869 tree_state,
870 bloom_state,
871 coord_cache,
872 learned_routes: LearnedRouteTable::default(),
873 session_direct_degraded_until_ms: HashMap::new(),
874 recent_requests: HashMap::new(),
875 transports: HashMap::new(),
876 transport_drops: HashMap::new(),
877 links: HashMap::new(),
878 addr_to_link: HashMap::new(),
879 packet_tx: None,
880 packet_rx: None,
881 connections: HashMap::new(),
882 peers: HashMap::new(),
883 sessions: HashMap::new(),
884 identity_cache: HashMap::new(),
885 pending_tun_packets: HashMap::new(),
886 pending_endpoint_data: HashMap::new(),
887 pending_lookups: HashMap::new(),
888 max_connections,
889 max_peers,
890 max_links,
891 next_link_id: 1,
892 next_transport_id: 1,
893 stats: stats::NodeStats::new(),
894 stats_history: stats_history::StatsHistory::new(),
895 tun_state,
896 tun_name: None,
897 tun_tx: None,
898 tun_outbound_rx: None,
899 external_packet_tx: None,
900 endpoint_command_rx: None,
901 endpoint_event_tx: None,
902 encrypt_workers: None,
903 decrypt_workers: None,
904 decrypt_registered_sessions: std::collections::HashSet::new(),
905 decrypt_fallback_tx,
906 decrypt_fallback_rx,
907 tun_reader_handle: None,
908 tun_writer_handle: None,
909 #[cfg(target_os = "macos")]
910 tun_shutdown_fd: None,
911 dns_identity_rx: None,
912 dns_task: None,
913 index_allocator: IndexAllocator::new(),
914 peers_by_index: HashMap::new(),
915 pending_outbound: HashMap::new(),
916 msg1_rate_limiter,
917 icmp_rate_limiter: IcmpRateLimiter::new(),
918 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
919 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
920 std::time::Duration::from_millis(coords_response_interval_ms),
921 ),
922 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
923 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
924 std::time::Duration::from_secs(forward_min_interval_secs),
925 ),
926 pending_connects: Vec::new(),
927 retry_pending: HashMap::new(),
928 nostr_discovery: None,
929 nostr_discovery_started_at_ms: None,
930 lan_discovery: None,
931 local_instance_registry: None,
932 local_instance_started_at_ms: None,
933 last_local_instance_publish_ms: None,
934 last_local_instance_scan_ms: None,
935 startup_open_discovery_sweep_done: false,
936 bootstrap_transports: HashSet::new(),
937 bootstrap_transport_npubs: HashMap::new(),
938 discovery_fallback_transit_blocked_peers: HashSet::new(),
939 last_parent_reeval: None,
940 last_congestion_log: None,
941 estimated_mesh_size: None,
942 last_mesh_size_log: None,
943 last_self_warn: None,
944 last_local_send_failure_at: None,
945 last_rx_loop_maintenance_timeout_at: None,
946 peer_aliases: HashMap::new(),
947 configured_peer_send_weights,
948 peer_acl,
949 host_map,
950 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
951 })
952 }
953
954 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
959 config.validate()?;
960 let node_addr = *identity.node_addr();
961
962 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
963 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
964
965 let mut startup_epoch = [0u8; 8];
966 rand::rng().fill_bytes(&mut startup_epoch);
967
968 let tun_state = if config.tun.enabled {
969 TunState::Configured
970 } else {
971 TunState::Disabled
972 };
973
974 let mut tree_state = TreeState::new(node_addr);
976 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
977 tree_state.set_hold_down(config.node.tree.hold_down_secs);
978 tree_state.set_flap_dampening(
979 config.node.tree.flap_threshold,
980 config.node.tree.flap_window_secs,
981 config.node.tree.flap_dampening_secs,
982 );
983 tree_state
984 .sign_declaration(&identity)
985 .expect("signing own declaration should never fail");
986
987 let mut bloom_state = BloomState::new(node_addr);
988 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
989
990 let coord_cache = CoordCache::new(
991 config.node.cache.coord_size,
992 config.node.cache.coord_ttl_secs * 1000,
993 );
994 let rl = &config.node.rate_limit;
995 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
996 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
997 config.node.limits.max_pending_inbound,
998 );
999
1000 let max_connections = config.node.limits.max_connections;
1001 let max_peers = config.node.limits.max_peers;
1002 let max_links = config.node.limits.max_links;
1003 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1004
1005 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1006 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1007
1008 Ok(Self {
1009 identity,
1010 startup_epoch,
1011 started_at: std::time::Instant::now(),
1012 config,
1013 state: NodeState::Created,
1014 is_leaf_only: false,
1015 tree_state,
1016 bloom_state,
1017 coord_cache,
1018 learned_routes: LearnedRouteTable::default(),
1019 session_direct_degraded_until_ms: HashMap::new(),
1020 recent_requests: HashMap::new(),
1021 transports: HashMap::new(),
1022 transport_drops: HashMap::new(),
1023 links: HashMap::new(),
1024 addr_to_link: HashMap::new(),
1025 packet_tx: None,
1026 packet_rx: None,
1027 connections: HashMap::new(),
1028 peers: HashMap::new(),
1029 sessions: HashMap::new(),
1030 identity_cache: HashMap::new(),
1031 pending_tun_packets: HashMap::new(),
1032 pending_endpoint_data: HashMap::new(),
1033 pending_lookups: HashMap::new(),
1034 max_connections,
1035 max_peers,
1036 max_links,
1037 next_link_id: 1,
1038 next_transport_id: 1,
1039 stats: stats::NodeStats::new(),
1040 stats_history: stats_history::StatsHistory::new(),
1041 tun_state,
1042 tun_name: None,
1043 tun_tx: None,
1044 tun_outbound_rx: None,
1045 external_packet_tx: None,
1046 endpoint_command_rx: None,
1047 endpoint_event_tx: None,
1048 encrypt_workers: None,
1049 decrypt_workers: None,
1050 decrypt_registered_sessions: std::collections::HashSet::new(),
1051 decrypt_fallback_tx,
1052 decrypt_fallback_rx,
1053 tun_reader_handle: None,
1054 tun_writer_handle: None,
1055 #[cfg(target_os = "macos")]
1056 tun_shutdown_fd: None,
1057 dns_identity_rx: None,
1058 dns_task: None,
1059 index_allocator: IndexAllocator::new(),
1060 peers_by_index: HashMap::new(),
1061 pending_outbound: HashMap::new(),
1062 msg1_rate_limiter,
1063 icmp_rate_limiter: IcmpRateLimiter::new(),
1064 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1065 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1066 std::time::Duration::from_millis(coords_response_interval_ms),
1067 ),
1068 discovery_backoff: DiscoveryBackoff::new(),
1069 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1070 pending_connects: Vec::new(),
1071 retry_pending: HashMap::new(),
1072 nostr_discovery: None,
1073 nostr_discovery_started_at_ms: None,
1074 lan_discovery: None,
1075 local_instance_registry: None,
1076 local_instance_started_at_ms: None,
1077 last_local_instance_publish_ms: None,
1078 last_local_instance_scan_ms: None,
1079 startup_open_discovery_sweep_done: false,
1080 bootstrap_transports: HashSet::new(),
1081 bootstrap_transport_npubs: HashMap::new(),
1082 discovery_fallback_transit_blocked_peers: HashSet::new(),
1083 last_parent_reeval: None,
1084 last_congestion_log: None,
1085 estimated_mesh_size: None,
1086 last_mesh_size_log: None,
1087 last_self_warn: None,
1088 last_local_send_failure_at: None,
1089 last_rx_loop_maintenance_timeout_at: None,
1090 peer_aliases: HashMap::new(),
1091 configured_peer_send_weights,
1092 peer_acl,
1093 host_map,
1094 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1095 })
1096 }
1097
1098 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1100 let mut node = Self::new(config)?;
1101 node.is_leaf_only = true;
1102 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1103 Ok(node)
1104 }
1105
1106 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1107 let base_host_map = HostMap::from_peer_configs(config.peers());
1108 if !config.node.system_files_enabled {
1109 return (
1110 Arc::new(base_host_map.clone()),
1111 acl::PeerAclReloader::memory_only(base_host_map),
1112 );
1113 }
1114
1115 let mut host_map = base_host_map.clone();
1116 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1117 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1118 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1119 ));
1120 host_map.merge(hosts_file);
1121 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1122 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1123 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1124 base_host_map,
1125 hosts_path,
1126 );
1127 (Arc::new(host_map), peer_acl)
1128 }
1129
1130 fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1131 config
1132 .peers()
1133 .iter()
1134 .filter_map(|peer| {
1135 PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1136 (
1137 *identity.node_addr(),
1138 encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1139 )
1140 })
1141 })
1142 .collect()
1143 }
1144
1145 fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1146 self.configured_peer_send_weights
1147 .get(peer_addr)
1148 .copied()
1149 .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1150 }
1151
1152 async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
1156 let mut transports = Vec::new();
1157
1158 let udp_instances: Vec<_> = self
1160 .config
1161 .transports
1162 .udp
1163 .iter()
1164 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1165 .collect();
1166
1167 for (name, udp_config) in udp_instances {
1169 let transport_id = self.allocate_transport_id();
1170 let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
1171 transports.push(TransportHandle::Udp(udp));
1172 }
1173
1174 #[cfg(feature = "sim-transport")]
1175 {
1176 let sim_instances: Vec<_> = self
1177 .config
1178 .transports
1179 .sim
1180 .iter()
1181 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1182 .collect();
1183
1184 for (name, sim_config) in sim_instances {
1185 let transport_id = self.allocate_transport_id();
1186 let sim = crate::transport::sim::SimTransport::new(
1187 transport_id,
1188 name,
1189 sim_config,
1190 packet_tx.clone(),
1191 );
1192 transports.push(TransportHandle::Sim(sim));
1193 }
1194 }
1195
1196 #[cfg(any(target_os = "linux", target_os = "macos"))]
1198 {
1199 let eth_instances: Vec<_> = self
1200 .config
1201 .transports
1202 .ethernet
1203 .iter()
1204 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1205 .collect();
1206 let xonly = self.identity.pubkey();
1207 for (name, eth_config) in eth_instances {
1208 let mut eth_config = eth_config;
1209 if eth_config.discovery_scope.is_none() {
1210 eth_config.discovery_scope = self.lan_discovery_scope();
1211 }
1212 let transport_id = self.allocate_transport_id();
1213 let mut eth =
1214 EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
1215 eth.set_local_pubkey(xonly);
1216 transports.push(TransportHandle::Ethernet(eth));
1217 }
1218 }
1219
1220 let tcp_instances: Vec<_> = self
1222 .config
1223 .transports
1224 .tcp
1225 .iter()
1226 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1227 .collect();
1228
1229 for (name, tcp_config) in tcp_instances {
1230 let transport_id = self.allocate_transport_id();
1231 let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
1232 transports.push(TransportHandle::Tcp(tcp));
1233 }
1234
1235 let tor_instances: Vec<_> = self
1237 .config
1238 .transports
1239 .tor
1240 .iter()
1241 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1242 .collect();
1243
1244 for (name, tor_config) in tor_instances {
1245 let transport_id = self.allocate_transport_id();
1246 let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
1247 transports.push(TransportHandle::Tor(tor));
1248 }
1249
1250 let webrtc_instances: Vec<_> = self
1251 .config
1252 .transports
1253 .webrtc
1254 .iter()
1255 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1256 .collect();
1257
1258 #[cfg(feature = "webrtc-transport")]
1259 {
1260 for (name, webrtc_config) in webrtc_instances {
1261 let transport_id = self.allocate_transport_id();
1262 match WebRtcTransport::new(
1263 transport_id,
1264 name,
1265 webrtc_config,
1266 packet_tx.clone(),
1267 &self.identity,
1268 &self.config.node.discovery.nostr,
1269 ) {
1270 Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
1271 Err(err) => {
1272 warn!(
1273 transport_id = %transport_id,
1274 error = %err,
1275 "failed to initialize WebRTC transport"
1276 );
1277 }
1278 }
1279 }
1280 }
1281 #[cfg(not(feature = "webrtc-transport"))]
1282 if !webrtc_instances.is_empty() {
1283 warn!("WebRTC transport configured but this build lacks WebRTC transport support");
1284 }
1285
1286 #[cfg(bluer_available)]
1288 {
1289 let ble_instances: Vec<_> = self
1290 .config
1291 .transports
1292 .ble
1293 .iter()
1294 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1295 .collect();
1296
1297 #[cfg(all(bluer_available, not(test)))]
1298 for (name, ble_config) in ble_instances {
1299 let transport_id = self.allocate_transport_id();
1300 let adapter = ble_config.adapter().to_string();
1301 let mtu = ble_config.mtu();
1302 match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
1303 Ok(io) => {
1304 let mut ble = crate::transport::ble::BleTransport::new(
1305 transport_id,
1306 name,
1307 ble_config,
1308 io,
1309 packet_tx.clone(),
1310 );
1311 ble.set_local_pubkey(self.identity.pubkey().serialize());
1312 transports.push(TransportHandle::Ble(ble));
1313 }
1314 Err(e) => {
1315 tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
1316 }
1317 }
1318 }
1319
1320 #[cfg(any(not(bluer_available), test))]
1321 if !ble_instances.is_empty() {
1322 #[cfg(not(test))]
1323 tracing::warn!("BLE transport configured but this build lacks BlueZ support");
1324 }
1325 }
1326
1327 transports
1328 }
1329
1330 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1340 self.transports
1341 .iter()
1342 .filter(|(id, handle)| {
1343 handle.transport_type().name == transport_type
1344 && handle.is_operational()
1345 && !self.bootstrap_transports.contains(id)
1346 })
1347 .min_by_key(|(id, _)| id.as_u32())
1348 .map(|(id, _)| *id)
1349 }
1350
1351 #[allow(unused_variables)]
1357 fn resolve_ethernet_addr(
1358 &self,
1359 addr_str: &str,
1360 ) -> Result<(TransportId, TransportAddr), NodeError> {
1361 #[cfg(any(target_os = "linux", target_os = "macos"))]
1362 {
1363 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1364 NodeError::NoTransportForType(format!(
1365 "invalid Ethernet address format '{}': expected 'interface/mac'",
1366 addr_str
1367 ))
1368 })?;
1369
1370 let transport_id = self
1372 .transports
1373 .iter()
1374 .find(|(_, handle)| {
1375 handle.transport_type().name == "ethernet"
1376 && handle.is_operational()
1377 && handle.interface_name() == Some(iface)
1378 })
1379 .map(|(id, _)| *id)
1380 .ok_or_else(|| {
1381 NodeError::NoTransportForType(format!(
1382 "no operational Ethernet transport for interface '{}'",
1383 iface
1384 ))
1385 })?;
1386
1387 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1388 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1389 })?;
1390
1391 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1392 }
1393 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1394 {
1395 Err(NodeError::NoTransportForType(
1396 "Ethernet transport is not supported on this platform".to_string(),
1397 ))
1398 }
1399 }
1400
1401 #[cfg(bluer_available)]
1405 fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1406 let ta = TransportAddr::from_string(addr_str);
1407 let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1408 NodeError::NoTransportForType(format!(
1409 "invalid BLE address format '{}': expected 'adapter/mac'",
1410 addr_str
1411 ))
1412 })?;
1413
1414 let transport_id = self
1416 .transports
1417 .iter()
1418 .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1419 .map(|(id, _)| *id)
1420 .ok_or_else(|| {
1421 NodeError::NoTransportForType(format!(
1422 "no operational BLE transport for adapter '{}'",
1423 adapter
1424 ))
1425 })?;
1426
1427 crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1429 NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1430 })?;
1431
1432 Ok((transport_id, TransportAddr::from_string(addr_str)))
1433 }
1434
1435 pub fn identity(&self) -> &Identity {
1439 &self.identity
1440 }
1441
1442 pub fn node_addr(&self) -> &NodeAddr {
1444 self.identity.node_addr()
1445 }
1446
1447 pub fn npub(&self) -> String {
1449 self.identity.npub()
1450 }
1451
1452 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1461 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1462 return hostname.to_string();
1463 }
1464 if let Some(name) = self.peer_aliases.get(addr) {
1465 return name.clone();
1466 }
1467 if let Some(peer) = self.peers.get(addr) {
1468 return peer.identity().short_npub();
1469 }
1470 if let Some(entry) = self.sessions.get(addr) {
1471 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1472 return PeerIdentity::from_pubkey(xonly).short_npub();
1473 }
1474 addr.short_hex()
1475 }
1476
1477 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1489 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1493 self.peers_by_index.remove(&cache_key);
1494 if self.decrypt_registered_sessions.remove(&cache_key)
1495 && let Some(workers) = self.decrypt_workers.as_ref()
1496 {
1497 workers.unregister_session(cache_key);
1498 }
1499 if let Some(peer_addr) = owning_peer {
1510 let peer_has_other_index = self
1511 .peers_by_index
1512 .values()
1513 .any(|other| *other == peer_addr);
1514 if !peer_has_other_index {
1515 self.clear_connected_udp_for_peer(&peer_addr);
1516 }
1517 }
1518 }
1519
1520 pub(in crate::node) fn ensure_current_session_index_registered(
1529 &mut self,
1530 node_addr: &NodeAddr,
1531 context: &'static str,
1532 ) -> bool {
1533 let Some(peer) = self.peers.get(node_addr) else {
1534 return false;
1535 };
1536 let Some(transport_id) = peer.transport_id() else {
1537 warn!(
1538 peer = %self.peer_display_name(node_addr),
1539 context,
1540 "Cannot register current session index without transport id"
1541 );
1542 return false;
1543 };
1544 let Some(our_index) = peer.our_index() else {
1545 warn!(
1546 peer = %self.peer_display_name(node_addr),
1547 context,
1548 "Cannot register current session index without local index"
1549 );
1550 return false;
1551 };
1552
1553 let cache_key = (transport_id, our_index.as_u32());
1554 match self.peers_by_index.get(&cache_key).copied() {
1555 Some(existing) if existing == *node_addr => true,
1556 Some(existing) => {
1557 warn!(
1558 peer = %self.peer_display_name(node_addr),
1559 previous_owner = %self.peer_display_name(&existing),
1560 transport_id = %transport_id,
1561 our_index = %our_index,
1562 context,
1563 "Repairing current session index with stale owner"
1564 );
1565 self.peers_by_index.insert(cache_key, *node_addr);
1566 true
1567 }
1568 None => {
1569 warn!(
1570 peer = %self.peer_display_name(node_addr),
1571 transport_id = %transport_id,
1572 our_index = %our_index,
1573 context,
1574 "Repairing missing current session index"
1575 );
1576 self.peers_by_index.insert(cache_key, *node_addr);
1577 true
1578 }
1579 }
1580 }
1581
1582 pub fn config(&self) -> &Config {
1586 &self.config
1587 }
1588
1589 pub fn effective_ipv6_mtu(&self) -> u16 {
1595 crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1596 }
1597
1598 pub fn transport_mtu(&self) -> u16 {
1615 let min_operational = self
1616 .transports
1617 .values()
1618 .filter(|h| h.is_operational())
1619 .map(|h| h.mtu())
1620 .min();
1621 if let Some(mtu) = min_operational {
1622 return mtu;
1623 }
1624 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1626 return cfg.mtu();
1627 }
1628 1280
1629 }
1630
1631 pub fn state(&self) -> NodeState {
1635 self.state
1636 }
1637
1638 pub fn uptime(&self) -> std::time::Duration {
1640 self.started_at.elapsed()
1641 }
1642
1643 pub fn is_running(&self) -> bool {
1645 self.state.is_operational()
1646 }
1647
1648 pub fn is_leaf_only(&self) -> bool {
1650 self.is_leaf_only
1651 }
1652
1653 pub fn tree_state(&self) -> &TreeState {
1657 &self.tree_state
1658 }
1659
1660 pub fn tree_state_mut(&mut self) -> &mut TreeState {
1662 &mut self.tree_state
1663 }
1664
1665 pub fn bloom_state(&self) -> &BloomState {
1669 &self.bloom_state
1670 }
1671
1672 pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1674 &mut self.bloom_state
1675 }
1676
1677 pub fn estimated_mesh_size(&self) -> Option<u64> {
1681 self.estimated_mesh_size
1682 }
1683
1684 pub(crate) fn compute_mesh_size(&mut self) {
1690 let my_addr = *self.tree_state.my_node_addr();
1691 let parent_id = *self.tree_state.my_declaration().parent_id();
1692 let is_root = self.tree_state.is_root();
1693
1694 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1695 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1697 let mut has_data = false;
1698
1699 if !is_root
1705 && let Some(parent) = self.peers.get(&parent_id)
1706 && let Some(filter) = parent.inbound_filter()
1707 {
1708 match filter.estimated_count(max_fpr) {
1709 Some(n) => {
1710 total += n;
1711 has_data = true;
1712 }
1713 None => {
1714 self.estimated_mesh_size = None;
1715 return;
1716 }
1717 }
1718 }
1719
1720 for (peer_addr, peer) in &self.peers {
1722 if peer_addr == &parent_id {
1723 continue;
1724 }
1725 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1726 && *decl.parent_id() == my_addr
1727 {
1728 child_count += 1;
1729 if let Some(filter) = peer.inbound_filter() {
1730 match filter.estimated_count(max_fpr) {
1731 Some(n) => {
1732 total += n;
1733 has_data = true;
1734 }
1735 None => {
1736 self.estimated_mesh_size = None;
1737 return;
1738 }
1739 }
1740 }
1741 }
1742 }
1743
1744 if !has_data {
1745 self.estimated_mesh_size = None;
1746 return;
1747 }
1748
1749 let size = total.round() as u64;
1750 self.estimated_mesh_size = Some(size);
1751
1752 let now = std::time::Instant::now();
1754 let should_log = match self.last_mesh_size_log {
1755 None => true,
1756 Some(last) => {
1757 now.duration_since(last)
1758 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1759 }
1760 };
1761 if should_log {
1762 tracing::debug!(
1763 estimated_mesh_size = size,
1764 peers = self.peers.len(),
1765 children = child_count,
1766 "Mesh size estimate"
1767 );
1768 self.last_mesh_size_log = Some(now);
1769 }
1770 }
1771
1772 pub fn coord_cache(&self) -> &CoordCache {
1776 &self.coord_cache
1777 }
1778
1779 pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1781 &mut self.coord_cache
1782 }
1783
1784 pub fn stats(&self) -> &stats::NodeStats {
1788 &self.stats
1789 }
1790
1791 pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1793 &mut self.stats
1794 }
1795
1796 pub fn stats_history(&self) -> &stats_history::StatsHistory {
1798 &self.stats_history
1799 }
1800
1801 pub(crate) fn record_stats_history(&mut self) {
1804 let fwd = &self.stats.forwarding;
1805 let peers_with_mmp: Vec<f64> = self
1806 .peers
1807 .values()
1808 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1809 .collect();
1810 let loss_rate = if peers_with_mmp.is_empty() {
1811 0.0
1812 } else {
1813 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1814 };
1815
1816 let snap = stats_history::Snapshot {
1817 mesh_size: self.estimated_mesh_size,
1818 tree_depth: self.tree_state.my_coords().depth() as u32,
1819 peer_count: self.peers.len() as u64,
1820 parent_switches_total: self.stats.tree.parent_switches,
1821 bytes_in_total: fwd.received_bytes,
1822 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1823 packets_in_total: fwd.received_packets,
1824 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1825 loss_rate,
1826 active_sessions: self.sessions.len() as u64,
1827 };
1828
1829 let now = std::time::Instant::now();
1830 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1831 .peers
1832 .values()
1833 .map(|p| {
1834 let stats = p.link_stats();
1835 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1836 Some(m) => (
1837 m.metrics.srtt_ms(),
1838 Some(m.metrics.loss_rate()),
1839 m.receiver.ecn_ce_count() as u64,
1840 ),
1841 None => (None, None, 0),
1842 };
1843 stats_history::PeerSnapshot {
1844 node_addr: *p.node_addr(),
1845 last_seen: now,
1846 srtt_ms,
1847 loss_rate,
1848 bytes_in_total: stats.bytes_recv,
1849 bytes_out_total: stats.bytes_sent,
1850 packets_in_total: stats.packets_recv,
1851 packets_out_total: stats.packets_sent,
1852 ecn_ce_total: ecn_ce,
1853 }
1854 })
1855 .collect();
1856
1857 self.stats_history.tick(now, &snap, &peer_snaps);
1858 }
1859
1860 pub fn tun_state(&self) -> TunState {
1864 self.tun_state
1865 }
1866
1867 pub fn tun_name(&self) -> Option<&str> {
1869 self.tun_name.as_deref()
1870 }
1871
1872 pub fn set_max_connections(&mut self, max: usize) {
1876 self.max_connections = max;
1877 }
1878
1879 pub fn set_max_peers(&mut self, max: usize) {
1881 self.max_peers = max;
1882 }
1883
1884 pub(crate) fn outbound_admission_check(&self) -> bool {
1887 let connection_used = self
1888 .connections
1889 .len()
1890 .saturating_add(self.pending_connects.len());
1891 let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1892 let connection_allowed =
1893 self.max_connections == 0 || connection_used < self.max_connections;
1894 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1895 peer_allowed && connection_allowed && link_allowed
1896 }
1897
1898 pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1902 if !self.outbound_admission_check() {
1903 return false;
1904 }
1905
1906 let nostr = &self.config.node.discovery.nostr;
1907 if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1908 return true;
1909 }
1910
1911 let configured_npubs = self
1912 .config
1913 .peers()
1914 .iter()
1915 .map(|peer| peer.npub.clone())
1916 .collect::<HashSet<_>>();
1917 self.open_discovery_enqueue_budget(&configured_npubs) > 0
1918 }
1919
1920 pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1924 let connection_used = self
1925 .connections
1926 .len()
1927 .saturating_add(self.pending_connects.len());
1928 let connection_allowed =
1929 self.max_connections == 0 || connection_used < self.max_connections;
1930 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1931 connection_allowed && link_allowed
1932 }
1933
1934 pub fn set_max_links(&mut self, max: usize) {
1936 self.max_links = max;
1937 }
1938
1939 pub fn connection_count(&self) -> usize {
1943 self.connections.len()
1944 }
1945
1946 pub fn peer_count(&self) -> usize {
1948 self.peers.len()
1949 }
1950
1951 pub fn link_count(&self) -> usize {
1953 self.links.len()
1954 }
1955
1956 pub fn transport_count(&self) -> usize {
1958 self.transports.len()
1959 }
1960
1961 pub fn allocate_transport_id(&mut self) -> TransportId {
1965 let id = TransportId::new(self.next_transport_id);
1966 self.next_transport_id += 1;
1967 id
1968 }
1969
1970 pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
1972 self.transports.get(id)
1973 }
1974
1975 pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
1977 self.transports.get_mut(id)
1978 }
1979
1980 pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
1982 self.transports.keys()
1983 }
1984
1985 pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
1987 self.packet_rx.as_mut()
1988 }
1989
1990 pub fn allocate_link_id(&mut self) -> LinkId {
1994 let id = LinkId::new(self.next_link_id);
1995 self.next_link_id += 1;
1996 id
1997 }
1998
1999 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2001 if self.max_links > 0 && self.links.len() >= self.max_links {
2002 return Err(NodeError::MaxLinksExceeded {
2003 max: self.max_links,
2004 });
2005 }
2006 let link_id = link.link_id();
2007 let transport_id = link.transport_id();
2008 let remote_addr = link.remote_addr().clone();
2009
2010 self.links.insert(link_id, link);
2011 self.addr_to_link
2012 .insert((transport_id, remote_addr), link_id);
2013 Ok(())
2014 }
2015
2016 pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
2018 self.links.get(link_id)
2019 }
2020
2021 pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
2023 self.links.get_mut(link_id)
2024 }
2025
2026 pub fn find_link_by_addr(
2028 &self,
2029 transport_id: TransportId,
2030 addr: &TransportAddr,
2031 ) -> Option<LinkId> {
2032 self.addr_to_link
2033 .get(&(transport_id, addr.clone()))
2034 .copied()
2035 }
2036
2037 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2043 if let Some(link) = self.links.remove(link_id) {
2044 let key = (link.transport_id(), link.remote_addr().clone());
2046 if self.addr_to_link.get(&key) == Some(link_id) {
2047 self.addr_to_link.remove(&key);
2048 }
2049 Some(link)
2050 } else {
2051 None
2052 }
2053 }
2054
2055 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2056 if !self.bootstrap_transports.contains(&transport_id) {
2057 return;
2058 }
2059
2060 let transport_in_use = self
2061 .links
2062 .values()
2063 .any(|link| link.transport_id() == transport_id)
2064 || self
2065 .connections
2066 .values()
2067 .any(|conn| conn.transport_id() == Some(transport_id))
2068 || self
2069 .peers
2070 .values()
2071 .any(|peer| peer.transport_id() == Some(transport_id))
2072 || self
2073 .pending_connects
2074 .iter()
2075 .any(|pending| pending.transport_id == transport_id);
2076
2077 if transport_in_use {
2078 return;
2079 }
2080
2081 tracing::debug!(
2082 transport_id = %transport_id,
2083 "bootstrap transport has no remaining references; dropping"
2084 );
2085
2086 self.bootstrap_transports.remove(&transport_id);
2087 self.bootstrap_transport_npubs.remove(&transport_id);
2088 self.transport_drops.remove(&transport_id);
2089 self.transports.remove(&transport_id);
2090 }
2091
2092 pub fn links(&self) -> impl Iterator<Item = &Link> {
2094 self.links.values()
2095 }
2096
2097 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2101 let link_id = connection.link_id();
2102
2103 if self.connections.contains_key(&link_id) {
2104 return Err(NodeError::ConnectionAlreadyExists(link_id));
2105 }
2106
2107 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2108 return Err(NodeError::MaxConnectionsExceeded {
2109 max: self.max_connections,
2110 });
2111 }
2112
2113 self.connections.insert(link_id, connection);
2114 Ok(())
2115 }
2116
2117 pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
2119 self.connections.get(link_id)
2120 }
2121
2122 pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
2124 self.connections.get_mut(link_id)
2125 }
2126
2127 pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
2129 self.connections.remove(link_id)
2130 }
2131
2132 pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
2134 self.connections.values()
2135 }
2136
2137 pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
2141 self.peers.get(node_addr)
2142 }
2143
2144 pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
2146 self.peers.get_mut(node_addr)
2147 }
2148
2149 pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
2151 self.peers.remove(node_addr)
2152 }
2153
2154 pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2156 self.peers.values()
2157 }
2158
2159 pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2163 self.nostr_discovery.as_deref()
2164 }
2165
2166 pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2168 self.peers.keys()
2169 }
2170
2171 pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2173 self.peers.values().filter(|p| p.can_send())
2174 }
2175
2176 pub fn sendable_peer_count(&self) -> usize {
2178 self.peers.values().filter(|p| p.can_send()).count()
2179 }
2180
2181 pub(crate) fn set_discovery_fallback_transit_allowed(
2182 &mut self,
2183 peer_addr: NodeAddr,
2184 allowed: bool,
2185 ) {
2186 if allowed {
2187 self.discovery_fallback_transit_blocked_peers
2188 .remove(&peer_addr);
2189 } else {
2190 self.discovery_fallback_transit_blocked_peers
2191 .insert(peer_addr);
2192 }
2193 }
2194
2195 pub(crate) fn configured_discovery_fallback_transit(
2196 &self,
2197 peer_addr: &NodeAddr,
2198 ) -> Option<bool> {
2199 self.configured_peer(peer_addr)
2200 .map(|peer| peer.discovery_fallback_transit)
2201 }
2202
2203 pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2204 self.config.peers().iter().find(|peer| {
2205 PeerIdentity::from_npub(&peer.npub)
2206 .ok()
2207 .is_some_and(|identity| identity.node_addr() == peer_addr)
2208 })
2209 }
2210
2211 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2212 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2213 return retry_state.peer_config.discovery_fallback_transit;
2214 }
2215
2216 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2217 return allowed;
2218 }
2219
2220 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2221 }
2222
2223 #[cfg(test)]
2228 pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2229 self.discovery_forward_limiter
2230 .set_interval(std::time::Duration::ZERO);
2231 }
2232
2233 #[cfg(test)]
2234 pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2235 self.sessions.get(remote)
2236 }
2237
2238 #[cfg(test)]
2240 pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2241 self.sessions.get_mut(remote)
2242 }
2243
2244 #[cfg(test)]
2246 pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2247 self.sessions.remove(remote)
2248 }
2249
2250 #[cfg(test)]
2252 pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2253 self.path_mtu_lookup
2254 .read()
2255 .ok()
2256 .and_then(|map| map.get(fips_addr).copied())
2257 }
2258
2259 #[cfg(test)]
2261 pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2262 if let Ok(mut map) = self.path_mtu_lookup.write() {
2263 map.insert(fips_addr, mtu);
2264 }
2265 }
2266
2267 pub fn session_count(&self) -> usize {
2269 self.sessions.len()
2270 }
2271
2272 pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2274 self.sessions.iter()
2275 }
2276
2277 pub(crate) fn register_identity(
2281 &mut self,
2282 node_addr: NodeAddr,
2283 pubkey: secp256k1::PublicKey,
2284 ) -> bool {
2285 let mut prefix = [0u8; 15];
2286 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2287 if let Some(entry) = self.identity_cache.get(&prefix)
2288 && entry.node_addr == node_addr
2289 && entry.pubkey == pubkey
2290 {
2291 return true;
2295 }
2296
2297 let (xonly, _) = pubkey.x_only_public_key();
2298 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2299 if derived_node_addr != node_addr {
2300 debug!(
2301 claimed_node_addr = %node_addr,
2302 derived_node_addr = %derived_node_addr,
2303 "Rejected identity cache entry with mismatched public key"
2304 );
2305 return false;
2306 }
2307
2308 let now_ms = Self::now_ms();
2309 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2310 && entry.node_addr == node_addr
2311 {
2312 entry.pubkey = pubkey;
2313 entry.last_seen_ms = now_ms;
2314 return true;
2315 }
2316
2317 let npub = encode_npub(&xonly);
2318 self.identity_cache.insert(
2319 prefix,
2320 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2321 );
2322 let max = self.config.node.cache.identity_size;
2324 if self.identity_cache.len() > max
2325 && let Some(oldest_key) = self
2326 .identity_cache
2327 .iter()
2328 .min_by_key(|(_, entry)| entry.last_seen_ms)
2329 .map(|(k, _)| *k)
2330 {
2331 self.identity_cache.remove(&oldest_key);
2332 }
2333 true
2334 }
2335
2336 pub(crate) fn lookup_by_fips_prefix(
2338 &mut self,
2339 prefix: &[u8; 15],
2340 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2341 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2342 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2344 } else {
2345 None
2346 }
2347 }
2348
2349 pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2351 let mut prefix = [0u8; 15];
2352 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2353 self.identity_cache.contains_key(&prefix)
2354 }
2355
2356 pub fn identity_cache_len(&self) -> usize {
2358 self.identity_cache.len()
2359 }
2360
2361 pub fn identity_cache_iter(
2366 &self,
2367 ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2368 self.identity_cache
2369 .values()
2370 .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2371 }
2372
2373 pub fn identity_cache_max(&self) -> usize {
2375 self.config.node.cache.identity_size
2376 }
2377
2378 pub fn pending_lookup_count(&self) -> usize {
2380 self.pending_lookups.len()
2381 }
2382
2383 pub fn pending_lookups_iter(
2385 &self,
2386 ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2387 self.pending_lookups.iter()
2388 }
2389
2390 pub fn recent_request_count(&self) -> usize {
2392 self.recent_requests.len()
2393 }
2394
2395 pub fn pending_tun_destinations(&self) -> usize {
2397 self.pending_tun_packets.len()
2398 }
2399
2400 pub fn pending_tun_total_packets(&self) -> usize {
2402 self.pending_tun_packets.values().map(|q| q.len()).sum()
2403 }
2404
2405 pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2407 self.retry_pending.iter()
2408 }
2409
2410 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2417 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2419 return true;
2420 }
2421 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2423 && decl.parent_id() == self.node_addr()
2424 {
2425 return true;
2426 }
2427 false
2428 }
2429
2430 pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
2454 if dest_node_addr == self.node_addr() {
2456 return None;
2457 }
2458 let now_ms = Self::now_ms();
2459 let direct_session_degraded = self.session_direct_path_is_degraded(dest_node_addr, now_ms);
2460
2461 let direct_peer_can_send = self
2462 .peers
2463 .get(dest_node_addr)
2464 .is_some_and(|peer| peer.can_send());
2465 let healthy_direct_route = self
2466 .peers
2467 .get(dest_node_addr)
2468 .filter(|peer| peer.is_healthy() && !direct_session_degraded)
2469 .map(|_| *dest_node_addr);
2470 let direct_payload_eligible = healthy_direct_route.is_some();
2471 let payload_candidate_can_send = |addr: &NodeAddr, peer: &ActivePeer| {
2472 if addr == dest_node_addr {
2473 direct_payload_eligible
2474 } else {
2475 peer.can_send()
2476 }
2477 };
2478
2479 let fallback_beats_direct = |node: &Self, fallback_addr: NodeAddr| {
2484 node.route_candidate_beats_direct(healthy_direct_route, fallback_addr)
2485 };
2486
2487 let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
2488 Some(
2489 self.peers
2490 .iter()
2491 .filter(|(addr, peer)| payload_candidate_can_send(addr, peer))
2492 .map(|(addr, _)| *addr)
2493 .collect::<HashSet<_>>(),
2494 )
2495 } else {
2496 None
2497 };
2498
2499 let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
2506 self.learned_routes.should_explore_fallback(
2507 dest_node_addr,
2508 now_ms,
2509 self.config.node.routing.learned_fallback_explore_interval,
2510 |addr| sendable.contains(addr),
2511 )
2512 });
2513 if let Some(sendable) = &sendable_learned_peers
2514 && !explore_fallback
2515 {
2516 let eligible = sendable
2517 .iter()
2518 .copied()
2519 .filter(|addr| fallback_beats_direct(self, *addr))
2520 .collect::<HashSet<_>>();
2521 if !eligible.is_empty()
2522 && let Some(next_hop_addr) =
2523 self.learned_routes
2524 .select_next_hop(dest_node_addr, now_ms, |addr| eligible.contains(addr))
2525 {
2526 return self.peers.get(&next_hop_addr);
2527 }
2528 }
2529
2530 let Some(dest_coords) = self
2532 .coord_cache
2533 .get_and_touch(dest_node_addr, now_ms)
2534 .cloned()
2535 else {
2536 if (healthy_direct_route.is_none() || explore_fallback)
2537 && let Some(sendable) = &sendable_learned_peers
2538 && let Some(next_hop_addr) =
2539 self.learned_routes
2540 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2541 {
2542 return self.peers.get(&next_hop_addr);
2543 }
2544 if let Some(direct_addr) = healthy_direct_route {
2545 return self.peers.get(&direct_addr);
2546 }
2547 if direct_peer_can_send {
2548 return self.peers.get(dest_node_addr);
2549 }
2550 return None;
2551 };
2552
2553 let coordinate_route_addr = {
2556 let candidates: Vec<&ActivePeer> = self
2557 .peers
2558 .iter()
2559 .filter(|(addr, peer)| {
2560 payload_candidate_can_send(addr, peer) && peer.may_reach(dest_node_addr)
2561 })
2562 .map(|(_, peer)| peer)
2563 .collect();
2564 if !candidates.is_empty() {
2565 self.select_best_candidate(&candidates, &dest_coords)
2566 .map(|peer| *peer.node_addr())
2567 } else {
2568 None
2569 }
2570 };
2571 if let Some(next_hop_addr) = coordinate_route_addr
2572 && fallback_beats_direct(self, next_hop_addr)
2573 {
2574 return self.peers.get(&next_hop_addr);
2575 }
2576
2577 let tree_route_addr = self.select_tree_payload_candidate(
2579 &dest_coords,
2580 dest_node_addr,
2581 direct_payload_eligible,
2582 );
2583 if let Some(next_hop_addr) = tree_route_addr
2584 && fallback_beats_direct(self, next_hop_addr)
2585 {
2586 return self.peers.get(&next_hop_addr);
2587 }
2588
2589 if explore_fallback {
2590 return sendable_learned_peers.as_ref().and_then(|sendable| {
2591 self.learned_routes
2592 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2593 .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
2594 });
2595 }
2596
2597 if let Some(direct_addr) = healthy_direct_route {
2598 return self.peers.get(&direct_addr);
2599 }
2600
2601 if let Some(sendable) = &sendable_learned_peers
2602 && let Some(next_hop_addr) =
2603 self.learned_routes
2604 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2605 {
2606 return self.peers.get(&next_hop_addr);
2607 }
2608
2609 if direct_peer_can_send {
2610 return self.peers.get(dest_node_addr);
2611 }
2612
2613 None
2614 }
2615
2616 fn route_candidate_beats_direct(
2617 &self,
2618 healthy_direct_route: Option<NodeAddr>,
2619 candidate_addr: NodeAddr,
2620 ) -> bool {
2621 let Some(direct_addr) = healthy_direct_route else {
2622 return true;
2623 };
2624 if candidate_addr == direct_addr {
2625 return false;
2626 }
2627
2628 let Some(direct) = self.peers.get(&direct_addr) else {
2629 return true;
2630 };
2631 let Some(candidate) = self.peers.get(&candidate_addr) else {
2632 return false;
2633 };
2634 if !candidate.can_send() {
2635 return false;
2636 }
2637
2638 let direct_cost = direct.link_cost();
2639 let candidate_cost = candidate.link_cost();
2640 candidate_cost + ROUTING_FALLBACK_MIN_COST_ADVANTAGE < direct_cost
2641 }
2642
2643 fn select_tree_payload_candidate(
2644 &self,
2645 dest_coords: &crate::tree::TreeCoordinate,
2646 direct_dest: &NodeAddr,
2647 direct_payload_eligible: bool,
2648 ) -> Option<NodeAddr> {
2649 if self.tree_state.my_coords().root_id() != dest_coords.root_id() {
2650 return None;
2651 }
2652
2653 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2654 let mut best: Option<(NodeAddr, usize)> = None;
2655
2656 for (peer_addr, peer) in &self.peers {
2657 if peer_addr == direct_dest {
2658 if !direct_payload_eligible {
2659 continue;
2660 }
2661 } else if !peer.can_send() {
2662 continue;
2663 }
2664
2665 let Some(peer_coords) = self.tree_state.peer_coords(peer_addr) else {
2666 continue;
2667 };
2668 let distance = peer_coords.distance_to(dest_coords);
2669 if distance >= my_distance {
2670 continue;
2671 }
2672
2673 let dominated = match &best {
2674 None => true,
2675 Some((best_id, best_dist)) => {
2676 distance < *best_dist || (distance == *best_dist && peer_addr < best_id)
2677 }
2678 };
2679 if dominated {
2680 best = Some((*peer_addr, distance));
2681 }
2682 }
2683
2684 best.map(|(peer_addr, _)| peer_addr)
2685 }
2686
2687 pub(in crate::node) fn session_direct_path_is_degraded(
2688 &mut self,
2689 dest: &NodeAddr,
2690 now_ms: u64,
2691 ) -> bool {
2692 match self.session_direct_degraded_until_ms.get(dest).copied() {
2693 Some(until_ms) if until_ms > now_ms => true,
2694 Some(_) => {
2695 self.session_direct_degraded_until_ms.remove(dest);
2696 false
2697 }
2698 None => false,
2699 }
2700 }
2701
2702 pub(in crate::node) fn mark_session_direct_path_degraded(
2703 &mut self,
2704 dest: NodeAddr,
2705 now_ms: u64,
2706 ) -> bool {
2707 let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2708 let entry = self
2709 .session_direct_degraded_until_ms
2710 .entry(dest)
2711 .or_insert(0);
2712 let was_degraded = *entry > now_ms;
2713 *entry = (*entry).max(until_ms);
2714 !was_degraded
2715 }
2716
2717 pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
2718 self.session_direct_degraded_until_ms.remove(dest).is_some()
2719 }
2720
2721 pub(in crate::node) fn learn_reverse_route(
2722 &mut self,
2723 destination: NodeAddr,
2724 next_hop: NodeAddr,
2725 ) {
2726 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2727 || destination == *self.node_addr()
2728 {
2729 return;
2730 }
2731 let now_ms = Self::now_ms();
2732 self.learned_routes.learn(
2733 destination,
2734 next_hop,
2735 now_ms,
2736 self.config.node.routing.learned_ttl_secs,
2737 self.config.node.routing.max_learned_routes_per_dest,
2738 );
2739 }
2740
2741 pub(in crate::node) fn record_route_failure(
2742 &mut self,
2743 destination: NodeAddr,
2744 next_hop: NodeAddr,
2745 ) {
2746 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2747 return;
2748 }
2749 self.learned_routes.record_failure(&destination, &next_hop);
2750 }
2751
2752 pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2753 self.learned_routes.snapshot(now_ms)
2754 }
2755
2756 pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2757 self.learned_routes.purge_expired(now_ms);
2758 }
2759
2760 fn select_best_candidate<'a>(
2769 &'a self,
2770 candidates: &[&'a ActivePeer],
2771 dest_coords: &crate::tree::TreeCoordinate,
2772 ) -> Option<&'a ActivePeer> {
2773 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2774
2775 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2776
2777 for &candidate in candidates {
2778 if !candidate.can_send() {
2779 continue;
2780 }
2781
2782 let cost = candidate.link_cost();
2783
2784 let dist = self
2785 .tree_state
2786 .peer_coords(candidate.node_addr())
2787 .map(|pc| pc.distance_to(dest_coords))
2788 .unwrap_or(usize::MAX);
2789
2790 if dist >= my_distance {
2793 continue;
2794 }
2795
2796 let dominated = match &best {
2797 None => true,
2798 Some((_, best_cost, best_dist)) => {
2799 cost < *best_cost
2800 || (cost == *best_cost && dist < *best_dist)
2801 || (cost == *best_cost
2802 && dist == *best_dist
2803 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2804 }
2805 };
2806
2807 if dominated {
2808 best = Some((candidate, cost, dist));
2809 }
2810 }
2811
2812 best.map(|(peer, _, _)| peer)
2813 }
2814
2815 pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2817 self.peers.values().filter(|p| p.may_reach(dest)).collect()
2818 }
2819
2820 pub fn tun_tx(&self) -> Option<&TunTx> {
2824 self.tun_tx.as_ref()
2825 }
2826
2827 pub fn attach_external_packet_io(
2834 &mut self,
2835 capacity: usize,
2836 ) -> Result<ExternalPacketIo, NodeError> {
2837 if self.state != NodeState::Created {
2838 return Err(NodeError::Config(ConfigError::Validation(
2839 "external packet I/O must be attached before node start".to_string(),
2840 )));
2841 }
2842 if self.config.tun.enabled {
2843 return Err(NodeError::Config(ConfigError::Validation(
2844 "external packet I/O requires tun.enabled=false".to_string(),
2845 )));
2846 }
2847
2848 let capacity = capacity.max(1);
2849 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2850 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2851 self.tun_outbound_rx = Some(outbound_rx);
2852 self.external_packet_tx = Some(inbound_tx);
2853
2854 Ok(ExternalPacketIo {
2855 outbound_tx,
2856 inbound_rx,
2857 })
2858 }
2859
2860 pub(crate) fn attach_endpoint_data_io(
2865 &mut self,
2866 capacity: usize,
2867 ) -> Result<EndpointDataIo, NodeError> {
2868 if self.state != NodeState::Created {
2869 return Err(NodeError::Config(ConfigError::Validation(
2870 "endpoint data I/O must be attached before node start".to_string(),
2871 )));
2872 }
2873
2874 let command_capacity = endpoint_data_command_capacity(capacity);
2875 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2876 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2881 self.endpoint_command_rx = Some(command_rx);
2882 self.endpoint_event_tx = Some(event_tx.clone());
2883
2884 Ok(EndpointDataIo {
2885 command_tx,
2886 event_rx,
2887 event_tx,
2888 })
2889 }
2890
2891 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2892 let mut prefix = [0u8; 15];
2893 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2894 self.identity_cache
2895 .get(&prefix)
2896 .filter(|entry| &entry.node_addr == addr)
2897 .map(|entry| entry.pubkey)
2898 }
2899
2900 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2901 let mut prefix = [0u8; 15];
2902 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2903 self.identity_cache
2904 .get(&prefix)
2905 .filter(|entry| &entry.node_addr == addr)
2906 .map(|entry| entry.npub.clone())
2907 }
2908
2909 pub(in crate::node) fn deliver_external_ipv6_packet(
2910 &self,
2911 src_addr: &NodeAddr,
2912 packet: Vec<u8>,
2913 ) {
2914 let Some(external_packet_tx) = &self.external_packet_tx else {
2915 return;
2916 };
2917 if packet.len() < 40 {
2918 return;
2919 }
2920 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2921 return;
2922 };
2923 let delivered = NodeDeliveredPacket {
2924 source_node_addr: *src_addr,
2925 source_npub: self.npub_for_node_addr(src_addr),
2926 destination,
2927 packet,
2928 };
2929 if let Err(error) = external_packet_tx.try_send(delivered) {
2930 debug!(error = %error, "Failed to deliver packet to external app sink");
2931 }
2932 }
2933
2934 pub(super) async fn send_encrypted_link_message(
2948 &mut self,
2949 node_addr: &NodeAddr,
2950 plaintext: &[u8],
2951 ) -> Result<(), NodeError> {
2952 self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2953 .await
2954 }
2955
2956 pub(in crate::node) fn note_local_send_outcome(
2962 &mut self,
2963 result: &Result<usize, TransportError>,
2964 ) {
2965 match result {
2966 Ok(_) => {
2967 if self.last_local_send_failure_at.is_some() {
2968 self.last_local_send_failure_at = None;
2969 }
2970 }
2971 Err(error) if error.is_local_route_unavailable() => {
2972 self.last_local_send_failure_at = Some(std::time::Instant::now());
2973 }
2974 Err(_) => {}
2975 }
2976 }
2977
2978 pub(in crate::node) fn local_send_failure_dead_timeout(
2984 &mut self,
2985 now: std::time::Instant,
2986 dead_timeout: std::time::Duration,
2987 fast_dead_timeout: std::time::Duration,
2988 ) -> std::time::Duration {
2989 match self.last_local_send_failure_at {
2990 Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
2991 fast_dead_timeout.min(dead_timeout)
2992 }
2993 Some(_) => {
2994 self.last_local_send_failure_at = None;
2995 dead_timeout
2996 }
2997 None => dead_timeout,
2998 }
2999 }
3000
3001 pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
3002 self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
3003 }
3004
3005 pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
3006 let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
3007 return false;
3008 };
3009 let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
3010 std::time::Instant::now().duration_since(t) <= grace
3011 }
3012
3013 pub(super) async fn send_encrypted_link_message_with_ce(
3017 &mut self,
3018 node_addr: &NodeAddr,
3019 plaintext: &[u8],
3020 ce_flag: bool,
3021 ) -> Result<(), NodeError> {
3022 let peer = self
3023 .peers
3024 .get_mut(node_addr)
3025 .ok_or(NodeError::PeerNotFound(*node_addr))?;
3026
3027 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
3028 node_addr: *node_addr,
3029 reason: "no their_index".into(),
3030 })?;
3031 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
3032 node_addr: *node_addr,
3033 reason: "no transport_id".into(),
3034 })?;
3035 let remote_addr = peer
3036 .current_addr()
3037 .cloned()
3038 .ok_or_else(|| NodeError::SendFailed {
3039 node_addr: *node_addr,
3040 reason: "no current_addr".into(),
3041 })?;
3042 #[cfg(any(target_os = "linux", target_os = "macos"))]
3043 let connected_socket = peer.connected_udp();
3044
3045 let timestamp_ms = peer.session_elapsed_ms();
3047
3048 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
3050 let mut flags = if sp_flag { FLAG_SP } else { 0 };
3051 if ce_flag {
3052 flags |= FLAG_CE;
3053 }
3054 if peer.current_k_bit() {
3055 flags |= FLAG_KEY_EPOCH;
3056 }
3057
3058 let session = peer
3059 .noise_session_mut()
3060 .ok_or_else(|| NodeError::SendFailed {
3061 node_addr: *node_addr,
3062 reason: "no noise session".into(),
3063 })?;
3064
3065 const INNER_TS_LEN: usize = 4;
3073 let counter = session.current_send_counter();
3074 let inner_len = INNER_TS_LEN + plaintext.len();
3075 let payload_len = inner_len as u16;
3076 let header = build_established_header(their_index, counter, flags, payload_len);
3077
3078 let transport_for_send = self
3097 .transports
3098 .get(&transport_id)
3099 .ok_or(NodeError::TransportNotFound(transport_id))?;
3100 match transport_for_send.connection_state(&remote_addr) {
3101 ConnectionState::Connected => {}
3102 other => {
3103 if matches!(other, ConnectionState::None) {
3104 let _ = transport_for_send.connect(&remote_addr).await;
3105 }
3106 return Err(NodeError::SendFailed {
3107 node_addr: *node_addr,
3108 reason: format!("transport connection not ready: {:?}", other),
3109 });
3110 }
3111 }
3112 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3113 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3114 && is_udp
3115 && let Some(cipher_clone) = session.send_cipher_clone()
3116 {
3117 {
3118 let reserved_counter =
3122 session
3123 .take_send_counter()
3124 .map_err(|e| NodeError::SendFailed {
3125 node_addr: *node_addr,
3126 reason: format!("counter reservation failed: {}", e),
3127 })?;
3128 debug_assert_eq!(reserved_counter, counter);
3129 let header =
3133 build_established_header(their_index, reserved_counter, flags, payload_len);
3134 let transport = transport_for_send;
3135 let send_target = {
3142 if let TransportHandle::Udp(udp) = transport {
3143 let socket_addr = {
3144 #[cfg(any(target_os = "linux", target_os = "macos"))]
3145 {
3146 match connected_socket.as_ref() {
3147 Some(socket) => Some(socket.peer_addr()),
3148 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3149 }
3150 }
3151 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3152 {
3153 udp.resolve_for_off_task(&remote_addr).await.ok()
3154 }
3155 };
3156 match (udp.async_socket(), socket_addr) {
3157 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3158 _ => None,
3159 }
3160 } else {
3161 None
3162 }
3163 };
3164 if let Some((socket, socket_addr)) = send_target {
3165 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3181 let mut wire_buf = Vec::with_capacity(wire_capacity);
3182 wire_buf.extend_from_slice(&header);
3183 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
3184 wire_buf.extend_from_slice(plaintext);
3185 let predicted_bytes = wire_capacity;
3186 if let Some(peer) = self.peers.get_mut(node_addr) {
3193 peer.link_stats_mut().record_sent(predicted_bytes);
3194 if let Some(mmp) = peer.mmp_mut() {
3195 mmp.sender
3196 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3197 }
3198 }
3199 let scheduling_weight = self.send_weight_for_peer(node_addr);
3200 workers.dispatch(self::encrypt_worker::FmpSendJob {
3201 cipher: cipher_clone,
3202 counter: reserved_counter,
3203 wire_buf,
3204 fsp_seal: None,
3205 socket,
3206 dest_addr: socket_addr,
3207 #[cfg(any(target_os = "linux", target_os = "macos"))]
3208 connected_socket,
3209 drop_on_backpressure: fmp_plaintext_is_bulk_session_datagram(plaintext),
3210 scheduling_weight,
3211 queued_at: crate::perf_profile::stamp(),
3212 });
3213 return Ok(());
3214 }
3215 }
3216 }
3217
3218 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3223 let ciphertext = {
3225 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3226 session
3227 .encrypt_with_aad(&inner_plaintext, &header)
3228 .map_err(|e| NodeError::SendFailed {
3229 node_addr: *node_addr,
3230 reason: format!("encryption failed: {}", e),
3231 })?
3232 };
3233
3234 let wire_packet = build_encrypted(&header, &ciphertext);
3235
3236 let send_result = {
3238 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3239 let transport = self
3240 .transports
3241 .get(&transport_id)
3242 .ok_or(NodeError::TransportNotFound(transport_id))?;
3243 transport.send(&remote_addr, &wire_packet).await
3244 };
3245 self.note_local_send_outcome(&send_result);
3246 let bytes_sent = send_result.map_err(|e| match e {
3247 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3248 node_addr: *node_addr,
3249 packet_size,
3250 mtu,
3251 },
3252 other => NodeError::SendFailed {
3253 node_addr: *node_addr,
3254 reason: format!("transport send: {}", other),
3255 },
3256 })?;
3257
3258 if let Some(peer) = self.peers.get_mut(node_addr) {
3260 peer.link_stats_mut().record_sent(bytes_sent);
3261 if let Some(mmp) = peer.mmp_mut() {
3263 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3264 }
3265 }
3266
3267 Ok(())
3268 }
3269}
3270
3271impl fmt::Debug for Node {
3272 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3273 f.debug_struct("Node")
3274 .field("node_addr", self.node_addr())
3275 .field("state", &self.state)
3276 .field("is_leaf_only", &self.is_leaf_only)
3277 .field("connections", &self.connection_count())
3278 .field("peers", &self.peer_count())
3279 .field("links", &self.link_count())
3280 .field("transports", &self.transport_count())
3281 .finish()
3282 }
3283}