1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31#[cfg(unix)]
32use self::wire::ESTABLISHED_HEADER_SIZE;
33use self::wire::{
34 FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted, build_established_header,
35 prepend_inner_header,
36};
37use crate::bloom::BloomState;
38use crate::cache::CoordCache;
39use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
40use crate::node::session::SessionEntry;
41use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
42use crate::peer::{ActivePeer, PeerConnection};
43#[cfg(any(target_os = "linux", target_os = "macos"))]
44use crate::transport::ethernet::EthernetTransport;
45use crate::transport::tcp::TcpTransport;
46use crate::transport::tor::TorTransport;
47use crate::transport::udp::UdpTransport;
48#[cfg(feature = "webrtc-transport")]
49use crate::transport::webrtc::WebRtcTransport;
50use crate::transport::{
51 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
52 TransportHandle, TransportId,
53};
54use crate::tree::TreeState;
55use crate::upper::hosts::HostMap;
56use crate::upper::icmp_rate_limit::IcmpRateLimiter;
57use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
58use crate::utils::index::IndexAllocator;
59use crate::{
60 Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
61 PeerIdentity, encode_npub,
62};
63use rand::Rng;
64use std::collections::{HashMap, HashSet, VecDeque};
65use std::fmt;
66use std::sync::Arc;
67use std::thread::JoinHandle;
68use thiserror::Error;
69use tracing::{debug, warn};
70
71const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
72const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20_000;
73const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 16;
74const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.08;
75const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.02;
76const ROUTING_FALLBACK_MIN_COST_ADVANTAGE: f64 = 0.25;
77
78fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
79 if plaintext
80 .first()
81 .is_none_or(|ty| *ty != LinkMessageType::SessionDatagram.to_byte())
82 {
83 return false;
84 }
85 let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
86 return false;
87 };
88 FspCommonPrefix::parse(fsp_payload).is_some_and(|prefix| {
89 prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted() && !prefix.has_coords()
90 })
91}
92
93fn endpoint_payload_is_tcp(payload: &[u8]) -> bool {
94 const IPPROTO_TCP: u8 = 6;
95 const IPV4_MIN_HEADER_LEN: usize = 20;
96
97 let Some(version_ihl) = payload.first().copied() else {
98 return false;
99 };
100
101 match version_ihl >> 4 {
102 4 => {
103 if payload.len() < IPV4_MIN_HEADER_LEN {
104 return false;
105 }
106 let header_len = usize::from(version_ihl & 0x0f) * 4;
107 header_len >= IPV4_MIN_HEADER_LEN
108 && payload.len() >= header_len
109 && payload[9] == IPPROTO_TCP
110 }
111 6 => ipv6_payload_next_header(payload).is_some_and(|proto| proto == IPPROTO_TCP),
112 _ => false,
113 }
114}
115
116fn ipv6_payload_next_header(payload: &[u8]) -> Option<u8> {
117 const IPV6_HEADER_LEN: usize = 40;
118 const IPV6_FRAGMENT_HEADER_LEN: usize = 8;
119
120 if payload.len() < IPV6_HEADER_LEN || payload[0] >> 4 != 6 {
121 return None;
122 }
123
124 let mut next_header = payload[6];
125 let mut offset = IPV6_HEADER_LEN;
126 let mut extension_count = 0usize;
127 while ipv6_extension_header_is_skippable(next_header) {
128 if next_header == 44 {
129 if payload.len() < offset + IPV6_FRAGMENT_HEADER_LEN {
130 return None;
131 }
132 next_header = payload[offset];
133 offset += IPV6_FRAGMENT_HEADER_LEN;
134 } else if next_header == 51 {
135 if payload.len() < offset + 2 {
136 return None;
137 }
138 let header_len = (usize::from(payload[offset + 1]) + 2) * 4;
139 if payload.len() < offset + header_len {
140 return None;
141 }
142 next_header = payload[offset];
143 offset += header_len;
144 } else {
145 if payload.len() < offset + 2 {
146 return None;
147 }
148 let header_len = (usize::from(payload[offset + 1]) + 1) * 8;
149 if payload.len() < offset + header_len {
150 return None;
151 }
152 next_header = payload[offset];
153 offset += header_len;
154 }
155 extension_count += 1;
156 if extension_count > 8 {
157 return None;
158 }
159 }
160
161 Some(next_header)
162}
163
164fn ipv6_extension_header_is_skippable(next_header: u8) -> bool {
165 matches!(next_header, 0 | 43 | 44 | 51 | 60 | 135)
166}
167
168pub(crate) const REKEY_JITTER_SECS: i64 = 15;
175
176#[derive(Debug, Error)]
178pub enum NodeError {
179 #[error("node not started")]
180 NotStarted,
181
182 #[error("node already started")]
183 AlreadyStarted,
184
185 #[error("node already stopped")]
186 AlreadyStopped,
187
188 #[error("transport not found: {0}")]
189 TransportNotFound(TransportId),
190
191 #[error("no transport available for type: {0}")]
192 NoTransportForType(String),
193
194 #[error("link not found: {0}")]
195 LinkNotFound(LinkId),
196
197 #[error("connection not found: {0}")]
198 ConnectionNotFound(LinkId),
199
200 #[error("peer not found: {0:?}")]
201 PeerNotFound(NodeAddr),
202
203 #[error("peer already exists: {0:?}")]
204 PeerAlreadyExists(NodeAddr),
205
206 #[error("connection already exists for link: {0}")]
207 ConnectionAlreadyExists(LinkId),
208
209 #[error("invalid peer npub '{npub}': {reason}")]
210 InvalidPeerNpub { npub: String, reason: String },
211
212 #[error("discovery error: {0}")]
213 Discovery(String),
214
215 #[error("access denied: {0}")]
216 AccessDenied(String),
217
218 #[error("max connections exceeded: {max}")]
219 MaxConnectionsExceeded { max: usize },
220
221 #[error("max peers exceeded: {max}")]
222 MaxPeersExceeded { max: usize },
223
224 #[error("max links exceeded: {max}")]
225 MaxLinksExceeded { max: usize },
226
227 #[error("handshake incomplete for link {0}")]
228 HandshakeIncomplete(LinkId),
229
230 #[error("no session available for link {0}")]
231 NoSession(LinkId),
232
233 #[error("promotion failed for link {link_id}: {reason}")]
234 PromotionFailed { link_id: LinkId, reason: String },
235
236 #[error("send failed to {node_addr}: {reason}")]
237 SendFailed { node_addr: NodeAddr, reason: String },
238
239 #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
240 MtuExceeded {
241 node_addr: NodeAddr,
242 packet_size: usize,
243 mtu: u16,
244 },
245
246 #[error("config error: {0}")]
247 Config(#[from] ConfigError),
248
249 #[error("identity error: {0}")]
250 Identity(#[from] IdentityError),
251
252 #[error("TUN error: {0}")]
253 Tun(#[from] TunError),
254
255 #[error("index allocation failed: {0}")]
256 IndexAllocationFailed(String),
257
258 #[error("handshake failed: {0}")]
259 HandshakeFailed(String),
260
261 #[error("transport error: {0}")]
262 TransportError(String),
263
264 #[error("local route unavailable: {0}")]
265 LocalRouteUnavailable(String),
266
267 #[error("bootstrap handoff failed: {0}")]
268 BootstrapHandoff(String),
269}
270
271impl NodeError {
272 pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
273 if error.is_local_route_unavailable() {
274 Self::LocalRouteUnavailable(error.to_string())
275 } else {
276 Self::TransportError(error.to_string())
277 }
278 }
279
280 pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
281 matches!(self, Self::LocalRouteUnavailable(_))
282 }
283}
284
285#[derive(Debug, Clone, PartialEq, Eq)]
287pub struct NodeDeliveredPacket {
288 pub source_node_addr: NodeAddr,
290 pub source_npub: Option<String>,
292 pub destination: FipsAddress,
294 pub packet: Vec<u8>,
296}
297
298#[derive(Debug, Clone)]
299struct IdentityCacheEntry {
300 node_addr: NodeAddr,
301 pubkey: secp256k1::PublicKey,
302 npub: String,
303 last_seen_ms: u64,
304}
305
306impl IdentityCacheEntry {
307 fn new(
308 node_addr: NodeAddr,
309 pubkey: secp256k1::PublicKey,
310 npub: String,
311 last_seen_ms: u64,
312 ) -> Self {
313 Self {
314 node_addr,
315 pubkey,
316 npub,
317 last_seen_ms,
318 }
319 }
320}
321
322#[derive(Debug)]
324pub struct ExternalPacketIo {
325 pub outbound_tx: crate::upper::tun::TunOutboundTx,
327 pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
329}
330
331#[derive(Debug)]
333pub(crate) struct EndpointDataIo {
334 pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
343 pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
353 pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
359}
360
361fn endpoint_data_command_capacity(requested: usize) -> usize {
362 if let Ok(raw) = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
363 && let Ok(value) = raw.trim().parse::<usize>()
364 && value > 0
365 {
366 return value;
367 }
368
369 requested.max(1).max(32_768)
370}
371
372#[derive(Debug)]
374pub(crate) enum NodeEndpointCommand {
375 Send {
379 remote: PeerIdentity,
380 payload: Vec<u8>,
381 queued_at: Option<std::time::Instant>,
382 response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
383 },
384 SendOneway {
390 remote: PeerIdentity,
391 payload: Vec<u8>,
392 queued_at: Option<std::time::Instant>,
393 },
394 PeerSnapshot {
395 response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
396 },
397 RelaySnapshot {
398 response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
399 },
400 UpdateRelays {
401 advert_relays: Vec<String>,
402 dm_relays: Vec<String>,
403 response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
404 },
405 UpdatePeers {
411 peers: Vec<crate::config::PeerConfig>,
412 response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
413 },
414}
415
416#[derive(Debug, Clone, Default, PartialEq, Eq)]
418pub(crate) struct UpdatePeersOutcome {
419 pub(crate) added: usize,
420 pub(crate) removed: usize,
421 pub(crate) updated: usize,
422 pub(crate) unchanged: usize,
423}
424
425#[derive(Debug)]
427pub(crate) enum NodeEndpointEvent {
428 Data {
429 source_node_addr: NodeAddr,
430 source_npub: Option<String>,
431 payload: Vec<u8>,
432 queued_at: Option<std::time::Instant>,
433 },
434}
435
436#[derive(Debug, Clone, PartialEq, Eq)]
438pub(crate) struct NodeEndpointPeer {
439 pub(crate) npub: String,
440 pub(crate) connected: bool,
441 pub(crate) transport_addr: Option<String>,
442 pub(crate) transport_type: Option<String>,
443 pub(crate) link_id: u64,
444 pub(crate) srtt_ms: Option<u64>,
445 pub(crate) packets_sent: u64,
446 pub(crate) packets_recv: u64,
447 pub(crate) bytes_sent: u64,
448 pub(crate) bytes_recv: u64,
449 pub(crate) direct_probe_pending: bool,
450 pub(crate) direct_probe_after_ms: Option<u64>,
451}
452
453#[derive(Debug, Clone, PartialEq, Eq)]
455pub(crate) struct NodeEndpointRelayStatus {
456 pub(crate) url: String,
457 pub(crate) status: String,
458}
459
460#[derive(Clone, Copy, Debug, PartialEq, Eq)]
462pub enum NodeState {
463 Created,
465 Starting,
467 Running,
469 Stopping,
471 Stopped,
473}
474
475impl NodeState {
476 pub fn is_operational(&self) -> bool {
478 matches!(self, NodeState::Running)
479 }
480
481 pub fn can_start(&self) -> bool {
483 matches!(self, NodeState::Created | NodeState::Stopped)
484 }
485
486 pub fn can_stop(&self) -> bool {
488 matches!(self, NodeState::Running)
489 }
490}
491
492impl fmt::Display for NodeState {
493 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
494 let s = match self {
495 NodeState::Created => "created",
496 NodeState::Starting => "starting",
497 NodeState::Running => "running",
498 NodeState::Stopping => "stopping",
499 NodeState::Stopped => "stopped",
500 };
501 write!(f, "{}", s)
502 }
503}
504
505#[derive(Clone, Debug)]
512pub(crate) struct RecentRequest {
513 pub(crate) from_peer: NodeAddr,
515 pub(crate) timestamp_ms: u64,
517 pub(crate) response_forwarded: bool,
521}
522
523impl RecentRequest {
524 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
525 Self {
526 from_peer,
527 timestamp_ms,
528 response_forwarded: false,
529 }
530 }
531
532 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
534 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
535 }
536}
537
538type AddrKey = (TransportId, TransportAddr);
540
541#[derive(Debug, Default)]
546struct TransportDropState {
547 prev_drops: u64,
549 dropping: bool,
551}
552
553struct PendingConnect {
559 link_id: LinkId,
561 transport_id: TransportId,
563 remote_addr: TransportAddr,
565 peer_identity: PeerIdentity,
567}
568
569pub struct Node {
583 identity: Identity,
586
587 startup_epoch: [u8; 8],
590
591 started_at: std::time::Instant,
593
594 config: Config,
597
598 state: NodeState,
601
602 is_leaf_only: bool,
604
605 tree_state: TreeState,
608
609 bloom_state: BloomState,
612
613 coord_cache: CoordCache,
616 learned_routes: LearnedRouteTable,
618 session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
621 recent_requests: HashMap<u64, RecentRequest>,
624 path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,
630
631 transports: HashMap<TransportId, TransportHandle>,
634 transport_drops: HashMap<TransportId, TransportDropState>,
636 links: HashMap<LinkId, Link>,
638 addr_to_link: HashMap<AddrKey, LinkId>,
640
641 packet_tx: Option<PacketTx>,
644 packet_rx: Option<PacketRx>,
646
647 connections: HashMap<LinkId, PeerConnection>,
651
652 peers: HashMap<NodeAddr, ActivePeer>,
656
657 sessions: HashMap<NodeAddr, SessionEntry>,
661
662 identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,
666
667 pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
671 pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
673 pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,
677
678 max_connections: usize,
681 max_peers: usize,
683 max_links: usize,
685
686 next_link_id: u64,
689 next_transport_id: u32,
691
692 stats: stats::NodeStats,
695
696 stats_history: stats_history::StatsHistory,
698
699 tun_state: TunState,
702 tun_name: Option<String>,
704 tun_tx: Option<TunTx>,
706 tun_outbound_rx: Option<TunOutboundRx>,
708 external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
710 endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
712 endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
714 encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
720 decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
723 decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
732 decrypt_fallback_rx:
736 Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
737 decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
738 tun_reader_handle: Option<JoinHandle<()>>,
740 tun_writer_handle: Option<JoinHandle<()>>,
742 #[cfg(target_os = "macos")]
745 tun_shutdown_fd: Option<std::os::unix::io::RawFd>,
746
747 dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
750 dns_task: Option<tokio::task::JoinHandle<()>>,
752
753 index_allocator: IndexAllocator,
756 peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
759 pending_outbound: HashMap<(TransportId, u32), LinkId>,
762
763 msg1_rate_limiter: HandshakeRateLimiter,
766 icmp_rate_limiter: IcmpRateLimiter,
768 routing_error_rate_limiter: RoutingErrorRateLimiter,
770 coords_response_rate_limiter: RoutingErrorRateLimiter,
772 discovery_backoff: DiscoveryBackoff,
774 discovery_forward_limiter: DiscoveryForwardRateLimiter,
776
777 pending_connects: Vec<PendingConnect>,
783
784 retry_pending: HashMap<NodeAddr, retry::RetryState>,
790
791 nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
793 lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
798 local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
802 local_instance_started_at_ms: Option<u64>,
803 last_local_instance_publish_ms: Option<u64>,
804 last_local_instance_scan_ms: Option<u64>,
805 nostr_discovery_started_at_ms: Option<u64>,
810 startup_open_discovery_sweep_done: bool,
814 bootstrap_transports: HashSet<TransportId>,
816 bootstrap_transport_npubs: HashMap<TransportId, String>,
823 discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,
826
827 last_parent_reeval: Option<crate::time::Instant>,
830
831 last_congestion_log: Option<std::time::Instant>,
834
835 estimated_mesh_size: Option<u64>,
838 last_mesh_size_log: Option<std::time::Instant>,
840
841 last_self_warn: Option<std::time::Instant>,
847
848 local_send_failure_at_by_peer: HashMap<NodeAddr, std::time::Instant>,
855 last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,
860
861 peer_aliases: HashMap<NodeAddr, String>,
865 configured_peer_send_weights: HashMap<NodeAddr, u8>,
868
869 peer_acl: acl::PeerAclReloader,
871
872 host_map: Arc<HostMap>,
876}
877
878impl Node {
879 pub fn new(config: Config) -> Result<Self, NodeError> {
881 config.validate()?;
882 let identity = config.create_identity()?;
883 let node_addr = *identity.node_addr();
884 let is_leaf_only = config.is_leaf_only();
885
886 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
887 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
888
889 let mut startup_epoch = [0u8; 8];
890 rand::rng().fill_bytes(&mut startup_epoch);
891
892 let mut bloom_state = if is_leaf_only {
893 BloomState::leaf_only(node_addr)
894 } else {
895 BloomState::new(node_addr)
896 };
897 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
898
899 let tun_state = if config.tun.enabled {
900 TunState::Configured
901 } else {
902 TunState::Disabled
903 };
904
905 let mut tree_state = TreeState::new(node_addr);
907 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
908 tree_state.set_hold_down(config.node.tree.hold_down_secs);
909 tree_state.set_flap_dampening(
910 config.node.tree.flap_threshold,
911 config.node.tree.flap_window_secs,
912 config.node.tree.flap_dampening_secs,
913 );
914 tree_state
915 .sign_declaration(&identity)
916 .expect("signing own declaration should never fail");
917
918 let coord_cache = CoordCache::new(
919 config.node.cache.coord_size,
920 config.node.cache.coord_ttl_secs * 1000,
921 );
922 let rl = &config.node.rate_limit;
923 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
924 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
925 config.node.limits.max_pending_inbound,
926 );
927
928 let max_connections = config.node.limits.max_connections;
929 let max_peers = config.node.limits.max_peers;
930 let max_links = config.node.limits.max_links;
931 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
932 let backoff_base_secs = config.node.discovery.backoff_base_secs;
933 let backoff_max_secs = config.node.discovery.backoff_max_secs;
934 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
935
936 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
937 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
938
939 Ok(Self {
940 identity,
941 startup_epoch,
942 started_at: std::time::Instant::now(),
943 config,
944 state: NodeState::Created,
945 is_leaf_only,
946 tree_state,
947 bloom_state,
948 coord_cache,
949 learned_routes: LearnedRouteTable::default(),
950 session_direct_degraded_until_ms: HashMap::new(),
951 recent_requests: HashMap::new(),
952 transports: HashMap::new(),
953 transport_drops: HashMap::new(),
954 links: HashMap::new(),
955 addr_to_link: HashMap::new(),
956 packet_tx: None,
957 packet_rx: None,
958 connections: HashMap::new(),
959 peers: HashMap::new(),
960 sessions: HashMap::new(),
961 identity_cache: HashMap::new(),
962 pending_tun_packets: HashMap::new(),
963 pending_endpoint_data: HashMap::new(),
964 pending_lookups: HashMap::new(),
965 max_connections,
966 max_peers,
967 max_links,
968 next_link_id: 1,
969 next_transport_id: 1,
970 stats: stats::NodeStats::new(),
971 stats_history: stats_history::StatsHistory::new(),
972 tun_state,
973 tun_name: None,
974 tun_tx: None,
975 tun_outbound_rx: None,
976 external_packet_tx: None,
977 endpoint_command_rx: None,
978 endpoint_event_tx: None,
979 encrypt_workers: None,
980 decrypt_workers: None,
981 decrypt_registered_sessions: std::collections::HashSet::new(),
982 decrypt_fallback_tx,
983 decrypt_fallback_rx,
984 tun_reader_handle: None,
985 tun_writer_handle: None,
986 #[cfg(target_os = "macos")]
987 tun_shutdown_fd: None,
988 dns_identity_rx: None,
989 dns_task: None,
990 index_allocator: IndexAllocator::new(),
991 peers_by_index: HashMap::new(),
992 pending_outbound: HashMap::new(),
993 msg1_rate_limiter,
994 icmp_rate_limiter: IcmpRateLimiter::new(),
995 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
996 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
997 std::time::Duration::from_millis(coords_response_interval_ms),
998 ),
999 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
1000 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
1001 std::time::Duration::from_secs(forward_min_interval_secs),
1002 ),
1003 pending_connects: Vec::new(),
1004 retry_pending: HashMap::new(),
1005 nostr_discovery: None,
1006 nostr_discovery_started_at_ms: None,
1007 lan_discovery: None,
1008 local_instance_registry: None,
1009 local_instance_started_at_ms: None,
1010 last_local_instance_publish_ms: None,
1011 last_local_instance_scan_ms: None,
1012 startup_open_discovery_sweep_done: false,
1013 bootstrap_transports: HashSet::new(),
1014 bootstrap_transport_npubs: HashMap::new(),
1015 discovery_fallback_transit_blocked_peers: HashSet::new(),
1016 last_parent_reeval: None,
1017 last_congestion_log: None,
1018 estimated_mesh_size: None,
1019 last_mesh_size_log: None,
1020 last_self_warn: None,
1021 local_send_failure_at_by_peer: HashMap::new(),
1022 last_rx_loop_maintenance_timeout_at: None,
1023 peer_aliases: HashMap::new(),
1024 configured_peer_send_weights,
1025 peer_acl,
1026 host_map,
1027 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1028 })
1029 }
1030
1031 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
1036 config.validate()?;
1037 let node_addr = *identity.node_addr();
1038
1039 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
1040 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
1041
1042 let mut startup_epoch = [0u8; 8];
1043 rand::rng().fill_bytes(&mut startup_epoch);
1044
1045 let tun_state = if config.tun.enabled {
1046 TunState::Configured
1047 } else {
1048 TunState::Disabled
1049 };
1050
1051 let mut tree_state = TreeState::new(node_addr);
1053 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
1054 tree_state.set_hold_down(config.node.tree.hold_down_secs);
1055 tree_state.set_flap_dampening(
1056 config.node.tree.flap_threshold,
1057 config.node.tree.flap_window_secs,
1058 config.node.tree.flap_dampening_secs,
1059 );
1060 tree_state
1061 .sign_declaration(&identity)
1062 .expect("signing own declaration should never fail");
1063
1064 let mut bloom_state = BloomState::new(node_addr);
1065 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
1066
1067 let coord_cache = CoordCache::new(
1068 config.node.cache.coord_size,
1069 config.node.cache.coord_ttl_secs * 1000,
1070 );
1071 let rl = &config.node.rate_limit;
1072 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
1073 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
1074 config.node.limits.max_pending_inbound,
1075 );
1076
1077 let max_connections = config.node.limits.max_connections;
1078 let max_peers = config.node.limits.max_peers;
1079 let max_links = config.node.limits.max_links;
1080 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1081
1082 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1083 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1084
1085 Ok(Self {
1086 identity,
1087 startup_epoch,
1088 started_at: std::time::Instant::now(),
1089 config,
1090 state: NodeState::Created,
1091 is_leaf_only: false,
1092 tree_state,
1093 bloom_state,
1094 coord_cache,
1095 learned_routes: LearnedRouteTable::default(),
1096 session_direct_degraded_until_ms: HashMap::new(),
1097 recent_requests: HashMap::new(),
1098 transports: HashMap::new(),
1099 transport_drops: HashMap::new(),
1100 links: HashMap::new(),
1101 addr_to_link: HashMap::new(),
1102 packet_tx: None,
1103 packet_rx: None,
1104 connections: HashMap::new(),
1105 peers: HashMap::new(),
1106 sessions: HashMap::new(),
1107 identity_cache: HashMap::new(),
1108 pending_tun_packets: HashMap::new(),
1109 pending_endpoint_data: HashMap::new(),
1110 pending_lookups: HashMap::new(),
1111 max_connections,
1112 max_peers,
1113 max_links,
1114 next_link_id: 1,
1115 next_transport_id: 1,
1116 stats: stats::NodeStats::new(),
1117 stats_history: stats_history::StatsHistory::new(),
1118 tun_state,
1119 tun_name: None,
1120 tun_tx: None,
1121 tun_outbound_rx: None,
1122 external_packet_tx: None,
1123 endpoint_command_rx: None,
1124 endpoint_event_tx: None,
1125 encrypt_workers: None,
1126 decrypt_workers: None,
1127 decrypt_registered_sessions: std::collections::HashSet::new(),
1128 decrypt_fallback_tx,
1129 decrypt_fallback_rx,
1130 tun_reader_handle: None,
1131 tun_writer_handle: None,
1132 #[cfg(target_os = "macos")]
1133 tun_shutdown_fd: None,
1134 dns_identity_rx: None,
1135 dns_task: None,
1136 index_allocator: IndexAllocator::new(),
1137 peers_by_index: HashMap::new(),
1138 pending_outbound: HashMap::new(),
1139 msg1_rate_limiter,
1140 icmp_rate_limiter: IcmpRateLimiter::new(),
1141 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1142 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1143 std::time::Duration::from_millis(coords_response_interval_ms),
1144 ),
1145 discovery_backoff: DiscoveryBackoff::new(),
1146 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1147 pending_connects: Vec::new(),
1148 retry_pending: HashMap::new(),
1149 nostr_discovery: None,
1150 nostr_discovery_started_at_ms: None,
1151 lan_discovery: None,
1152 local_instance_registry: None,
1153 local_instance_started_at_ms: None,
1154 last_local_instance_publish_ms: None,
1155 last_local_instance_scan_ms: None,
1156 startup_open_discovery_sweep_done: false,
1157 bootstrap_transports: HashSet::new(),
1158 bootstrap_transport_npubs: HashMap::new(),
1159 discovery_fallback_transit_blocked_peers: HashSet::new(),
1160 last_parent_reeval: None,
1161 last_congestion_log: None,
1162 estimated_mesh_size: None,
1163 last_mesh_size_log: None,
1164 last_self_warn: None,
1165 local_send_failure_at_by_peer: HashMap::new(),
1166 last_rx_loop_maintenance_timeout_at: None,
1167 peer_aliases: HashMap::new(),
1168 configured_peer_send_weights,
1169 peer_acl,
1170 host_map,
1171 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1172 })
1173 }
1174
1175 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1177 let mut node = Self::new(config)?;
1178 node.is_leaf_only = true;
1179 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1180 Ok(node)
1181 }
1182
1183 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1184 let base_host_map = HostMap::from_peer_configs(config.peers());
1185 if !config.node.system_files_enabled {
1186 return (
1187 Arc::new(base_host_map.clone()),
1188 acl::PeerAclReloader::memory_only(base_host_map),
1189 );
1190 }
1191
1192 let mut host_map = base_host_map.clone();
1193 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1194 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1195 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1196 ));
1197 host_map.merge(hosts_file);
1198 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1199 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1200 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1201 base_host_map,
1202 hosts_path,
1203 );
1204 (Arc::new(host_map), peer_acl)
1205 }
1206
1207 fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1208 config
1209 .peers()
1210 .iter()
1211 .filter_map(|peer| {
1212 PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1213 (
1214 *identity.node_addr(),
1215 encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1216 )
1217 })
1218 })
1219 .collect()
1220 }
1221
1222 #[cfg(unix)]
1223 fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1224 self.configured_peer_send_weights
1225 .get(peer_addr)
1226 .copied()
1227 .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1228 }
1229
1230 async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
1234 let mut transports = Vec::new();
1235
1236 let udp_instances: Vec<_> = self
1238 .config
1239 .transports
1240 .udp
1241 .iter()
1242 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1243 .collect();
1244
1245 for (name, udp_config) in udp_instances {
1247 let transport_id = self.allocate_transport_id();
1248 let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
1249 transports.push(TransportHandle::Udp(udp));
1250 }
1251
1252 #[cfg(feature = "sim-transport")]
1253 {
1254 let sim_instances: Vec<_> = self
1255 .config
1256 .transports
1257 .sim
1258 .iter()
1259 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1260 .collect();
1261
1262 for (name, sim_config) in sim_instances {
1263 let transport_id = self.allocate_transport_id();
1264 let sim = crate::transport::sim::SimTransport::new(
1265 transport_id,
1266 name,
1267 sim_config,
1268 packet_tx.clone(),
1269 );
1270 transports.push(TransportHandle::Sim(sim));
1271 }
1272 }
1273
1274 #[cfg(any(target_os = "linux", target_os = "macos"))]
1276 {
1277 let eth_instances: Vec<_> = self
1278 .config
1279 .transports
1280 .ethernet
1281 .iter()
1282 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1283 .collect();
1284 let xonly = self.identity.pubkey();
1285 for (name, eth_config) in eth_instances {
1286 let mut eth_config = eth_config;
1287 if eth_config.discovery_scope.is_none() {
1288 eth_config.discovery_scope = self.lan_discovery_scope();
1289 }
1290 let transport_id = self.allocate_transport_id();
1291 let mut eth =
1292 EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
1293 eth.set_local_pubkey(xonly);
1294 transports.push(TransportHandle::Ethernet(eth));
1295 }
1296 }
1297
1298 let tcp_instances: Vec<_> = self
1300 .config
1301 .transports
1302 .tcp
1303 .iter()
1304 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1305 .collect();
1306
1307 for (name, tcp_config) in tcp_instances {
1308 let transport_id = self.allocate_transport_id();
1309 let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
1310 transports.push(TransportHandle::Tcp(tcp));
1311 }
1312
1313 let tor_instances: Vec<_> = self
1315 .config
1316 .transports
1317 .tor
1318 .iter()
1319 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1320 .collect();
1321
1322 for (name, tor_config) in tor_instances {
1323 let transport_id = self.allocate_transport_id();
1324 let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
1325 transports.push(TransportHandle::Tor(tor));
1326 }
1327
1328 let webrtc_instances: Vec<_> = self
1329 .config
1330 .transports
1331 .webrtc
1332 .iter()
1333 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1334 .collect();
1335
1336 #[cfg(feature = "webrtc-transport")]
1337 {
1338 for (name, webrtc_config) in webrtc_instances {
1339 let transport_id = self.allocate_transport_id();
1340 match WebRtcTransport::new(
1341 transport_id,
1342 name,
1343 webrtc_config,
1344 packet_tx.clone(),
1345 &self.identity,
1346 &self.config.node.discovery.nostr,
1347 ) {
1348 Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
1349 Err(err) => {
1350 warn!(
1351 transport_id = %transport_id,
1352 error = %err,
1353 "failed to initialize WebRTC transport"
1354 );
1355 }
1356 }
1357 }
1358 }
1359 #[cfg(not(feature = "webrtc-transport"))]
1360 if !webrtc_instances.is_empty() {
1361 warn!("WebRTC transport configured but this build lacks WebRTC transport support");
1362 }
1363
1364 #[cfg(bluer_available)]
1366 {
1367 let ble_instances: Vec<_> = self
1368 .config
1369 .transports
1370 .ble
1371 .iter()
1372 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1373 .collect();
1374
1375 #[cfg(all(bluer_available, not(test)))]
1376 for (name, ble_config) in ble_instances {
1377 let transport_id = self.allocate_transport_id();
1378 let adapter = ble_config.adapter().to_string();
1379 let mtu = ble_config.mtu();
1380 match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
1381 Ok(io) => {
1382 let mut ble = crate::transport::ble::BleTransport::new(
1383 transport_id,
1384 name,
1385 ble_config,
1386 io,
1387 packet_tx.clone(),
1388 );
1389 ble.set_local_pubkey(self.identity.pubkey().serialize());
1390 transports.push(TransportHandle::Ble(ble));
1391 }
1392 Err(e) => {
1393 tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
1394 }
1395 }
1396 }
1397
1398 #[cfg(any(not(bluer_available), test))]
1399 if !ble_instances.is_empty() {
1400 #[cfg(not(test))]
1401 tracing::warn!("BLE transport configured but this build lacks BlueZ support");
1402 }
1403 }
1404
1405 transports
1406 }
1407
1408 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1418 self.transports
1419 .iter()
1420 .filter(|(id, handle)| {
1421 handle.transport_type().name == transport_type
1422 && handle.is_operational()
1423 && !self.bootstrap_transports.contains(id)
1424 })
1425 .min_by_key(|(id, _)| id.as_u32())
1426 .map(|(id, _)| *id)
1427 }
1428
1429 #[allow(unused_variables)]
1435 fn resolve_ethernet_addr(
1436 &self,
1437 addr_str: &str,
1438 ) -> Result<(TransportId, TransportAddr), NodeError> {
1439 #[cfg(any(target_os = "linux", target_os = "macos"))]
1440 {
1441 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1442 NodeError::NoTransportForType(format!(
1443 "invalid Ethernet address format '{}': expected 'interface/mac'",
1444 addr_str
1445 ))
1446 })?;
1447
1448 let transport_id = self
1450 .transports
1451 .iter()
1452 .find(|(_, handle)| {
1453 handle.transport_type().name == "ethernet"
1454 && handle.is_operational()
1455 && handle.interface_name() == Some(iface)
1456 })
1457 .map(|(id, _)| *id)
1458 .ok_or_else(|| {
1459 NodeError::NoTransportForType(format!(
1460 "no operational Ethernet transport for interface '{}'",
1461 iface
1462 ))
1463 })?;
1464
1465 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1466 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1467 })?;
1468
1469 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1470 }
1471 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1472 {
1473 Err(NodeError::NoTransportForType(
1474 "Ethernet transport is not supported on this platform".to_string(),
1475 ))
1476 }
1477 }
1478
1479 #[cfg(bluer_available)]
1483 fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1484 let ta = TransportAddr::from_string(addr_str);
1485 let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1486 NodeError::NoTransportForType(format!(
1487 "invalid BLE address format '{}': expected 'adapter/mac'",
1488 addr_str
1489 ))
1490 })?;
1491
1492 let transport_id = self
1494 .transports
1495 .iter()
1496 .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1497 .map(|(id, _)| *id)
1498 .ok_or_else(|| {
1499 NodeError::NoTransportForType(format!(
1500 "no operational BLE transport for adapter '{}'",
1501 adapter
1502 ))
1503 })?;
1504
1505 crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1507 NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1508 })?;
1509
1510 Ok((transport_id, TransportAddr::from_string(addr_str)))
1511 }
1512
1513 pub fn identity(&self) -> &Identity {
1517 &self.identity
1518 }
1519
1520 pub fn node_addr(&self) -> &NodeAddr {
1522 self.identity.node_addr()
1523 }
1524
1525 pub fn npub(&self) -> String {
1527 self.identity.npub()
1528 }
1529
1530 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1539 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1540 return hostname.to_string();
1541 }
1542 if let Some(name) = self.peer_aliases.get(addr) {
1543 return name.clone();
1544 }
1545 if let Some(peer) = self.peers.get(addr) {
1546 return peer.identity().short_npub();
1547 }
1548 if let Some(entry) = self.sessions.get(addr) {
1549 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1550 return PeerIdentity::from_pubkey(xonly).short_npub();
1551 }
1552 addr.short_hex()
1553 }
1554
1555 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1567 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1571 self.peers_by_index.remove(&cache_key);
1572 if self.decrypt_registered_sessions.remove(&cache_key)
1573 && let Some(workers) = self.decrypt_workers.as_ref()
1574 {
1575 workers.unregister_session(cache_key);
1576 }
1577 if let Some(peer_addr) = owning_peer {
1588 let peer_has_other_index = self
1589 .peers_by_index
1590 .values()
1591 .any(|other| *other == peer_addr);
1592 if !peer_has_other_index {
1593 self.clear_connected_udp_for_peer(&peer_addr);
1594 }
1595 }
1596 }
1597
1598 pub(in crate::node) fn ensure_current_session_index_registered(
1607 &mut self,
1608 node_addr: &NodeAddr,
1609 context: &'static str,
1610 ) -> bool {
1611 let Some(peer) = self.peers.get(node_addr) else {
1612 return false;
1613 };
1614 let Some(transport_id) = peer.transport_id() else {
1615 warn!(
1616 peer = %self.peer_display_name(node_addr),
1617 context,
1618 "Cannot register current session index without transport id"
1619 );
1620 return false;
1621 };
1622 let Some(our_index) = peer.our_index() else {
1623 warn!(
1624 peer = %self.peer_display_name(node_addr),
1625 context,
1626 "Cannot register current session index without local index"
1627 );
1628 return false;
1629 };
1630
1631 let cache_key = (transport_id, our_index.as_u32());
1632 match self.peers_by_index.get(&cache_key).copied() {
1633 Some(existing) if existing == *node_addr => true,
1634 Some(existing) => {
1635 warn!(
1636 peer = %self.peer_display_name(node_addr),
1637 previous_owner = %self.peer_display_name(&existing),
1638 transport_id = %transport_id,
1639 our_index = %our_index,
1640 context,
1641 "Repairing current session index with stale owner"
1642 );
1643 self.peers_by_index.insert(cache_key, *node_addr);
1644 true
1645 }
1646 None => {
1647 warn!(
1648 peer = %self.peer_display_name(node_addr),
1649 transport_id = %transport_id,
1650 our_index = %our_index,
1651 context,
1652 "Repairing missing current session index"
1653 );
1654 self.peers_by_index.insert(cache_key, *node_addr);
1655 true
1656 }
1657 }
1658 }
1659
1660 pub fn config(&self) -> &Config {
1664 &self.config
1665 }
1666
1667 pub fn effective_ipv6_mtu(&self) -> u16 {
1673 crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1674 }
1675
1676 pub fn transport_mtu(&self) -> u16 {
1693 let min_operational = self
1694 .transports
1695 .values()
1696 .filter(|h| h.is_operational())
1697 .map(|h| h.mtu())
1698 .min();
1699 if let Some(mtu) = min_operational {
1700 return mtu;
1701 }
1702 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1704 return cfg.mtu();
1705 }
1706 1280
1707 }
1708
1709 pub fn state(&self) -> NodeState {
1713 self.state
1714 }
1715
1716 pub fn uptime(&self) -> std::time::Duration {
1718 self.started_at.elapsed()
1719 }
1720
1721 pub fn is_running(&self) -> bool {
1723 self.state.is_operational()
1724 }
1725
1726 pub fn is_leaf_only(&self) -> bool {
1728 self.is_leaf_only
1729 }
1730
1731 pub fn tree_state(&self) -> &TreeState {
1735 &self.tree_state
1736 }
1737
1738 pub fn tree_state_mut(&mut self) -> &mut TreeState {
1740 &mut self.tree_state
1741 }
1742
1743 pub fn bloom_state(&self) -> &BloomState {
1747 &self.bloom_state
1748 }
1749
1750 pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1752 &mut self.bloom_state
1753 }
1754
1755 pub fn estimated_mesh_size(&self) -> Option<u64> {
1759 self.estimated_mesh_size
1760 }
1761
1762 pub(crate) fn compute_mesh_size(&mut self) {
1768 let my_addr = *self.tree_state.my_node_addr();
1769 let parent_id = *self.tree_state.my_declaration().parent_id();
1770 let is_root = self.tree_state.is_root();
1771
1772 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1773 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1775 let mut has_data = false;
1776
1777 if !is_root
1783 && let Some(parent) = self.peers.get(&parent_id)
1784 && let Some(filter) = parent.inbound_filter()
1785 {
1786 match filter.estimated_count(max_fpr) {
1787 Some(n) => {
1788 total += n;
1789 has_data = true;
1790 }
1791 None => {
1792 self.estimated_mesh_size = None;
1793 return;
1794 }
1795 }
1796 }
1797
1798 for (peer_addr, peer) in &self.peers {
1800 if peer_addr == &parent_id {
1801 continue;
1802 }
1803 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1804 && *decl.parent_id() == my_addr
1805 {
1806 child_count += 1;
1807 if let Some(filter) = peer.inbound_filter() {
1808 match filter.estimated_count(max_fpr) {
1809 Some(n) => {
1810 total += n;
1811 has_data = true;
1812 }
1813 None => {
1814 self.estimated_mesh_size = None;
1815 return;
1816 }
1817 }
1818 }
1819 }
1820 }
1821
1822 if !has_data {
1823 self.estimated_mesh_size = None;
1824 return;
1825 }
1826
1827 let size = total.round() as u64;
1828 self.estimated_mesh_size = Some(size);
1829
1830 let now = std::time::Instant::now();
1832 let should_log = match self.last_mesh_size_log {
1833 None => true,
1834 Some(last) => {
1835 now.duration_since(last)
1836 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1837 }
1838 };
1839 if should_log {
1840 tracing::debug!(
1841 estimated_mesh_size = size,
1842 peers = self.peers.len(),
1843 children = child_count,
1844 "Mesh size estimate"
1845 );
1846 self.last_mesh_size_log = Some(now);
1847 }
1848 }
1849
1850 pub fn coord_cache(&self) -> &CoordCache {
1854 &self.coord_cache
1855 }
1856
1857 pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1859 &mut self.coord_cache
1860 }
1861
1862 pub fn stats(&self) -> &stats::NodeStats {
1866 &self.stats
1867 }
1868
1869 pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1871 &mut self.stats
1872 }
1873
1874 pub fn stats_history(&self) -> &stats_history::StatsHistory {
1876 &self.stats_history
1877 }
1878
1879 pub(crate) fn record_stats_history(&mut self) {
1882 let fwd = &self.stats.forwarding;
1883 let peers_with_mmp: Vec<f64> = self
1884 .peers
1885 .values()
1886 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1887 .collect();
1888 let loss_rate = if peers_with_mmp.is_empty() {
1889 0.0
1890 } else {
1891 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1892 };
1893
1894 let snap = stats_history::Snapshot {
1895 mesh_size: self.estimated_mesh_size,
1896 tree_depth: self.tree_state.my_coords().depth() as u32,
1897 peer_count: self.peers.len() as u64,
1898 parent_switches_total: self.stats.tree.parent_switches,
1899 bytes_in_total: fwd.received_bytes,
1900 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1901 packets_in_total: fwd.received_packets,
1902 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1903 loss_rate,
1904 active_sessions: self.sessions.len() as u64,
1905 };
1906
1907 let now = std::time::Instant::now();
1908 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1909 .peers
1910 .values()
1911 .map(|p| {
1912 let stats = p.link_stats();
1913 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1914 Some(m) => (
1915 m.metrics.srtt_ms(),
1916 Some(m.metrics.loss_rate()),
1917 m.receiver.ecn_ce_count() as u64,
1918 ),
1919 None => (None, None, 0),
1920 };
1921 stats_history::PeerSnapshot {
1922 node_addr: *p.node_addr(),
1923 last_seen: now,
1924 srtt_ms,
1925 loss_rate,
1926 bytes_in_total: stats.bytes_recv,
1927 bytes_out_total: stats.bytes_sent,
1928 packets_in_total: stats.packets_recv,
1929 packets_out_total: stats.packets_sent,
1930 ecn_ce_total: ecn_ce,
1931 }
1932 })
1933 .collect();
1934
1935 self.stats_history.tick(now, &snap, &peer_snaps);
1936 }
1937
1938 pub fn tun_state(&self) -> TunState {
1942 self.tun_state
1943 }
1944
1945 pub fn tun_name(&self) -> Option<&str> {
1947 self.tun_name.as_deref()
1948 }
1949
1950 pub fn set_max_connections(&mut self, max: usize) {
1954 self.max_connections = max;
1955 }
1956
1957 pub fn set_max_peers(&mut self, max: usize) {
1959 self.max_peers = max;
1960 }
1961
1962 pub(crate) fn outbound_admission_check(&self) -> bool {
1965 let connection_used = self
1966 .connections
1967 .len()
1968 .saturating_add(self.pending_connects.len());
1969 let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1970 let connection_allowed =
1971 self.max_connections == 0 || connection_used < self.max_connections;
1972 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1973 peer_allowed && connection_allowed && link_allowed
1974 }
1975
1976 pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1980 if !self.outbound_admission_check() {
1981 return false;
1982 }
1983
1984 let nostr = &self.config.node.discovery.nostr;
1985 if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1986 return true;
1987 }
1988
1989 let configured_npubs = self
1990 .config
1991 .peers()
1992 .iter()
1993 .map(|peer| peer.npub.clone())
1994 .collect::<HashSet<_>>();
1995 self.open_discovery_enqueue_budget(&configured_npubs) > 0
1996 }
1997
1998 pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
2002 let connection_used = self
2003 .connections
2004 .len()
2005 .saturating_add(self.pending_connects.len());
2006 let connection_allowed =
2007 self.max_connections == 0 || connection_used < self.max_connections;
2008 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
2009 connection_allowed && link_allowed
2010 }
2011
2012 pub fn set_max_links(&mut self, max: usize) {
2014 self.max_links = max;
2015 }
2016
2017 pub fn connection_count(&self) -> usize {
2021 self.connections.len()
2022 }
2023
2024 pub fn peer_count(&self) -> usize {
2026 self.peers.len()
2027 }
2028
2029 pub fn link_count(&self) -> usize {
2031 self.links.len()
2032 }
2033
2034 pub fn transport_count(&self) -> usize {
2036 self.transports.len()
2037 }
2038
2039 pub fn allocate_transport_id(&mut self) -> TransportId {
2043 let id = TransportId::new(self.next_transport_id);
2044 self.next_transport_id += 1;
2045 id
2046 }
2047
2048 pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
2050 self.transports.get(id)
2051 }
2052
2053 pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
2055 self.transports.get_mut(id)
2056 }
2057
2058 pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
2060 self.transports.keys()
2061 }
2062
2063 pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
2065 self.packet_rx.as_mut()
2066 }
2067
2068 pub fn allocate_link_id(&mut self) -> LinkId {
2072 let id = LinkId::new(self.next_link_id);
2073 self.next_link_id += 1;
2074 id
2075 }
2076
2077 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2079 if self.max_links > 0 && self.links.len() >= self.max_links {
2080 return Err(NodeError::MaxLinksExceeded {
2081 max: self.max_links,
2082 });
2083 }
2084 let link_id = link.link_id();
2085 let transport_id = link.transport_id();
2086 let remote_addr = link.remote_addr().clone();
2087
2088 self.links.insert(link_id, link);
2089 self.addr_to_link
2090 .insert((transport_id, remote_addr), link_id);
2091 Ok(())
2092 }
2093
2094 pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
2096 self.links.get(link_id)
2097 }
2098
2099 pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
2101 self.links.get_mut(link_id)
2102 }
2103
2104 pub fn find_link_by_addr(
2106 &self,
2107 transport_id: TransportId,
2108 addr: &TransportAddr,
2109 ) -> Option<LinkId> {
2110 self.addr_to_link
2111 .get(&(transport_id, addr.clone()))
2112 .copied()
2113 }
2114
2115 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2121 if let Some(link) = self.links.remove(link_id) {
2122 let key = (link.transport_id(), link.remote_addr().clone());
2124 if self.addr_to_link.get(&key) == Some(link_id) {
2125 self.addr_to_link.remove(&key);
2126 }
2127 Some(link)
2128 } else {
2129 None
2130 }
2131 }
2132
2133 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2134 if !self.bootstrap_transports.contains(&transport_id) {
2135 return;
2136 }
2137
2138 let transport_in_use = self
2139 .links
2140 .values()
2141 .any(|link| link.transport_id() == transport_id)
2142 || self
2143 .connections
2144 .values()
2145 .any(|conn| conn.transport_id() == Some(transport_id))
2146 || self
2147 .peers
2148 .values()
2149 .any(|peer| peer.transport_id() == Some(transport_id))
2150 || self
2151 .pending_connects
2152 .iter()
2153 .any(|pending| pending.transport_id == transport_id);
2154
2155 if transport_in_use {
2156 return;
2157 }
2158
2159 tracing::debug!(
2160 transport_id = %transport_id,
2161 "bootstrap transport has no remaining references; dropping"
2162 );
2163
2164 self.bootstrap_transports.remove(&transport_id);
2165 self.bootstrap_transport_npubs.remove(&transport_id);
2166 self.transport_drops.remove(&transport_id);
2167 self.transports.remove(&transport_id);
2168 }
2169
2170 pub fn links(&self) -> impl Iterator<Item = &Link> {
2172 self.links.values()
2173 }
2174
2175 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2179 let link_id = connection.link_id();
2180
2181 if self.connections.contains_key(&link_id) {
2182 return Err(NodeError::ConnectionAlreadyExists(link_id));
2183 }
2184
2185 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2186 return Err(NodeError::MaxConnectionsExceeded {
2187 max: self.max_connections,
2188 });
2189 }
2190
2191 self.connections.insert(link_id, connection);
2192 Ok(())
2193 }
2194
2195 pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
2197 self.connections.get(link_id)
2198 }
2199
2200 pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
2202 self.connections.get_mut(link_id)
2203 }
2204
2205 pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
2207 self.connections.remove(link_id)
2208 }
2209
2210 pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
2212 self.connections.values()
2213 }
2214
2215 pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
2219 self.peers.get(node_addr)
2220 }
2221
2222 pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
2224 self.peers.get_mut(node_addr)
2225 }
2226
2227 pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
2229 self.peers.remove(node_addr)
2230 }
2231
2232 pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2234 self.peers.values()
2235 }
2236
2237 pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2241 self.nostr_discovery.as_deref()
2242 }
2243
2244 pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2246 self.peers.keys()
2247 }
2248
2249 pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2251 self.peers.values().filter(|p| p.can_send())
2252 }
2253
2254 pub fn sendable_peer_count(&self) -> usize {
2256 self.peers.values().filter(|p| p.can_send()).count()
2257 }
2258
2259 pub(crate) fn set_discovery_fallback_transit_allowed(
2260 &mut self,
2261 peer_addr: NodeAddr,
2262 allowed: bool,
2263 ) {
2264 if allowed {
2265 self.discovery_fallback_transit_blocked_peers
2266 .remove(&peer_addr);
2267 } else {
2268 self.discovery_fallback_transit_blocked_peers
2269 .insert(peer_addr);
2270 }
2271 }
2272
2273 pub(crate) fn configured_discovery_fallback_transit(
2274 &self,
2275 peer_addr: &NodeAddr,
2276 ) -> Option<bool> {
2277 self.configured_peer(peer_addr)
2278 .map(|peer| peer.discovery_fallback_transit)
2279 }
2280
2281 pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2282 self.config.peers().iter().find(|peer| {
2283 PeerIdentity::from_npub(&peer.npub)
2284 .ok()
2285 .is_some_and(|identity| identity.node_addr() == peer_addr)
2286 })
2287 }
2288
2289 pub(in crate::node) fn active_peer_uses_configured_static_udp_path(
2290 &self,
2291 peer_addr: &NodeAddr,
2292 ) -> bool {
2293 let Some(peer_config) = self.configured_peer(peer_addr) else {
2294 return false;
2295 };
2296
2297 peer_config.addresses.iter().any(|candidate| {
2298 candidate.seen_at_ms.is_none()
2299 && candidate.transport.eq_ignore_ascii_case("udp")
2300 && self.active_peer_matches_candidate(peer_addr, candidate)
2301 })
2302 }
2303
2304 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2305 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2306 return retry_state.peer_config.discovery_fallback_transit;
2307 }
2308
2309 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2310 return allowed;
2311 }
2312
2313 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2314 }
2315
2316 #[cfg(test)]
2321 pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2322 self.discovery_forward_limiter
2323 .set_interval(std::time::Duration::ZERO);
2324 }
2325
2326 #[cfg(test)]
2327 pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2328 self.sessions.get(remote)
2329 }
2330
2331 #[cfg(test)]
2333 pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2334 self.sessions.get_mut(remote)
2335 }
2336
2337 #[cfg(test)]
2339 pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2340 self.sessions.remove(remote)
2341 }
2342
2343 #[cfg(test)]
2345 pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2346 self.path_mtu_lookup
2347 .read()
2348 .ok()
2349 .and_then(|map| map.get(fips_addr).copied())
2350 }
2351
2352 #[cfg(test)]
2354 pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2355 if let Ok(mut map) = self.path_mtu_lookup.write() {
2356 map.insert(fips_addr, mtu);
2357 }
2358 }
2359
2360 pub fn session_count(&self) -> usize {
2362 self.sessions.len()
2363 }
2364
2365 pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2367 self.sessions.iter()
2368 }
2369
2370 pub(crate) fn register_identity(
2374 &mut self,
2375 node_addr: NodeAddr,
2376 pubkey: secp256k1::PublicKey,
2377 ) -> bool {
2378 let mut prefix = [0u8; 15];
2379 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2380 if let Some(entry) = self.identity_cache.get(&prefix)
2381 && entry.node_addr == node_addr
2382 && entry.pubkey == pubkey
2383 {
2384 return true;
2388 }
2389
2390 let (xonly, _) = pubkey.x_only_public_key();
2391 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2392 if derived_node_addr != node_addr {
2393 debug!(
2394 claimed_node_addr = %node_addr,
2395 derived_node_addr = %derived_node_addr,
2396 "Rejected identity cache entry with mismatched public key"
2397 );
2398 return false;
2399 }
2400
2401 let now_ms = Self::now_ms();
2402 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2403 && entry.node_addr == node_addr
2404 {
2405 entry.pubkey = pubkey;
2406 entry.last_seen_ms = now_ms;
2407 return true;
2408 }
2409
2410 let npub = encode_npub(&xonly);
2411 self.identity_cache.insert(
2412 prefix,
2413 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2414 );
2415 let max = self.config.node.cache.identity_size;
2417 if self.identity_cache.len() > max
2418 && let Some(oldest_key) = self
2419 .identity_cache
2420 .iter()
2421 .min_by_key(|(_, entry)| entry.last_seen_ms)
2422 .map(|(k, _)| *k)
2423 {
2424 self.identity_cache.remove(&oldest_key);
2425 }
2426 true
2427 }
2428
2429 pub(crate) fn lookup_by_fips_prefix(
2431 &mut self,
2432 prefix: &[u8; 15],
2433 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2434 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2435 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2437 } else {
2438 None
2439 }
2440 }
2441
2442 pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2444 let mut prefix = [0u8; 15];
2445 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2446 self.identity_cache.contains_key(&prefix)
2447 }
2448
2449 pub fn identity_cache_len(&self) -> usize {
2451 self.identity_cache.len()
2452 }
2453
2454 pub fn identity_cache_iter(
2459 &self,
2460 ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2461 self.identity_cache
2462 .values()
2463 .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2464 }
2465
2466 pub fn identity_cache_max(&self) -> usize {
2468 self.config.node.cache.identity_size
2469 }
2470
2471 pub fn pending_lookup_count(&self) -> usize {
2473 self.pending_lookups.len()
2474 }
2475
2476 pub fn pending_lookups_iter(
2478 &self,
2479 ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2480 self.pending_lookups.iter()
2481 }
2482
2483 pub fn recent_request_count(&self) -> usize {
2485 self.recent_requests.len()
2486 }
2487
2488 pub fn pending_tun_destinations(&self) -> usize {
2490 self.pending_tun_packets.len()
2491 }
2492
2493 pub fn pending_tun_total_packets(&self) -> usize {
2495 self.pending_tun_packets.values().map(|q| q.len()).sum()
2496 }
2497
2498 pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2500 self.retry_pending.iter()
2501 }
2502
2503 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2510 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2512 return true;
2513 }
2514 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2516 && decl.parent_id() == self.node_addr()
2517 {
2518 return true;
2519 }
2520 false
2521 }
2522
2523 pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
2547 if dest_node_addr == self.node_addr() {
2549 return None;
2550 }
2551 let now_ms = Self::now_ms();
2552 let direct_session_degraded =
2553 self.session_direct_path_blocks_direct_payload(dest_node_addr, now_ms);
2554
2555 let healthy_direct_route = self
2556 .peers
2557 .get(dest_node_addr)
2558 .filter(|peer| peer.is_healthy() && !direct_session_degraded)
2559 .map(|_| *dest_node_addr);
2560 let direct_payload_eligible = healthy_direct_route.is_some();
2561 let payload_candidate_can_send = |addr: &NodeAddr, peer: &ActivePeer| {
2562 if addr == dest_node_addr {
2563 direct_payload_eligible
2564 } else {
2565 peer.is_healthy()
2566 }
2567 };
2568
2569 let fallback_beats_direct = |node: &Self, fallback_addr: NodeAddr| {
2574 node.route_candidate_beats_direct(healthy_direct_route, fallback_addr)
2575 };
2576
2577 let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
2578 Some(
2579 self.peers
2580 .iter()
2581 .filter(|(addr, peer)| payload_candidate_can_send(addr, peer))
2582 .map(|(addr, _)| *addr)
2583 .collect::<HashSet<_>>(),
2584 )
2585 } else {
2586 None
2587 };
2588
2589 let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
2596 self.learned_routes.should_explore_fallback(
2597 dest_node_addr,
2598 now_ms,
2599 self.config.node.routing.learned_fallback_explore_interval,
2600 |addr| sendable.contains(addr),
2601 )
2602 });
2603 if let Some(sendable) = &sendable_learned_peers
2604 && !explore_fallback
2605 {
2606 let eligible = sendable
2607 .iter()
2608 .copied()
2609 .filter(|addr| fallback_beats_direct(self, *addr))
2610 .collect::<HashSet<_>>();
2611 if !eligible.is_empty()
2612 && let Some(next_hop_addr) =
2613 self.learned_routes
2614 .select_next_hop(dest_node_addr, now_ms, |addr| eligible.contains(addr))
2615 {
2616 return self.peers.get(&next_hop_addr);
2617 }
2618 }
2619
2620 let Some(dest_coords) = self
2622 .coord_cache
2623 .get_and_touch(dest_node_addr, now_ms)
2624 .cloned()
2625 else {
2626 if (healthy_direct_route.is_none() || explore_fallback)
2627 && let Some(sendable) = &sendable_learned_peers
2628 && let Some(next_hop_addr) =
2629 self.learned_routes
2630 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2631 {
2632 return self.peers.get(&next_hop_addr);
2633 }
2634 if let Some(direct_addr) = healthy_direct_route {
2635 return self.peers.get(&direct_addr);
2636 }
2637 return None;
2638 };
2639
2640 let coordinate_route_addr = {
2643 let candidates: Vec<&ActivePeer> = self
2644 .peers
2645 .iter()
2646 .filter(|(addr, peer)| {
2647 payload_candidate_can_send(addr, peer) && peer.may_reach(dest_node_addr)
2648 })
2649 .map(|(_, peer)| peer)
2650 .collect();
2651 if !candidates.is_empty() {
2652 self.select_best_candidate(&candidates, &dest_coords)
2653 .map(|peer| *peer.node_addr())
2654 } else {
2655 None
2656 }
2657 };
2658 if let Some(next_hop_addr) = coordinate_route_addr
2659 && fallback_beats_direct(self, next_hop_addr)
2660 {
2661 return self.peers.get(&next_hop_addr);
2662 }
2663
2664 let tree_route_addr = self.select_tree_payload_candidate(
2666 &dest_coords,
2667 dest_node_addr,
2668 direct_payload_eligible,
2669 );
2670 if let Some(next_hop_addr) = tree_route_addr
2671 && fallback_beats_direct(self, next_hop_addr)
2672 {
2673 return self.peers.get(&next_hop_addr);
2674 }
2675
2676 if explore_fallback {
2677 return sendable_learned_peers.as_ref().and_then(|sendable| {
2678 self.learned_routes
2679 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2680 .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
2681 });
2682 }
2683
2684 if let Some(direct_addr) = healthy_direct_route {
2685 return self.peers.get(&direct_addr);
2686 }
2687
2688 if let Some(sendable) = &sendable_learned_peers
2689 && let Some(next_hop_addr) =
2690 self.learned_routes
2691 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2692 {
2693 return self.peers.get(&next_hop_addr);
2694 }
2695
2696 None
2697 }
2698
2699 pub(in crate::node) fn find_transit_next_hop(
2700 &mut self,
2701 dest_node_addr: &NodeAddr,
2702 previous_hop: &NodeAddr,
2703 ) -> Option<NodeAddr> {
2704 if dest_node_addr == self.node_addr() {
2705 return None;
2706 }
2707
2708 if dest_node_addr != previous_hop
2709 && self
2710 .peers
2711 .get(dest_node_addr)
2712 .is_some_and(|peer| peer.is_healthy())
2713 {
2714 return Some(*dest_node_addr);
2715 }
2716
2717 let next_hop_addr = *self.find_next_hop(dest_node_addr)?.node_addr();
2718 if &next_hop_addr == previous_hop {
2719 self.record_route_failure(*dest_node_addr, next_hop_addr);
2720 return None;
2721 }
2722 Some(next_hop_addr)
2723 }
2724
2725 fn route_candidate_beats_direct(
2726 &self,
2727 healthy_direct_route: Option<NodeAddr>,
2728 candidate_addr: NodeAddr,
2729 ) -> bool {
2730 let Some(direct_addr) = healthy_direct_route else {
2731 return true;
2732 };
2733 if candidate_addr == direct_addr {
2734 return false;
2735 }
2736
2737 let Some(direct) = self.peers.get(&direct_addr) else {
2738 return true;
2739 };
2740 let Some(candidate) = self.peers.get(&candidate_addr) else {
2741 return false;
2742 };
2743 if !candidate.is_healthy() {
2744 return false;
2745 }
2746
2747 let direct_cost = direct.link_cost();
2748 let candidate_cost = candidate.link_cost();
2749 candidate_cost + ROUTING_FALLBACK_MIN_COST_ADVANTAGE < direct_cost
2750 }
2751
2752 fn select_tree_payload_candidate(
2753 &self,
2754 dest_coords: &crate::tree::TreeCoordinate,
2755 direct_dest: &NodeAddr,
2756 direct_payload_eligible: bool,
2757 ) -> Option<NodeAddr> {
2758 if self.tree_state.my_coords().root_id() != dest_coords.root_id() {
2759 return None;
2760 }
2761
2762 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2763 let mut best: Option<(NodeAddr, usize)> = None;
2764
2765 for (peer_addr, peer) in &self.peers {
2766 if peer_addr == direct_dest {
2767 if !direct_payload_eligible {
2768 continue;
2769 }
2770 } else if !peer.is_healthy() {
2771 continue;
2772 }
2773
2774 let Some(peer_coords) = self.tree_state.peer_coords(peer_addr) else {
2775 continue;
2776 };
2777 let distance = peer_coords.distance_to(dest_coords);
2778 if distance >= my_distance {
2779 continue;
2780 }
2781
2782 let dominated = match &best {
2783 None => true,
2784 Some((best_id, best_dist)) => {
2785 distance < *best_dist || (distance == *best_dist && peer_addr < best_id)
2786 }
2787 };
2788 if dominated {
2789 best = Some((*peer_addr, distance));
2790 }
2791 }
2792
2793 best.map(|(peer_addr, _)| peer_addr)
2794 }
2795
2796 pub(in crate::node) fn session_direct_path_is_degraded(
2797 &mut self,
2798 dest: &NodeAddr,
2799 now_ms: u64,
2800 ) -> bool {
2801 match self.session_direct_degraded_until_ms.get(dest).copied() {
2802 Some(until_ms) if until_ms > now_ms => true,
2803 Some(_) => {
2804 self.session_direct_degraded_until_ms.remove(dest);
2805 false
2806 }
2807 None => false,
2808 }
2809 }
2810
2811 pub(in crate::node) fn session_direct_path_blocks_direct_payload(
2812 &mut self,
2813 dest: &NodeAddr,
2814 now_ms: u64,
2815 ) -> bool {
2816 self.session_direct_path_is_degraded(dest, now_ms)
2817 && !self.active_peer_uses_configured_static_udp_path(dest)
2818 }
2819
2820 pub(in crate::node) fn mark_session_direct_path_degraded(
2821 &mut self,
2822 dest: NodeAddr,
2823 now_ms: u64,
2824 ) -> bool {
2825 let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2826 let entry = self
2827 .session_direct_degraded_until_ms
2828 .entry(dest)
2829 .or_insert(0);
2830 let was_degraded = *entry > now_ms;
2831 *entry = (*entry).max(until_ms);
2832 !was_degraded
2833 }
2834
2835 pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
2836 self.session_direct_degraded_until_ms.remove(dest).is_some()
2837 }
2838
2839 pub(in crate::node) fn learn_reverse_route(
2840 &mut self,
2841 destination: NodeAddr,
2842 next_hop: NodeAddr,
2843 ) {
2844 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2845 || destination == *self.node_addr()
2846 {
2847 return;
2848 }
2849 let now_ms = Self::now_ms();
2850 self.learned_routes.learn(
2851 destination,
2852 next_hop,
2853 now_ms,
2854 self.config.node.routing.learned_ttl_secs,
2855 self.config.node.routing.max_learned_routes_per_dest,
2856 );
2857 }
2858
2859 pub(in crate::node) fn record_route_failure(
2860 &mut self,
2861 destination: NodeAddr,
2862 next_hop: NodeAddr,
2863 ) {
2864 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2865 return;
2866 }
2867 self.learned_routes.record_failure(&destination, &next_hop);
2868 }
2869
2870 pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2871 self.learned_routes.snapshot(now_ms)
2872 }
2873
2874 pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2875 self.learned_routes.purge_expired(now_ms);
2876 }
2877
2878 fn select_best_candidate<'a>(
2887 &'a self,
2888 candidates: &[&'a ActivePeer],
2889 dest_coords: &crate::tree::TreeCoordinate,
2890 ) -> Option<&'a ActivePeer> {
2891 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2892
2893 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2894
2895 for &candidate in candidates {
2896 if !candidate.can_send() {
2897 continue;
2898 }
2899
2900 let cost = candidate.link_cost();
2901
2902 let dist = self
2903 .tree_state
2904 .peer_coords(candidate.node_addr())
2905 .map(|pc| pc.distance_to(dest_coords))
2906 .unwrap_or(usize::MAX);
2907
2908 if dist >= my_distance {
2911 continue;
2912 }
2913
2914 let dominated = match &best {
2915 None => true,
2916 Some((_, best_cost, best_dist)) => {
2917 cost < *best_cost
2918 || (cost == *best_cost && dist < *best_dist)
2919 || (cost == *best_cost
2920 && dist == *best_dist
2921 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2922 }
2923 };
2924
2925 if dominated {
2926 best = Some((candidate, cost, dist));
2927 }
2928 }
2929
2930 best.map(|(peer, _, _)| peer)
2931 }
2932
2933 pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2935 self.peers.values().filter(|p| p.may_reach(dest)).collect()
2936 }
2937
2938 pub fn tun_tx(&self) -> Option<&TunTx> {
2942 self.tun_tx.as_ref()
2943 }
2944
2945 pub fn attach_external_packet_io(
2952 &mut self,
2953 capacity: usize,
2954 ) -> Result<ExternalPacketIo, NodeError> {
2955 if self.state != NodeState::Created {
2956 return Err(NodeError::Config(ConfigError::Validation(
2957 "external packet I/O must be attached before node start".to_string(),
2958 )));
2959 }
2960 if self.config.tun.enabled {
2961 return Err(NodeError::Config(ConfigError::Validation(
2962 "external packet I/O requires tun.enabled=false".to_string(),
2963 )));
2964 }
2965
2966 let capacity = capacity.max(1);
2967 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2968 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2969 self.tun_outbound_rx = Some(outbound_rx);
2970 self.external_packet_tx = Some(inbound_tx);
2971
2972 Ok(ExternalPacketIo {
2973 outbound_tx,
2974 inbound_rx,
2975 })
2976 }
2977
2978 pub(crate) fn attach_endpoint_data_io(
2983 &mut self,
2984 capacity: usize,
2985 ) -> Result<EndpointDataIo, NodeError> {
2986 if self.state != NodeState::Created {
2987 return Err(NodeError::Config(ConfigError::Validation(
2988 "endpoint data I/O must be attached before node start".to_string(),
2989 )));
2990 }
2991
2992 let command_capacity = endpoint_data_command_capacity(capacity);
2993 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2994 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2999 self.endpoint_command_rx = Some(command_rx);
3000 self.endpoint_event_tx = Some(event_tx.clone());
3001
3002 Ok(EndpointDataIo {
3003 command_tx,
3004 event_rx,
3005 event_tx,
3006 })
3007 }
3008
3009 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
3010 let mut prefix = [0u8; 15];
3011 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3012 self.identity_cache
3013 .get(&prefix)
3014 .filter(|entry| &entry.node_addr == addr)
3015 .map(|entry| entry.pubkey)
3016 }
3017
3018 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
3019 let mut prefix = [0u8; 15];
3020 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3021 self.identity_cache
3022 .get(&prefix)
3023 .filter(|entry| &entry.node_addr == addr)
3024 .map(|entry| entry.npub.clone())
3025 }
3026
3027 pub(in crate::node) fn deliver_external_ipv6_packet(
3028 &self,
3029 src_addr: &NodeAddr,
3030 packet: Vec<u8>,
3031 ) {
3032 let Some(external_packet_tx) = &self.external_packet_tx else {
3033 return;
3034 };
3035 if packet.len() < 40 {
3036 return;
3037 }
3038 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
3039 return;
3040 };
3041 let delivered = NodeDeliveredPacket {
3042 source_node_addr: *src_addr,
3043 source_npub: self.npub_for_node_addr(src_addr),
3044 destination,
3045 packet,
3046 };
3047 if let Err(error) = external_packet_tx.try_send(delivered) {
3048 debug!(error = %error, "Failed to deliver packet to external app sink");
3049 }
3050 }
3051
3052 pub(super) async fn send_encrypted_link_message(
3066 &mut self,
3067 node_addr: &NodeAddr,
3068 plaintext: &[u8],
3069 ) -> Result<(), NodeError> {
3070 self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
3071 .await
3072 }
3073
3074 pub(in crate::node) fn note_local_send_outcome(
3080 &mut self,
3081 node_addr: &NodeAddr,
3082 result: &Result<usize, TransportError>,
3083 ) {
3084 match result {
3085 Ok(_) => {
3086 self.local_send_failure_at_by_peer.remove(node_addr);
3087 }
3088 Err(error) if error.is_local_route_unavailable() => {
3089 self.local_send_failure_at_by_peer
3090 .insert(*node_addr, std::time::Instant::now());
3091 }
3092 Err(_) => {}
3093 }
3094 }
3095
3096 pub(in crate::node) fn local_send_failure_dead_timeout_for_peer(
3102 &self,
3103 node_addr: &NodeAddr,
3104 now: std::time::Instant,
3105 dead_timeout: std::time::Duration,
3106 fast_dead_timeout: std::time::Duration,
3107 ) -> std::time::Duration {
3108 match self.local_send_failure_at_by_peer.get(node_addr).copied() {
3109 Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
3110 fast_dead_timeout.min(dead_timeout)
3111 }
3112 None => dead_timeout,
3113 Some(_) => dead_timeout,
3114 }
3115 }
3116
3117 pub(in crate::node) fn purge_expired_local_send_failures(&mut self, now: std::time::Instant) {
3118 self.local_send_failure_at_by_peer
3119 .retain(|_, at| now.duration_since(*at) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW);
3120 }
3121
3122 pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
3123 self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
3124 }
3125
3126 pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
3127 let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
3128 return false;
3129 };
3130 let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
3131 std::time::Instant::now().duration_since(t) <= grace
3132 }
3133
3134 pub(super) async fn send_encrypted_link_message_with_ce(
3138 &mut self,
3139 node_addr: &NodeAddr,
3140 plaintext: &[u8],
3141 ce_flag: bool,
3142 ) -> Result<(), NodeError> {
3143 let peer = self
3144 .peers
3145 .get_mut(node_addr)
3146 .ok_or(NodeError::PeerNotFound(*node_addr))?;
3147
3148 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
3149 node_addr: *node_addr,
3150 reason: "no their_index".into(),
3151 })?;
3152 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
3153 node_addr: *node_addr,
3154 reason: "no transport_id".into(),
3155 })?;
3156 let remote_addr = peer
3157 .current_addr()
3158 .cloned()
3159 .ok_or_else(|| NodeError::SendFailed {
3160 node_addr: *node_addr,
3161 reason: "no current_addr".into(),
3162 })?;
3163 #[cfg(any(target_os = "linux", target_os = "macos"))]
3164 let connected_socket = peer.connected_udp();
3165
3166 let timestamp_ms = peer.session_elapsed_ms();
3168
3169 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
3171 let mut flags = if sp_flag { FLAG_SP } else { 0 };
3172 if ce_flag {
3173 flags |= FLAG_CE;
3174 }
3175 if peer.current_k_bit() {
3176 flags |= FLAG_KEY_EPOCH;
3177 }
3178
3179 let session = peer
3180 .noise_session_mut()
3181 .ok_or_else(|| NodeError::SendFailed {
3182 node_addr: *node_addr,
3183 reason: "no noise session".into(),
3184 })?;
3185
3186 const INNER_TS_LEN: usize = 4;
3194 let counter = session.current_send_counter();
3195 let inner_len = INNER_TS_LEN + plaintext.len();
3196 let payload_len = inner_len as u16;
3197 let header = build_established_header(their_index, counter, flags, payload_len);
3198
3199 let transport_for_send = self
3221 .transports
3222 .get(&transport_id)
3223 .ok_or(NodeError::TransportNotFound(transport_id))?;
3224 match transport_for_send.connection_state(&remote_addr) {
3225 ConnectionState::Connected => {}
3226 other => {
3227 if matches!(other, ConnectionState::None) {
3228 let _ = transport_for_send.connect(&remote_addr).await;
3229 }
3230 return Err(NodeError::SendFailed {
3231 node_addr: *node_addr,
3232 reason: format!("transport connection not ready: {:?}", other),
3233 });
3234 }
3235 }
3236 #[cfg(unix)]
3237 {
3238 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3239 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3240 && is_udp
3241 && let Some(cipher_clone) = session.send_cipher_clone()
3242 {
3243 let reserved_counter =
3247 session
3248 .take_send_counter()
3249 .map_err(|e| NodeError::SendFailed {
3250 node_addr: *node_addr,
3251 reason: format!("counter reservation failed: {}", e),
3252 })?;
3253 debug_assert_eq!(reserved_counter, counter);
3254 let header =
3258 build_established_header(their_index, reserved_counter, flags, payload_len);
3259 let transport = transport_for_send;
3260 let send_target = {
3267 if let TransportHandle::Udp(udp) = transport {
3268 let socket_addr = {
3269 #[cfg(any(target_os = "linux", target_os = "macos"))]
3270 {
3271 match connected_socket.as_ref() {
3272 Some(socket) => Some(socket.peer_addr()),
3273 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3274 }
3275 }
3276 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3277 {
3278 udp.resolve_for_off_task(&remote_addr).await.ok()
3279 }
3280 };
3281 match (udp.async_socket(), socket_addr) {
3282 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3283 _ => None,
3284 }
3285 } else {
3286 None
3287 }
3288 };
3289 if let Some((socket, socket_addr)) = send_target {
3290 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3306 let mut wire_buf = Vec::with_capacity(wire_capacity);
3307 wire_buf.extend_from_slice(&header);
3308 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
3309 wire_buf.extend_from_slice(plaintext);
3310 let predicted_bytes = wire_capacity;
3311 if let Some(peer) = self.peers.get_mut(node_addr) {
3318 peer.link_stats_mut().record_sent(predicted_bytes);
3319 if let Some(mmp) = peer.mmp_mut() {
3320 mmp.sender
3321 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3322 }
3323 }
3324 let scheduling_weight = self.send_weight_for_peer(node_addr);
3325 let bulk_endpoint_data = fmp_plaintext_is_bulk_session_datagram(plaintext);
3326 workers.dispatch(self::encrypt_worker::FmpSendJob {
3327 cipher: cipher_clone,
3328 counter: reserved_counter,
3329 wire_buf,
3330 fsp_seal: None,
3331 socket,
3332 dest_addr: socket_addr,
3333 #[cfg(any(target_os = "linux", target_os = "macos"))]
3334 connected_socket,
3335 bulk_endpoint_data,
3336 drop_on_backpressure: bulk_endpoint_data,
3337 scheduling_weight,
3338 queued_at: crate::perf_profile::stamp(),
3339 });
3340 return Ok(());
3341 }
3342 }
3343 }
3344
3345 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3350 let ciphertext = {
3352 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3353 session
3354 .encrypt_with_aad(&inner_plaintext, &header)
3355 .map_err(|e| NodeError::SendFailed {
3356 node_addr: *node_addr,
3357 reason: format!("encryption failed: {}", e),
3358 })?
3359 };
3360
3361 let wire_packet = build_encrypted(&header, &ciphertext);
3362
3363 let send_result = {
3365 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3366 let transport = self
3367 .transports
3368 .get(&transport_id)
3369 .ok_or(NodeError::TransportNotFound(transport_id))?;
3370 transport.send(&remote_addr, &wire_packet).await
3371 };
3372 self.note_local_send_outcome(node_addr, &send_result);
3373 let bytes_sent = send_result.map_err(|e| match e {
3374 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3375 node_addr: *node_addr,
3376 packet_size,
3377 mtu,
3378 },
3379 other => NodeError::SendFailed {
3380 node_addr: *node_addr,
3381 reason: format!("transport send: {}", other),
3382 },
3383 })?;
3384
3385 if let Some(peer) = self.peers.get_mut(node_addr) {
3387 peer.link_stats_mut().record_sent(bytes_sent);
3388 if let Some(mmp) = peer.mmp_mut() {
3390 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3391 }
3392 }
3393
3394 Ok(())
3395 }
3396}
3397
3398impl fmt::Debug for Node {
3399 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3400 f.debug_struct("Node")
3401 .field("node_addr", self.node_addr())
3402 .field("state", &self.state)
3403 .field("is_leaf_only", &self.is_leaf_only)
3404 .field("connections", &self.connection_count())
3405 .field("peers", &self.peer_count())
3406 .field("links", &self.link_count())
3407 .field("transports", &self.transport_count())
3408 .finish()
3409 }
3410}