1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
40use crate::peer::{ActivePeer, PeerConnection};
41#[cfg(any(target_os = "linux", target_os = "macos"))]
42use crate::transport::ethernet::EthernetTransport;
43use crate::transport::tcp::TcpTransport;
44use crate::transport::tor::TorTransport;
45use crate::transport::udp::UdpTransport;
46#[cfg(feature = "webrtc-transport")]
47use crate::transport::webrtc::WebRtcTransport;
48use crate::transport::{
49 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
50 TransportHandle, TransportId,
51};
52use crate::tree::TreeState;
53use crate::upper::hosts::HostMap;
54use crate::upper::icmp_rate_limit::IcmpRateLimiter;
55use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
56use crate::utils::index::IndexAllocator;
57use crate::{
58 Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
59 PeerIdentity, encode_npub,
60};
61use rand::Rng;
62use std::collections::{HashMap, HashSet, VecDeque};
63use std::fmt;
64use std::sync::Arc;
65use std::thread::JoinHandle;
66use thiserror::Error;
67use tracing::{debug, warn};
68
// Tuning constants for send-failure handling, direct-session degradation and routing
// fallback. Their use sites are outside this part of the file, so the per-constant notes
// are inferred from names and values only.
// NOTE(review): confirm each description against the code that reads the constant.

// Presumably the window (3 s) after a local send failure in which a peer may be declared
// dead quickly.
const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
// Presumably how long, in milliseconds, a direct session path stays marked as degraded.
const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20_000;
// Presumably the minimum number of samples needed before a loss ratio is acted upon.
const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 16;
// Presumably the loss ratio (8%) at or above which a direct path counts as degraded.
const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.08;
// Presumably the loss ratio (2%) at or below which a degraded direct path counts as recovered.
const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.02;
// Presumably the minimum relative cost advantage (25%) a fallback route must offer.
const ROUTING_FALLBACK_MIN_COST_ADVANTAGE: f64 = 0.25;
75
76fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
77 if plaintext
78 .first()
79 .is_none_or(|ty| *ty != LinkMessageType::SessionDatagram.to_byte())
80 {
81 return false;
82 }
83 let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
84 return false;
85 };
86 FspCommonPrefix::parse(fsp_payload).is_some_and(|prefix| {
87 prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted() && !prefix.has_coords()
88 })
89}
90
91fn endpoint_payload_is_tcp(payload: &[u8]) -> bool {
92 const IPPROTO_TCP: u8 = 6;
93 const IPV4_MIN_HEADER_LEN: usize = 20;
94
95 let Some(version_ihl) = payload.first().copied() else {
96 return false;
97 };
98
99 match version_ihl >> 4 {
100 4 => {
101 if payload.len() < IPV4_MIN_HEADER_LEN {
102 return false;
103 }
104 let header_len = usize::from(version_ihl & 0x0f) * 4;
105 header_len >= IPV4_MIN_HEADER_LEN
106 && payload.len() >= header_len
107 && payload[9] == IPPROTO_TCP
108 }
109 6 => ipv6_payload_next_header(payload).is_some_and(|proto| proto == IPPROTO_TCP),
110 _ => false,
111 }
112}
113
114fn ipv6_payload_next_header(payload: &[u8]) -> Option<u8> {
115 const IPV6_HEADER_LEN: usize = 40;
116 const IPV6_FRAGMENT_HEADER_LEN: usize = 8;
117
118 if payload.len() < IPV6_HEADER_LEN || payload[0] >> 4 != 6 {
119 return None;
120 }
121
122 let mut next_header = payload[6];
123 let mut offset = IPV6_HEADER_LEN;
124 let mut extension_count = 0usize;
125 while ipv6_extension_header_is_skippable(next_header) {
126 if next_header == 44 {
127 if payload.len() < offset + IPV6_FRAGMENT_HEADER_LEN {
128 return None;
129 }
130 next_header = payload[offset];
131 offset += IPV6_FRAGMENT_HEADER_LEN;
132 } else if next_header == 51 {
133 if payload.len() < offset + 2 {
134 return None;
135 }
136 let header_len = (usize::from(payload[offset + 1]) + 2) * 4;
137 if payload.len() < offset + header_len {
138 return None;
139 }
140 next_header = payload[offset];
141 offset += header_len;
142 } else {
143 if payload.len() < offset + 2 {
144 return None;
145 }
146 let header_len = (usize::from(payload[offset + 1]) + 1) * 8;
147 if payload.len() < offset + header_len {
148 return None;
149 }
150 next_header = payload[offset];
151 offset += header_len;
152 }
153 extension_count += 1;
154 if extension_count > 8 {
155 return None;
156 }
157 }
158
159 Some(next_header)
160}
161
/// Returns `true` for the IPv6 extension headers the next-header walk steps over:
/// hop-by-hop options (0), routing (43), fragment (44), authentication (51),
/// destination options (60) and mobility (135).
fn ipv6_extension_header_is_skippable(next_header: u8) -> bool {
    const HOP_BY_HOP: u8 = 0;
    const ROUTING: u8 = 43;
    const FRAGMENT: u8 = 44;
    const AUTHENTICATION: u8 = 51;
    const DESTINATION_OPTIONS: u8 = 60;
    const MOBILITY: u8 = 135;

    [
        HOP_BY_HOP,
        ROUTING,
        FRAGMENT,
        AUTHENTICATION,
        DESTINATION_OPTIONS,
        MOBILITY,
    ]
    .contains(&next_header)
}
165
/// Rekey jitter, in seconds.
///
/// NOTE(review): the use site is not in this part of the file; the signed type suggests the
/// jitter is applied as a +/- offset to a rekey timer — confirm against the caller.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
173
/// Errors reported by node operations.
///
/// `Display` text comes from the `#[error(...)]` attribute on each variant. `Config`,
/// `Identity` and `Tun` convert automatically from their source errors via `#[from]`.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The node has not been started.
    #[error("node not started")]
    NotStarted,

    /// The node was already started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node was already stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given id.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No transport is available for the named transport type.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link exists with the given id.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection exists for the given link.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// The peer with the given node address is unknown.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with the given node address already exists.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection already exists for the given link.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A peer npub could not be accepted; `reason` explains why.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// Discovery failed; the payload is a free-form description.
    #[error("discovery error: {0}")]
    Discovery(String),

    /// The operation was denied; the payload is a free-form description.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` would be exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` would be exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` would be exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available for the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion failed for `link_id`; `reason` explains why.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed; `reason` explains why.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet of `packet_size` bytes exceeded the `mtu` while forwarding to `node_addr`.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Configuration error (converted from `ConfigError`).
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Identity error (converted from `IdentityError`).
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// TUN error (converted from `TunError`).
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Index allocation failed; the payload is a free-form description.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// A handshake failed; the payload is a free-form description.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// A transport error, carried as its rendered message
    /// (see `NodeError::from_transport_error`).
    #[error("transport error: {0}")]
    TransportError(String),

    /// A transport error that the transport classified as "local route unavailable",
    /// carried as its rendered message (see `NodeError::from_transport_error`).
    #[error("local route unavailable: {0}")]
    LocalRouteUnavailable(String),

    /// Bootstrap handoff failed; the payload is a free-form description.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
268
269impl NodeError {
270 pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
271 if error.is_local_route_unavailable() {
272 Self::LocalRouteUnavailable(error.to_string())
273 } else {
274 Self::TransportError(error.to_string())
275 }
276 }
277
278 pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
279 matches!(self, Self::LocalRouteUnavailable(_))
280 }
281}
282
/// A packet handed from the node to an external consumer
/// (the item type of `ExternalPacketIo::inbound_rx`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// Node address of the sender.
    pub source_node_addr: NodeAddr,
    /// The sender's npub, when available.
    pub source_npub: Option<String>,
    /// Destination address of the packet.
    pub destination: FipsAddress,
    /// Raw packet bytes.
    pub packet: Vec<u8>,
}
295
296#[derive(Debug, Clone)]
297struct IdentityCacheEntry {
298 node_addr: NodeAddr,
299 pubkey: secp256k1::PublicKey,
300 npub: String,
301 last_seen_ms: u64,
302}
303
304impl IdentityCacheEntry {
305 fn new(
306 node_addr: NodeAddr,
307 pubkey: secp256k1::PublicKey,
308 npub: String,
309 last_seen_ms: u64,
310 ) -> Self {
311 Self {
312 node_addr,
313 pubkey,
314 npub,
315 last_seen_ms,
316 }
317 }
318}
319
/// Channel pair for exchanging packets with the node from outside.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Sender for outbound packets (same channel type as the TUN outbound path).
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receiver for packets the node delivers to the external consumer.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
328
/// Channel handles for the endpoint data path: commands go to the node, events come back.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Bounded sender for `NodeEndpointCommand`s.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Unbounded receiver for `NodeEndpointEvent`s.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Unbounded sender for the same event type.
    /// NOTE(review): presumably the sending half of `event_rx`'s channel — confirm where
    /// this struct is built.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
358
/// Chooses the capacity of the endpoint data command queue.
///
/// A positive integer in the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable
/// (surrounding whitespace ignored) wins outright. Otherwise the result is `requested`,
/// raised to a floor of 32_768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    const MIN_CAPACITY: usize = 32_768;

    let env_override = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&value| value > 0);

    match env_override {
        Some(capacity) => capacity,
        // The floor also covers a requested capacity of zero.
        None => requested.max(MIN_CAPACITY),
    }
}
369
/// Commands sent to the node over the endpoint command channel.
///
/// Variants that carry a `response_tx` report their result back through that oneshot sender.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send `payload` to `remote` and report the outcome on `response_tx`.
    Send {
        remote: PeerIdentity,
        payload: Vec<u8>,
        /// Optional instant at which the command was queued.
        queued_at: Option<std::time::Instant>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Send `payload` to `remote`; there is no response channel.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        /// Optional instant at which the command was queued.
        queued_at: Option<std::time::Instant>,
    },
    /// Request the current list of endpoint peers.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request the current list of relay statuses.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Supply new advert and DM relay lists.
    UpdateRelays {
        advert_relays: Vec<String>,
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Supply a new peer configuration list; the reply summarises what changed.
    UpdatePeers {
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
413
/// Counters returned in reply to `NodeEndpointCommand::UpdatePeers`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Number of peers added.
    pub(crate) added: usize,
    /// Number of peers removed.
    pub(crate) removed: usize,
    /// Number of peers updated.
    pub(crate) updated: usize,
    /// Number of peers left unchanged.
    pub(crate) unchanged: usize,
}
422
/// Events emitted by the node on the endpoint event channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// Payload received from a remote node.
    Data {
        /// Node address of the sender.
        source_node_addr: NodeAddr,
        /// The sender's npub, when available.
        source_npub: Option<String>,
        /// Received bytes.
        payload: Vec<u8>,
        /// Optional instant at which the event was queued.
        queued_at: Option<std::time::Instant>,
    },
}
433
/// Per-peer entry returned in reply to `NodeEndpointCommand::PeerSnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// The peer's npub.
    pub(crate) npub: String,
    /// Whether the peer is currently connected.
    pub(crate) connected: bool,
    /// Transport address as text, when known.
    pub(crate) transport_addr: Option<String>,
    /// Transport type as text, when known.
    pub(crate) transport_type: Option<String>,
    /// Numeric link id.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, when measured.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent to the peer.
    pub(crate) packets_sent: u64,
    /// Packets received from the peer.
    pub(crate) packets_recv: u64,
    /// Bytes sent to the peer.
    pub(crate) bytes_sent: u64,
    /// Bytes received from the peer.
    pub(crate) bytes_recv: u64,
    /// Whether a direct probe is pending for this peer.
    pub(crate) direct_probe_pending: bool,
    /// NOTE(review): presumably the delay or deadline, in milliseconds, before the next
    /// direct probe — confirm against the code that fills this in.
    pub(crate) direct_probe_after_ms: Option<u64>,
}
450
/// Per-relay entry returned in reply to `NodeEndpointCommand::RelaySnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Relay status as free-form text.
    pub(crate) status: String,
}
457
/// Lifecycle state of a node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed but never started.
    Created,
    /// Start in progress.
    Starting,
    /// Fully started.
    Running,
    /// Stop in progress.
    Stopping,
    /// Stopped.
    Stopped,
}

impl NodeState {
    /// `true` only while the node is `Running`.
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// `true` when a start is allowed: from `Created` or from `Stopped`.
    pub fn can_start(&self) -> bool {
        match self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// `true` when a stop is allowed: only from `Running`.
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase state name (`created`, `starting`, `running`, `stopping`,
    /// `stopped`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        })
    }
}
502
503#[derive(Clone, Debug)]
510pub(crate) struct RecentRequest {
511 pub(crate) from_peer: NodeAddr,
513 pub(crate) timestamp_ms: u64,
515 pub(crate) response_forwarded: bool,
519}
520
521impl RecentRequest {
522 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
523 Self {
524 from_peer,
525 timestamp_ms,
526 response_forwarded: false,
527 }
528 }
529
530 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
532 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
533 }
534}
535
/// Lookup key pairing a transport with a remote address on that transport.
/// `Node::addr_to_link` maps this key to a `LinkId`.
type AddrKey = (TransportId, TransportAddr);
538
/// Per-transport drop tracking, stored in `Node::transport_drops`.
///
/// NOTE(review): the code that updates this is not in this part of the file; the field
/// notes are inferred from the names — confirm against it.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Presumably the drop counter observed at the previous sample.
    prev_drops: u64,
    /// Presumably whether the transport was dropping packets at the previous sample.
    dropping: bool,
}
550
/// An outbound connection attempt queued in `Node::pending_connects`.
struct PendingConnect {
    /// Link allocated for the attempt.
    link_id: LinkId,
    /// Transport the attempt uses.
    transport_id: TransportId,
    /// Remote address being connected to.
    remote_addr: TransportAddr,
    /// Identity of the peer the attempt targets.
    peer_identity: PeerIdentity,
}
566
/// A mesh node: identity, configuration, routing state, transports, links, peers, sessions
/// and all per-node bookkeeping.
///
/// Field notes state only what the constructors in this file show. Fields without a note
/// are maintained by code outside this part of the file.
pub struct Node {
    // --- Identity and lifecycle ---
    /// The node's own identity.
    identity: Identity,

    /// Eight random bytes generated when the node is constructed.
    startup_epoch: [u8; 8],

    /// Instant captured when the node is constructed.
    started_at: std::time::Instant,

    /// The configuration the node was built from.
    config: Config,

    /// Lifecycle state; starts as `NodeState::Created`.
    state: NodeState,

    /// Whether the node runs in leaf-only mode.
    is_leaf_only: bool,

    // --- Routing state ---
    /// Spanning-tree state, configured from `config.node.tree` at construction.
    tree_state: TreeState,

    /// Bloom filter state; its update debounce comes from `config.node.bloom`.
    bloom_state: BloomState,

    /// Coordinate cache sized and aged per `config.node.cache`.
    coord_cache: CoordCache,
    /// Learned routes; starts empty.
    learned_routes: LearnedRouteTable,
    /// Per-peer millisecond timestamps; starts empty.
    session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
    /// Recently seen requests keyed by a `u64`; starts empty.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Shared, lock-protected map from address to a 16-bit MTU value; starts empty.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // --- Transports and links ---
    /// Transport handles keyed by transport id.
    transports: HashMap<TransportId, TransportHandle>,
    /// Per-transport drop tracking.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Links keyed by link id.
    links: HashMap<LinkId, Link>,
    /// Reverse lookup from (transport, remote address) to link id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    /// Packet channel sender; `None` at construction.
    packet_tx: Option<PacketTx>,
    /// Packet channel receiver; `None` at construction.
    packet_rx: Option<PacketRx>,

    // --- Connections, peers, sessions ---
    /// Peer connections keyed by link id.
    connections: HashMap<LinkId, PeerConnection>,

    /// Active peers keyed by node address.
    peers: HashMap<NodeAddr, ActivePeer>,

    /// Sessions keyed by node address.
    sessions: HashMap<NodeAddr, SessionEntry>,

    /// Identity cache keyed by a 15-byte value.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    /// Queued TUN packets per destination node.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Queued endpoint payloads per destination node.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Outstanding discovery lookups per target node.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // --- Limits and id allocation ---
    /// Copied from `config.node.limits.max_connections`.
    max_connections: usize,
    /// Copied from `config.node.limits.max_peers`.
    max_peers: usize,
    /// Copied from `config.node.limits.max_links`.
    max_links: usize,

    /// Next link id to hand out; starts at 1.
    next_link_id: u64,
    /// Next transport id to hand out; starts at 1.
    next_transport_id: u32,

    // --- Statistics ---
    stats: stats::NodeStats,

    stats_history: stats_history::StatsHistory,

    // --- TUN, external I/O and worker pools ---
    /// `Configured` when `config.tun.enabled`, otherwise `Disabled`, at construction.
    tun_state: TunState,
    tun_name: Option<String>,
    tun_tx: Option<TunTx>,
    tun_outbound_rx: Option<TunOutboundRx>,
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Receiving half of an unbounded channel created at construction (held as `Some`).
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sending half of the same channel.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    tun_reader_handle: Option<JoinHandle<()>>,
    tun_writer_handle: Option<JoinHandle<()>>,
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // --- DNS ---
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // --- Session index bookkeeping ---
    index_allocator: IndexAllocator,
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // --- Rate limiting ---
    /// Built from `config.node.rate_limit` (handshake burst and rate) and
    /// `config.node.limits.max_pending_inbound`.
    msg1_rate_limiter: HandshakeRateLimiter,
    icmp_rate_limiter: IcmpRateLimiter,
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Interval taken from `config.node.session.coords_response_interval_ms`.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    discovery_backoff: DiscoveryBackoff,
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // --- Connection attempts and retries ---
    pending_connects: Vec<PendingConnect>,

    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    // --- Discovery ---
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
    local_instance_started_at_ms: Option<u64>,
    last_local_instance_publish_ms: Option<u64>,
    last_local_instance_scan_ms: Option<u64>,
    nostr_discovery_started_at_ms: Option<u64>,
    /// Starts `false`.
    startup_open_discovery_sweep_done: bool,
    bootstrap_transports: HashSet<TransportId>,
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    // --- Timers for periodic work and log throttling ---
    last_parent_reeval: Option<crate::time::Instant>,

    last_congestion_log: Option<std::time::Instant>,

    estimated_mesh_size: Option<u64>,
    last_mesh_size_log: Option<std::time::Instant>,

    last_self_warn: Option<std::time::Instant>,

    local_send_failure_at_by_peer: HashMap<NodeAddr, std::time::Instant>,
    last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,

    // --- Peer metadata ---
    peer_aliases: HashMap<NodeAddr, String>,
    /// One entry per configured peer whose npub parses, each set to
    /// `encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT`.
    configured_peer_send_weights: HashMap<NodeAddr, u8>,

    /// Peer allow/deny list reloader built alongside `host_map`.
    peer_acl: acl::PeerAclReloader,

    /// Host map built from the configured peers, merged with the default hosts file when
    /// `config.node.system_files_enabled` is set.
    host_map: Arc<HostMap>,
}
875
876impl Node {
877 pub fn new(config: Config) -> Result<Self, NodeError> {
879 config.validate()?;
880 let identity = config.create_identity()?;
881 let node_addr = *identity.node_addr();
882 let is_leaf_only = config.is_leaf_only();
883
884 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
885 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
886
887 let mut startup_epoch = [0u8; 8];
888 rand::rng().fill_bytes(&mut startup_epoch);
889
890 let mut bloom_state = if is_leaf_only {
891 BloomState::leaf_only(node_addr)
892 } else {
893 BloomState::new(node_addr)
894 };
895 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
896
897 let tun_state = if config.tun.enabled {
898 TunState::Configured
899 } else {
900 TunState::Disabled
901 };
902
903 let mut tree_state = TreeState::new(node_addr);
905 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
906 tree_state.set_hold_down(config.node.tree.hold_down_secs);
907 tree_state.set_flap_dampening(
908 config.node.tree.flap_threshold,
909 config.node.tree.flap_window_secs,
910 config.node.tree.flap_dampening_secs,
911 );
912 tree_state
913 .sign_declaration(&identity)
914 .expect("signing own declaration should never fail");
915
916 let coord_cache = CoordCache::new(
917 config.node.cache.coord_size,
918 config.node.cache.coord_ttl_secs * 1000,
919 );
920 let rl = &config.node.rate_limit;
921 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
922 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
923 config.node.limits.max_pending_inbound,
924 );
925
926 let max_connections = config.node.limits.max_connections;
927 let max_peers = config.node.limits.max_peers;
928 let max_links = config.node.limits.max_links;
929 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
930 let backoff_base_secs = config.node.discovery.backoff_base_secs;
931 let backoff_max_secs = config.node.discovery.backoff_max_secs;
932 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
933
934 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
935 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
936
937 Ok(Self {
938 identity,
939 startup_epoch,
940 started_at: std::time::Instant::now(),
941 config,
942 state: NodeState::Created,
943 is_leaf_only,
944 tree_state,
945 bloom_state,
946 coord_cache,
947 learned_routes: LearnedRouteTable::default(),
948 session_direct_degraded_until_ms: HashMap::new(),
949 recent_requests: HashMap::new(),
950 transports: HashMap::new(),
951 transport_drops: HashMap::new(),
952 links: HashMap::new(),
953 addr_to_link: HashMap::new(),
954 packet_tx: None,
955 packet_rx: None,
956 connections: HashMap::new(),
957 peers: HashMap::new(),
958 sessions: HashMap::new(),
959 identity_cache: HashMap::new(),
960 pending_tun_packets: HashMap::new(),
961 pending_endpoint_data: HashMap::new(),
962 pending_lookups: HashMap::new(),
963 max_connections,
964 max_peers,
965 max_links,
966 next_link_id: 1,
967 next_transport_id: 1,
968 stats: stats::NodeStats::new(),
969 stats_history: stats_history::StatsHistory::new(),
970 tun_state,
971 tun_name: None,
972 tun_tx: None,
973 tun_outbound_rx: None,
974 external_packet_tx: None,
975 endpoint_command_rx: None,
976 endpoint_event_tx: None,
977 encrypt_workers: None,
978 decrypt_workers: None,
979 decrypt_registered_sessions: std::collections::HashSet::new(),
980 decrypt_fallback_tx,
981 decrypt_fallback_rx,
982 tun_reader_handle: None,
983 tun_writer_handle: None,
984 #[cfg(target_os = "macos")]
985 tun_shutdown_fd: None,
986 dns_identity_rx: None,
987 dns_task: None,
988 index_allocator: IndexAllocator::new(),
989 peers_by_index: HashMap::new(),
990 pending_outbound: HashMap::new(),
991 msg1_rate_limiter,
992 icmp_rate_limiter: IcmpRateLimiter::new(),
993 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
994 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
995 std::time::Duration::from_millis(coords_response_interval_ms),
996 ),
997 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
998 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
999 std::time::Duration::from_secs(forward_min_interval_secs),
1000 ),
1001 pending_connects: Vec::new(),
1002 retry_pending: HashMap::new(),
1003 nostr_discovery: None,
1004 nostr_discovery_started_at_ms: None,
1005 lan_discovery: None,
1006 local_instance_registry: None,
1007 local_instance_started_at_ms: None,
1008 last_local_instance_publish_ms: None,
1009 last_local_instance_scan_ms: None,
1010 startup_open_discovery_sweep_done: false,
1011 bootstrap_transports: HashSet::new(),
1012 bootstrap_transport_npubs: HashMap::new(),
1013 discovery_fallback_transit_blocked_peers: HashSet::new(),
1014 last_parent_reeval: None,
1015 last_congestion_log: None,
1016 estimated_mesh_size: None,
1017 last_mesh_size_log: None,
1018 last_self_warn: None,
1019 local_send_failure_at_by_peer: HashMap::new(),
1020 last_rx_loop_maintenance_timeout_at: None,
1021 peer_aliases: HashMap::new(),
1022 configured_peer_send_weights,
1023 peer_acl,
1024 host_map,
1025 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1026 })
1027 }
1028
1029 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
1034 config.validate()?;
1035 let node_addr = *identity.node_addr();
1036
1037 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
1038 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
1039
1040 let mut startup_epoch = [0u8; 8];
1041 rand::rng().fill_bytes(&mut startup_epoch);
1042
1043 let tun_state = if config.tun.enabled {
1044 TunState::Configured
1045 } else {
1046 TunState::Disabled
1047 };
1048
1049 let mut tree_state = TreeState::new(node_addr);
1051 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
1052 tree_state.set_hold_down(config.node.tree.hold_down_secs);
1053 tree_state.set_flap_dampening(
1054 config.node.tree.flap_threshold,
1055 config.node.tree.flap_window_secs,
1056 config.node.tree.flap_dampening_secs,
1057 );
1058 tree_state
1059 .sign_declaration(&identity)
1060 .expect("signing own declaration should never fail");
1061
1062 let mut bloom_state = BloomState::new(node_addr);
1063 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
1064
1065 let coord_cache = CoordCache::new(
1066 config.node.cache.coord_size,
1067 config.node.cache.coord_ttl_secs * 1000,
1068 );
1069 let rl = &config.node.rate_limit;
1070 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
1071 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
1072 config.node.limits.max_pending_inbound,
1073 );
1074
1075 let max_connections = config.node.limits.max_connections;
1076 let max_peers = config.node.limits.max_peers;
1077 let max_links = config.node.limits.max_links;
1078 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1079
1080 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1081 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1082
1083 Ok(Self {
1084 identity,
1085 startup_epoch,
1086 started_at: std::time::Instant::now(),
1087 config,
1088 state: NodeState::Created,
1089 is_leaf_only: false,
1090 tree_state,
1091 bloom_state,
1092 coord_cache,
1093 learned_routes: LearnedRouteTable::default(),
1094 session_direct_degraded_until_ms: HashMap::new(),
1095 recent_requests: HashMap::new(),
1096 transports: HashMap::new(),
1097 transport_drops: HashMap::new(),
1098 links: HashMap::new(),
1099 addr_to_link: HashMap::new(),
1100 packet_tx: None,
1101 packet_rx: None,
1102 connections: HashMap::new(),
1103 peers: HashMap::new(),
1104 sessions: HashMap::new(),
1105 identity_cache: HashMap::new(),
1106 pending_tun_packets: HashMap::new(),
1107 pending_endpoint_data: HashMap::new(),
1108 pending_lookups: HashMap::new(),
1109 max_connections,
1110 max_peers,
1111 max_links,
1112 next_link_id: 1,
1113 next_transport_id: 1,
1114 stats: stats::NodeStats::new(),
1115 stats_history: stats_history::StatsHistory::new(),
1116 tun_state,
1117 tun_name: None,
1118 tun_tx: None,
1119 tun_outbound_rx: None,
1120 external_packet_tx: None,
1121 endpoint_command_rx: None,
1122 endpoint_event_tx: None,
1123 encrypt_workers: None,
1124 decrypt_workers: None,
1125 decrypt_registered_sessions: std::collections::HashSet::new(),
1126 decrypt_fallback_tx,
1127 decrypt_fallback_rx,
1128 tun_reader_handle: None,
1129 tun_writer_handle: None,
1130 #[cfg(target_os = "macos")]
1131 tun_shutdown_fd: None,
1132 dns_identity_rx: None,
1133 dns_task: None,
1134 index_allocator: IndexAllocator::new(),
1135 peers_by_index: HashMap::new(),
1136 pending_outbound: HashMap::new(),
1137 msg1_rate_limiter,
1138 icmp_rate_limiter: IcmpRateLimiter::new(),
1139 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1140 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1141 std::time::Duration::from_millis(coords_response_interval_ms),
1142 ),
1143 discovery_backoff: DiscoveryBackoff::new(),
1144 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1145 pending_connects: Vec::new(),
1146 retry_pending: HashMap::new(),
1147 nostr_discovery: None,
1148 nostr_discovery_started_at_ms: None,
1149 lan_discovery: None,
1150 local_instance_registry: None,
1151 local_instance_started_at_ms: None,
1152 last_local_instance_publish_ms: None,
1153 last_local_instance_scan_ms: None,
1154 startup_open_discovery_sweep_done: false,
1155 bootstrap_transports: HashSet::new(),
1156 bootstrap_transport_npubs: HashMap::new(),
1157 discovery_fallback_transit_blocked_peers: HashSet::new(),
1158 last_parent_reeval: None,
1159 last_congestion_log: None,
1160 estimated_mesh_size: None,
1161 last_mesh_size_log: None,
1162 last_self_warn: None,
1163 local_send_failure_at_by_peer: HashMap::new(),
1164 last_rx_loop_maintenance_timeout_at: None,
1165 peer_aliases: HashMap::new(),
1166 configured_peer_send_weights,
1167 peer_acl,
1168 host_map,
1169 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1170 })
1171 }
1172
1173 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1175 let mut node = Self::new(config)?;
1176 node.is_leaf_only = true;
1177 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1178 Ok(node)
1179 }
1180
1181 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1182 let base_host_map = HostMap::from_peer_configs(config.peers());
1183 if !config.node.system_files_enabled {
1184 return (
1185 Arc::new(base_host_map.clone()),
1186 acl::PeerAclReloader::memory_only(base_host_map),
1187 );
1188 }
1189
1190 let mut host_map = base_host_map.clone();
1191 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1192 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1193 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1194 ));
1195 host_map.merge(hosts_file);
1196 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1197 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1198 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1199 base_host_map,
1200 hosts_path,
1201 );
1202 (Arc::new(host_map), peer_acl)
1203 }
1204
1205 fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1206 config
1207 .peers()
1208 .iter()
1209 .filter_map(|peer| {
1210 PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1211 (
1212 *identity.node_addr(),
1213 encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1214 )
1215 })
1216 })
1217 .collect()
1218 }
1219
1220 fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1221 self.configured_peer_send_weights
1222 .get(peer_addr)
1223 .copied()
1224 .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1225 }
1226
1227 async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
1231 let mut transports = Vec::new();
1232
1233 let udp_instances: Vec<_> = self
1235 .config
1236 .transports
1237 .udp
1238 .iter()
1239 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1240 .collect();
1241
1242 for (name, udp_config) in udp_instances {
1244 let transport_id = self.allocate_transport_id();
1245 let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
1246 transports.push(TransportHandle::Udp(udp));
1247 }
1248
1249 #[cfg(feature = "sim-transport")]
1250 {
1251 let sim_instances: Vec<_> = self
1252 .config
1253 .transports
1254 .sim
1255 .iter()
1256 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1257 .collect();
1258
1259 for (name, sim_config) in sim_instances {
1260 let transport_id = self.allocate_transport_id();
1261 let sim = crate::transport::sim::SimTransport::new(
1262 transport_id,
1263 name,
1264 sim_config,
1265 packet_tx.clone(),
1266 );
1267 transports.push(TransportHandle::Sim(sim));
1268 }
1269 }
1270
1271 #[cfg(any(target_os = "linux", target_os = "macos"))]
1273 {
1274 let eth_instances: Vec<_> = self
1275 .config
1276 .transports
1277 .ethernet
1278 .iter()
1279 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1280 .collect();
1281 let xonly = self.identity.pubkey();
1282 for (name, eth_config) in eth_instances {
1283 let mut eth_config = eth_config;
1284 if eth_config.discovery_scope.is_none() {
1285 eth_config.discovery_scope = self.lan_discovery_scope();
1286 }
1287 let transport_id = self.allocate_transport_id();
1288 let mut eth =
1289 EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
1290 eth.set_local_pubkey(xonly);
1291 transports.push(TransportHandle::Ethernet(eth));
1292 }
1293 }
1294
1295 let tcp_instances: Vec<_> = self
1297 .config
1298 .transports
1299 .tcp
1300 .iter()
1301 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1302 .collect();
1303
1304 for (name, tcp_config) in tcp_instances {
1305 let transport_id = self.allocate_transport_id();
1306 let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
1307 transports.push(TransportHandle::Tcp(tcp));
1308 }
1309
1310 let tor_instances: Vec<_> = self
1312 .config
1313 .transports
1314 .tor
1315 .iter()
1316 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1317 .collect();
1318
1319 for (name, tor_config) in tor_instances {
1320 let transport_id = self.allocate_transport_id();
1321 let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
1322 transports.push(TransportHandle::Tor(tor));
1323 }
1324
1325 let webrtc_instances: Vec<_> = self
1326 .config
1327 .transports
1328 .webrtc
1329 .iter()
1330 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1331 .collect();
1332
1333 #[cfg(feature = "webrtc-transport")]
1334 {
1335 for (name, webrtc_config) in webrtc_instances {
1336 let transport_id = self.allocate_transport_id();
1337 match WebRtcTransport::new(
1338 transport_id,
1339 name,
1340 webrtc_config,
1341 packet_tx.clone(),
1342 &self.identity,
1343 &self.config.node.discovery.nostr,
1344 ) {
1345 Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
1346 Err(err) => {
1347 warn!(
1348 transport_id = %transport_id,
1349 error = %err,
1350 "failed to initialize WebRTC transport"
1351 );
1352 }
1353 }
1354 }
1355 }
1356 #[cfg(not(feature = "webrtc-transport"))]
1357 if !webrtc_instances.is_empty() {
1358 warn!("WebRTC transport configured but this build lacks WebRTC transport support");
1359 }
1360
1361 #[cfg(bluer_available)]
1363 {
1364 let ble_instances: Vec<_> = self
1365 .config
1366 .transports
1367 .ble
1368 .iter()
1369 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1370 .collect();
1371
1372 #[cfg(all(bluer_available, not(test)))]
1373 for (name, ble_config) in ble_instances {
1374 let transport_id = self.allocate_transport_id();
1375 let adapter = ble_config.adapter().to_string();
1376 let mtu = ble_config.mtu();
1377 match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
1378 Ok(io) => {
1379 let mut ble = crate::transport::ble::BleTransport::new(
1380 transport_id,
1381 name,
1382 ble_config,
1383 io,
1384 packet_tx.clone(),
1385 );
1386 ble.set_local_pubkey(self.identity.pubkey().serialize());
1387 transports.push(TransportHandle::Ble(ble));
1388 }
1389 Err(e) => {
1390 tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
1391 }
1392 }
1393 }
1394
1395 #[cfg(any(not(bluer_available), test))]
1396 if !ble_instances.is_empty() {
1397 #[cfg(not(test))]
1398 tracing::warn!("BLE transport configured but this build lacks BlueZ support");
1399 }
1400 }
1401
1402 transports
1403 }
1404
1405 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1415 self.transports
1416 .iter()
1417 .filter(|(id, handle)| {
1418 handle.transport_type().name == transport_type
1419 && handle.is_operational()
1420 && !self.bootstrap_transports.contains(id)
1421 })
1422 .min_by_key(|(id, _)| id.as_u32())
1423 .map(|(id, _)| *id)
1424 }
1425
1426 #[allow(unused_variables)]
1432 fn resolve_ethernet_addr(
1433 &self,
1434 addr_str: &str,
1435 ) -> Result<(TransportId, TransportAddr), NodeError> {
1436 #[cfg(any(target_os = "linux", target_os = "macos"))]
1437 {
1438 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1439 NodeError::NoTransportForType(format!(
1440 "invalid Ethernet address format '{}': expected 'interface/mac'",
1441 addr_str
1442 ))
1443 })?;
1444
1445 let transport_id = self
1447 .transports
1448 .iter()
1449 .find(|(_, handle)| {
1450 handle.transport_type().name == "ethernet"
1451 && handle.is_operational()
1452 && handle.interface_name() == Some(iface)
1453 })
1454 .map(|(id, _)| *id)
1455 .ok_or_else(|| {
1456 NodeError::NoTransportForType(format!(
1457 "no operational Ethernet transport for interface '{}'",
1458 iface
1459 ))
1460 })?;
1461
1462 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1463 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1464 })?;
1465
1466 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1467 }
1468 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1469 {
1470 Err(NodeError::NoTransportForType(
1471 "Ethernet transport is not supported on this platform".to_string(),
1472 ))
1473 }
1474 }
1475
/// Resolves a BLE peer address string into an operational BLE transport and
/// the corresponding transport address.
///
/// The adapter component is extracted first (to validate the overall shape),
/// then an operational BLE transport is selected, and finally the full
/// address is parsed for validation. When several operational BLE transports
/// exist, the one with the lowest transport id is chosen so the result does
/// not depend on map iteration order.
///
/// # Errors
///
/// Returns [`NodeError::NoTransportForType`] when the adapter cannot be
/// extracted, when no operational BLE transport exists, or when the address
/// fails to parse.
#[cfg(bluer_available)]
fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
    let ta = TransportAddr::from_string(addr_str);
    let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
        NodeError::NoTransportForType(format!(
            "invalid BLE address format '{}': expected 'adapter/mac'",
            addr_str
        ))
    })?;

    // NOTE(review): the selection below does not compare against `adapter`,
    // although the error message names it — confirm whether transports should
    // be matched per adapter.
    let transport_id = self
        .transports
        .iter()
        .filter(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
        .map(|(id, _)| *id)
        .min_by_key(|id| id.as_u32())
        .ok_or_else(|| {
            NodeError::NoTransportForType(format!(
                "no operational BLE transport for adapter '{}'",
                adapter
            ))
        })?;

    // Parsed only for validation; the parsed value itself is discarded.
    crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
        NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
    })?;

    Ok((transport_id, TransportAddr::from_string(addr_str)))
}
1509
/// Returns this node's [`Identity`].
pub fn identity(&self) -> &Identity {
    &self.identity
}
1516
/// Returns the [`NodeAddr`] reported by this node's identity.
pub fn node_addr(&self) -> &NodeAddr {
    self.identity.node_addr()
}
1521
/// Returns the npub string reported by this node's identity.
pub fn npub(&self) -> String {
    self.identity.npub()
}
1526
1527 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1536 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1537 return hostname.to_string();
1538 }
1539 if let Some(name) = self.peer_aliases.get(addr) {
1540 return name.clone();
1541 }
1542 if let Some(peer) = self.peers.get(addr) {
1543 return peer.identity().short_npub();
1544 }
1545 if let Some(entry) = self.sessions.get(addr) {
1546 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1547 return PeerIdentity::from_pubkey(xonly).short_npub();
1548 }
1549 addr.short_hex()
1550 }
1551
1552 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1564 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1568 self.peers_by_index.remove(&cache_key);
1569 if self.decrypt_registered_sessions.remove(&cache_key)
1570 && let Some(workers) = self.decrypt_workers.as_ref()
1571 {
1572 workers.unregister_session(cache_key);
1573 }
1574 if let Some(peer_addr) = owning_peer {
1585 let peer_has_other_index = self
1586 .peers_by_index
1587 .values()
1588 .any(|other| *other == peer_addr);
1589 if !peer_has_other_index {
1590 self.clear_connected_udp_for_peer(&peer_addr);
1591 }
1592 }
1593 }
1594
1595 pub(in crate::node) fn ensure_current_session_index_registered(
1604 &mut self,
1605 node_addr: &NodeAddr,
1606 context: &'static str,
1607 ) -> bool {
1608 let Some(peer) = self.peers.get(node_addr) else {
1609 return false;
1610 };
1611 let Some(transport_id) = peer.transport_id() else {
1612 warn!(
1613 peer = %self.peer_display_name(node_addr),
1614 context,
1615 "Cannot register current session index without transport id"
1616 );
1617 return false;
1618 };
1619 let Some(our_index) = peer.our_index() else {
1620 warn!(
1621 peer = %self.peer_display_name(node_addr),
1622 context,
1623 "Cannot register current session index without local index"
1624 );
1625 return false;
1626 };
1627
1628 let cache_key = (transport_id, our_index.as_u32());
1629 match self.peers_by_index.get(&cache_key).copied() {
1630 Some(existing) if existing == *node_addr => true,
1631 Some(existing) => {
1632 warn!(
1633 peer = %self.peer_display_name(node_addr),
1634 previous_owner = %self.peer_display_name(&existing),
1635 transport_id = %transport_id,
1636 our_index = %our_index,
1637 context,
1638 "Repairing current session index with stale owner"
1639 );
1640 self.peers_by_index.insert(cache_key, *node_addr);
1641 true
1642 }
1643 None => {
1644 warn!(
1645 peer = %self.peer_display_name(node_addr),
1646 transport_id = %transport_id,
1647 our_index = %our_index,
1648 context,
1649 "Repairing missing current session index"
1650 );
1651 self.peers_by_index.insert(cache_key, *node_addr);
1652 true
1653 }
1654 }
1655 }
1656
/// Returns the node's [`Config`].
pub fn config(&self) -> &Config {
    &self.config
}
1663
/// Returns the effective IPv6 MTU, computed by
/// `crate::upper::icmp::effective_ipv6_mtu` from [`Self::transport_mtu`].
pub fn effective_ipv6_mtu(&self) -> u16 {
    crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
}
1672
1673 pub fn transport_mtu(&self) -> u16 {
1690 let min_operational = self
1691 .transports
1692 .values()
1693 .filter(|h| h.is_operational())
1694 .map(|h| h.mtu())
1695 .min();
1696 if let Some(mtu) = min_operational {
1697 return mtu;
1698 }
1699 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1701 return cfg.mtu();
1702 }
1703 1280
1704 }
1705
/// Returns the node's current [`NodeState`].
pub fn state(&self) -> NodeState {
    self.state
}
1712
/// Returns the time elapsed since the instant stored in `started_at`.
pub fn uptime(&self) -> std::time::Duration {
    self.started_at.elapsed()
}
1717
/// Returns whether the current node state reports itself as operational.
pub fn is_running(&self) -> bool {
    self.state.is_operational()
}
1722
/// Returns the node's `is_leaf_only` flag.
pub fn is_leaf_only(&self) -> bool {
    self.is_leaf_only
}
1727
/// Returns a shared reference to the node's [`TreeState`].
pub fn tree_state(&self) -> &TreeState {
    &self.tree_state
}
1734
/// Returns a mutable reference to the node's [`TreeState`].
pub fn tree_state_mut(&mut self) -> &mut TreeState {
    &mut self.tree_state
}
1739
/// Returns a shared reference to the node's [`BloomState`].
pub fn bloom_state(&self) -> &BloomState {
    &self.bloom_state
}
1746
/// Returns a mutable reference to the node's [`BloomState`].
pub fn bloom_state_mut(&mut self) -> &mut BloomState {
    &mut self.bloom_state
}
1751
/// Returns the stored mesh size estimate, or `None` when no estimate is
/// currently stored (see `compute_mesh_size`, which writes this value).
pub fn estimated_mesh_size(&self) -> Option<u64> {
    self.estimated_mesh_size
}
1758
1759 pub(crate) fn compute_mesh_size(&mut self) {
1765 let my_addr = *self.tree_state.my_node_addr();
1766 let parent_id = *self.tree_state.my_declaration().parent_id();
1767 let is_root = self.tree_state.is_root();
1768
1769 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1770 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1772 let mut has_data = false;
1773
1774 if !is_root
1780 && let Some(parent) = self.peers.get(&parent_id)
1781 && let Some(filter) = parent.inbound_filter()
1782 {
1783 match filter.estimated_count(max_fpr) {
1784 Some(n) => {
1785 total += n;
1786 has_data = true;
1787 }
1788 None => {
1789 self.estimated_mesh_size = None;
1790 return;
1791 }
1792 }
1793 }
1794
1795 for (peer_addr, peer) in &self.peers {
1797 if peer_addr == &parent_id {
1798 continue;
1799 }
1800 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1801 && *decl.parent_id() == my_addr
1802 {
1803 child_count += 1;
1804 if let Some(filter) = peer.inbound_filter() {
1805 match filter.estimated_count(max_fpr) {
1806 Some(n) => {
1807 total += n;
1808 has_data = true;
1809 }
1810 None => {
1811 self.estimated_mesh_size = None;
1812 return;
1813 }
1814 }
1815 }
1816 }
1817 }
1818
1819 if !has_data {
1820 self.estimated_mesh_size = None;
1821 return;
1822 }
1823
1824 let size = total.round() as u64;
1825 self.estimated_mesh_size = Some(size);
1826
1827 let now = std::time::Instant::now();
1829 let should_log = match self.last_mesh_size_log {
1830 None => true,
1831 Some(last) => {
1832 now.duration_since(last)
1833 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1834 }
1835 };
1836 if should_log {
1837 tracing::debug!(
1838 estimated_mesh_size = size,
1839 peers = self.peers.len(),
1840 children = child_count,
1841 "Mesh size estimate"
1842 );
1843 self.last_mesh_size_log = Some(now);
1844 }
1845 }
1846
/// Returns a shared reference to the node's [`CoordCache`].
pub fn coord_cache(&self) -> &CoordCache {
    &self.coord_cache
}
1853
/// Returns a mutable reference to the node's [`CoordCache`].
pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
    &mut self.coord_cache
}
1858
/// Returns a shared reference to the node's statistics.
pub fn stats(&self) -> &stats::NodeStats {
    &self.stats
}
1865
/// Returns a mutable reference to the node's statistics.
pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
    &mut self.stats
}
1870
/// Returns a shared reference to the recorded statistics history.
pub fn stats_history(&self) -> &stats_history::StatsHistory {
    &self.stats_history
}
1875
1876 pub(crate) fn record_stats_history(&mut self) {
1879 let fwd = &self.stats.forwarding;
1880 let peers_with_mmp: Vec<f64> = self
1881 .peers
1882 .values()
1883 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1884 .collect();
1885 let loss_rate = if peers_with_mmp.is_empty() {
1886 0.0
1887 } else {
1888 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1889 };
1890
1891 let snap = stats_history::Snapshot {
1892 mesh_size: self.estimated_mesh_size,
1893 tree_depth: self.tree_state.my_coords().depth() as u32,
1894 peer_count: self.peers.len() as u64,
1895 parent_switches_total: self.stats.tree.parent_switches,
1896 bytes_in_total: fwd.received_bytes,
1897 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1898 packets_in_total: fwd.received_packets,
1899 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1900 loss_rate,
1901 active_sessions: self.sessions.len() as u64,
1902 };
1903
1904 let now = std::time::Instant::now();
1905 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1906 .peers
1907 .values()
1908 .map(|p| {
1909 let stats = p.link_stats();
1910 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1911 Some(m) => (
1912 m.metrics.srtt_ms(),
1913 Some(m.metrics.loss_rate()),
1914 m.receiver.ecn_ce_count() as u64,
1915 ),
1916 None => (None, None, 0),
1917 };
1918 stats_history::PeerSnapshot {
1919 node_addr: *p.node_addr(),
1920 last_seen: now,
1921 srtt_ms,
1922 loss_rate,
1923 bytes_in_total: stats.bytes_recv,
1924 bytes_out_total: stats.bytes_sent,
1925 packets_in_total: stats.packets_recv,
1926 packets_out_total: stats.packets_sent,
1927 ecn_ce_total: ecn_ce,
1928 }
1929 })
1930 .collect();
1931
1932 self.stats_history.tick(now, &snap, &peer_snaps);
1933 }
1934
/// Returns the node's current [`TunState`].
pub fn tun_state(&self) -> TunState {
    self.tun_state
}
1941
/// Returns the stored `tun_name` as a string slice, or `None` when no name
/// is recorded.
pub fn tun_name(&self) -> Option<&str> {
    self.tun_name.as_deref()
}
1946
/// Sets the connection limit.
///
/// `add_connection` and the outbound admission checks treat a limit of `0`
/// as "no limit".
pub fn set_max_connections(&mut self, max: usize) {
    self.max_connections = max;
}
1953
/// Sets the peer limit.
///
/// `outbound_admission_check` treats a limit of `0` as "no limit".
pub fn set_max_peers(&mut self, max: usize) {
    self.max_peers = max;
}
1958
1959 pub(crate) fn outbound_admission_check(&self) -> bool {
1962 let connection_used = self
1963 .connections
1964 .len()
1965 .saturating_add(self.pending_connects.len());
1966 let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1967 let connection_allowed =
1968 self.max_connections == 0 || connection_used < self.max_connections;
1969 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1970 peer_allowed && connection_allowed && link_allowed
1971 }
1972
1973 pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1977 if !self.outbound_admission_check() {
1978 return false;
1979 }
1980
1981 let nostr = &self.config.node.discovery.nostr;
1982 if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1983 return true;
1984 }
1985
1986 let configured_npubs = self
1987 .config
1988 .peers()
1989 .iter()
1990 .map(|peer| peer.npub.clone())
1991 .collect::<HashSet<_>>();
1992 self.open_discovery_enqueue_budget(&configured_npubs) > 0
1993 }
1994
1995 pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1999 let connection_used = self
2000 .connections
2001 .len()
2002 .saturating_add(self.pending_connects.len());
2003 let connection_allowed =
2004 self.max_connections == 0 || connection_used < self.max_connections;
2005 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
2006 connection_allowed && link_allowed
2007 }
2008
/// Sets the link limit.
///
/// `add_link` and the outbound admission checks treat a limit of `0` as
/// "no limit".
pub fn set_max_links(&mut self, max: usize) {
    self.max_links = max;
}
2013
/// Returns the number of entries in the connection table.
pub fn connection_count(&self) -> usize {
    self.connections.len()
}
2020
/// Returns the number of entries in the peer table.
pub fn peer_count(&self) -> usize {
    self.peers.len()
}
2025
/// Returns the number of entries in the link table.
pub fn link_count(&self) -> usize {
    self.links.len()
}
2030
/// Returns the number of entries in the transport table.
pub fn transport_count(&self) -> usize {
    self.transports.len()
}
2035
2036 pub fn allocate_transport_id(&mut self) -> TransportId {
2040 let id = TransportId::new(self.next_transport_id);
2041 self.next_transport_id += 1;
2042 id
2043 }
2044
/// Looks up a transport by id; `None` when the id is not in the table.
pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
    self.transports.get(id)
}
2049
/// Looks up a transport by id for mutation; `None` when the id is not in
/// the table.
pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
    self.transports.get_mut(id)
}
2054
/// Iterates over the ids of all transports in the table.
pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
    self.transports.keys()
}
2059
/// Returns mutable access to the stored packet receiver, or `None` when the
/// node currently holds none.
pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
    self.packet_rx.as_mut()
}
2064
2065 pub fn allocate_link_id(&mut self) -> LinkId {
2069 let id = LinkId::new(self.next_link_id);
2070 self.next_link_id += 1;
2071 id
2072 }
2073
2074 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2076 if self.max_links > 0 && self.links.len() >= self.max_links {
2077 return Err(NodeError::MaxLinksExceeded {
2078 max: self.max_links,
2079 });
2080 }
2081 let link_id = link.link_id();
2082 let transport_id = link.transport_id();
2083 let remote_addr = link.remote_addr().clone();
2084
2085 self.links.insert(link_id, link);
2086 self.addr_to_link
2087 .insert((transport_id, remote_addr), link_id);
2088 Ok(())
2089 }
2090
/// Looks up a link by id; `None` when the id is not in the table.
pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
    self.links.get(link_id)
}
2095
/// Looks up a link by id for mutation; `None` when the id is not in the
/// table.
pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
    self.links.get_mut(link_id)
}
2100
2101 pub fn find_link_by_addr(
2103 &self,
2104 transport_id: TransportId,
2105 addr: &TransportAddr,
2106 ) -> Option<LinkId> {
2107 self.addr_to_link
2108 .get(&(transport_id, addr.clone()))
2109 .copied()
2110 }
2111
2112 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2118 if let Some(link) = self.links.remove(link_id) {
2119 let key = (link.transport_id(), link.remote_addr().clone());
2121 if self.addr_to_link.get(&key) == Some(link_id) {
2122 self.addr_to_link.remove(&key);
2123 }
2124 Some(link)
2125 } else {
2126 None
2127 }
2128 }
2129
2130 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2131 if !self.bootstrap_transports.contains(&transport_id) {
2132 return;
2133 }
2134
2135 let transport_in_use = self
2136 .links
2137 .values()
2138 .any(|link| link.transport_id() == transport_id)
2139 || self
2140 .connections
2141 .values()
2142 .any(|conn| conn.transport_id() == Some(transport_id))
2143 || self
2144 .peers
2145 .values()
2146 .any(|peer| peer.transport_id() == Some(transport_id))
2147 || self
2148 .pending_connects
2149 .iter()
2150 .any(|pending| pending.transport_id == transport_id);
2151
2152 if transport_in_use {
2153 return;
2154 }
2155
2156 tracing::debug!(
2157 transport_id = %transport_id,
2158 "bootstrap transport has no remaining references; dropping"
2159 );
2160
2161 self.bootstrap_transports.remove(&transport_id);
2162 self.bootstrap_transport_npubs.remove(&transport_id);
2163 self.transport_drops.remove(&transport_id);
2164 self.transports.remove(&transport_id);
2165 }
2166
/// Iterates over all links in the link table.
pub fn links(&self) -> impl Iterator<Item = &Link> {
    self.links.values()
}
2171
2172 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2176 let link_id = connection.link_id();
2177
2178 if self.connections.contains_key(&link_id) {
2179 return Err(NodeError::ConnectionAlreadyExists(link_id));
2180 }
2181
2182 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2183 return Err(NodeError::MaxConnectionsExceeded {
2184 max: self.max_connections,
2185 });
2186 }
2187
2188 self.connections.insert(link_id, connection);
2189 Ok(())
2190 }
2191
/// Looks up a connection by link id; `None` when no such entry exists.
pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
    self.connections.get(link_id)
}
2196
/// Looks up a connection by link id for mutation; `None` when no such entry
/// exists.
pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
    self.connections.get_mut(link_id)
}
2201
/// Removes and returns the connection stored under `link_id`, if any.
/// Only the connection table is touched by this method.
pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
    self.connections.remove(link_id)
}
2206
/// Iterates over all entries in the connection table.
pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
    self.connections.values()
}
2211
/// Looks up an active peer by node address; `None` when no such peer exists.
pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
    self.peers.get(node_addr)
}
2218
/// Looks up an active peer by node address for mutation; `None` when no
/// such peer exists.
pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
    self.peers.get_mut(node_addr)
}
2223
/// Removes and returns the peer stored under `node_addr`, if any.
/// Only the peer table is touched by this method.
pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
    self.peers.remove(node_addr)
}
2228
/// Iterates over all entries in the peer table.
pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values()
}
2233
/// Borrows the Nostr discovery instance held by this node, or `None` when
/// the node holds none.
pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
    self.nostr_discovery.as_deref()
}
2240
/// Iterates over the node addresses of all entries in the peer table.
pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
    self.peers.keys()
}
2245
/// Iterates over the peers whose `can_send()` currently returns `true`.
pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values().filter(|p| p.can_send())
}
2250
2251 pub fn sendable_peer_count(&self) -> usize {
2253 self.peers.values().filter(|p| p.can_send()).count()
2254 }
2255
2256 pub(crate) fn set_discovery_fallback_transit_allowed(
2257 &mut self,
2258 peer_addr: NodeAddr,
2259 allowed: bool,
2260 ) {
2261 if allowed {
2262 self.discovery_fallback_transit_blocked_peers
2263 .remove(&peer_addr);
2264 } else {
2265 self.discovery_fallback_transit_blocked_peers
2266 .insert(peer_addr);
2267 }
2268 }
2269
/// Returns the `discovery_fallback_transit` setting of the configured peer
/// matching `peer_addr`, or `None` when no configured peer matches.
pub(crate) fn configured_discovery_fallback_transit(
    &self,
    peer_addr: &NodeAddr,
) -> Option<bool> {
    self.configured_peer(peer_addr)
        .map(|peer| peer.discovery_fallback_transit)
}
2277
2278 pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2279 self.config.peers().iter().find(|peer| {
2280 PeerIdentity::from_npub(&peer.npub)
2281 .ok()
2282 .is_some_and(|identity| identity.node_addr() == peer_addr)
2283 })
2284 }
2285
2286 pub(in crate::node) fn active_peer_uses_configured_static_udp_path(
2287 &self,
2288 peer_addr: &NodeAddr,
2289 ) -> bool {
2290 let Some(peer_config) = self.configured_peer(peer_addr) else {
2291 return false;
2292 };
2293
2294 peer_config.addresses.iter().any(|candidate| {
2295 candidate.seen_at_ms.is_none()
2296 && candidate.transport.eq_ignore_ascii_case("udp")
2297 && self.active_peer_matches_candidate(peer_addr, candidate)
2298 })
2299 }
2300
2301 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2302 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2303 return retry_state.peer_config.discovery_fallback_transit;
2304 }
2305
2306 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2307 return allowed;
2308 }
2309
2310 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2311 }
2312
/// Test-only: sets the discovery forward limiter's interval to zero.
#[cfg(test)]
pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
    self.discovery_forward_limiter
        .set_interval(std::time::Duration::ZERO);
}
2322
/// Test-only: looks up the session entry stored for `remote`, if any.
#[cfg(test)]
pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
    self.sessions.get(remote)
}
2327
/// Test-only: looks up the session entry stored for `remote` for mutation,
/// if any.
#[cfg(test)]
pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
    self.sessions.get_mut(remote)
}
2333
/// Test-only: removes and returns the session entry stored for `remote`,
/// if any.
#[cfg(test)]
pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
    self.sessions.remove(remote)
}
2339
/// Test-only: reads the MTU stored for `fips_addr` in the shared path-MTU
/// map. Returns `None` when the entry is absent or the read lock cannot be
/// acquired.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
    self.path_mtu_lookup
        .read()
        .ok()
        .and_then(|map| map.get(fips_addr).copied())
}
2348
/// Test-only: stores `mtu` for `fips_addr` in the shared path-MTU map.
/// The insert is silently skipped when the write lock cannot be acquired.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
    if let Ok(mut map) = self.path_mtu_lookup.write() {
        map.insert(fips_addr, mtu);
    }
}
2356
/// Returns the number of entries in the session table.
pub fn session_count(&self) -> usize {
    self.sessions.len()
}
2361
/// Iterates over `(remote address, session entry)` pairs in the session
/// table.
pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
    self.sessions.iter()
}
2366
2367 pub(crate) fn register_identity(
2371 &mut self,
2372 node_addr: NodeAddr,
2373 pubkey: secp256k1::PublicKey,
2374 ) -> bool {
2375 let mut prefix = [0u8; 15];
2376 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2377 if let Some(entry) = self.identity_cache.get(&prefix)
2378 && entry.node_addr == node_addr
2379 && entry.pubkey == pubkey
2380 {
2381 return true;
2385 }
2386
2387 let (xonly, _) = pubkey.x_only_public_key();
2388 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2389 if derived_node_addr != node_addr {
2390 debug!(
2391 claimed_node_addr = %node_addr,
2392 derived_node_addr = %derived_node_addr,
2393 "Rejected identity cache entry with mismatched public key"
2394 );
2395 return false;
2396 }
2397
2398 let now_ms = Self::now_ms();
2399 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2400 && entry.node_addr == node_addr
2401 {
2402 entry.pubkey = pubkey;
2403 entry.last_seen_ms = now_ms;
2404 return true;
2405 }
2406
2407 let npub = encode_npub(&xonly);
2408 self.identity_cache.insert(
2409 prefix,
2410 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2411 );
2412 let max = self.config.node.cache.identity_size;
2414 if self.identity_cache.len() > max
2415 && let Some(oldest_key) = self
2416 .identity_cache
2417 .iter()
2418 .min_by_key(|(_, entry)| entry.last_seen_ms)
2419 .map(|(k, _)| *k)
2420 {
2421 self.identity_cache.remove(&oldest_key);
2422 }
2423 true
2424 }
2425
2426 pub(crate) fn lookup_by_fips_prefix(
2428 &mut self,
2429 prefix: &[u8; 15],
2430 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2431 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2432 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2434 } else {
2435 None
2436 }
2437 }
2438
2439 pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2441 let mut prefix = [0u8; 15];
2442 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2443 self.identity_cache.contains_key(&prefix)
2444 }
2445
/// Returns the number of entries in the identity cache.
pub fn identity_cache_len(&self) -> usize {
    self.identity_cache.len()
}
2450
/// Iterates over the identity cache, yielding each entry's node address,
/// public key, and `last_seen_ms` value.
pub fn identity_cache_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
    self.identity_cache
        .values()
        .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
}
2462
/// Returns the configured identity cache size (`node.cache.identity_size`).
pub fn identity_cache_max(&self) -> usize {
    self.config.node.cache.identity_size
}
2467
/// Returns the number of entries in the pending-lookup table.
pub fn pending_lookup_count(&self) -> usize {
    self.pending_lookups.len()
}
2472
/// Iterates over `(node address, pending lookup)` pairs in the
/// pending-lookup table.
pub fn pending_lookups_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
    self.pending_lookups.iter()
}
2479
/// Returns the number of entries in `recent_requests`.
pub fn recent_request_count(&self) -> usize {
    self.recent_requests.len()
}
2484
/// Returns the number of keys in `pending_tun_packets`, i.e. how many
/// distinct queues currently exist.
pub fn pending_tun_destinations(&self) -> usize {
    self.pending_tun_packets.len()
}
2489
/// Returns the total number of queued items across all queues in
/// `pending_tun_packets`.
pub fn pending_tun_total_packets(&self) -> usize {
    self.pending_tun_packets.values().map(|q| q.len()).sum()
}
2494
/// Iterates over `(node address, retry state)` pairs in the pending-retry
/// table.
pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
    self.retry_pending.iter()
}
2499
2500 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2507 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2509 return true;
2510 }
2511 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2513 && decl.parent_id() == self.node_addr()
2514 {
2515 return true;
2516 }
2517 false
2518 }
2519
/// Chooses the peer to which a payload for `dest_node_addr` should be sent.
///
/// Returns `None` when the destination is this node or when no candidate
/// qualifies. Candidates are tried in this order; the first hit is returned:
///
/// 1. In `RoutingMode::ReplyLearned`, when fallback exploration is not due:
///    a learned route whose next hop passes `route_candidate_beats_direct`.
/// 2. If no coordinates are cached for the destination: a learned route
///    (only when there is no healthy direct route or exploration is due),
///    then the healthy direct route, else `None`.
/// 3. The best coordinate-based candidate among sendable peers that
///    `may_reach` the destination, if it beats the direct route.
/// 4. The tree candidate from `select_tree_payload_candidate`, if it beats
///    the direct route.
/// 5. If exploration is due: whatever the learned-route selection yields.
/// 6. The healthy direct route.
/// 7. Any learned route over a sendable peer.
///
/// The direct peer counts as usable only when it is healthy and
/// `session_direct_path_blocks_direct_payload` does not report a block.
pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
    // Never route to ourselves.
    if dest_node_addr == self.node_addr() {
        return None;
    }
    let now_ms = Self::now_ms();
    let direct_session_degraded =
        self.session_direct_path_blocks_direct_payload(dest_node_addr, now_ms);

    // `Some(dest)` only when the destination is a healthy peer whose direct
    // path is not blocked.
    let healthy_direct_route = self
        .peers
        .get(dest_node_addr)
        .filter(|peer| peer.is_healthy() && !direct_session_degraded)
        .map(|_| *dest_node_addr);
    let direct_payload_eligible = healthy_direct_route.is_some();
    // The destination itself is judged by direct eligibility; every other
    // peer is judged by its own health.
    let payload_candidate_can_send = |addr: &NodeAddr, peer: &ActivePeer| {
        if addr == dest_node_addr {
            direct_payload_eligible
        } else {
            peer.is_healthy()
        }
    };

    // Takes the node explicitly so the closure does not hold a borrow of
    // `self` across the mutable calls below.
    let fallback_beats_direct = |node: &Self, fallback_addr: NodeAddr| {
        node.route_candidate_beats_direct(healthy_direct_route, fallback_addr)
    };

    // Only populated in reply-learned routing mode.
    let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
        Some(
            self.peers
                .iter()
                .filter(|(addr, peer)| payload_candidate_can_send(addr, peer))
                .map(|(addr, _)| *addr)
                .collect::<HashSet<_>>(),
        )
    } else {
        None
    };

    let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
        self.learned_routes.should_explore_fallback(
            dest_node_addr,
            now_ms,
            self.config.node.routing.learned_fallback_explore_interval,
            |addr| sendable.contains(addr),
        )
    });
    // Step 1: learned route restricted to next hops that beat the direct
    // route; skipped while exploration is due.
    if let Some(sendable) = &sendable_learned_peers
        && !explore_fallback
    {
        let eligible = sendable
            .iter()
            .copied()
            .filter(|addr| fallback_beats_direct(self, *addr))
            .collect::<HashSet<_>>();
        if !eligible.is_empty()
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| eligible.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }
    }

    // Step 2: without cached destination coordinates, only learned routes
    // and the direct route remain.
    let Some(dest_coords) = self
        .coord_cache
        .get_and_touch(dest_node_addr, now_ms)
        .cloned()
    else {
        if (healthy_direct_route.is_none() || explore_fallback)
            && let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }
        if let Some(direct_addr) = healthy_direct_route {
            return self.peers.get(&direct_addr);
        }
        return None;
    };

    // Step 3: coordinate-based selection. Resolved to an address inside the
    // block so the borrowed candidate list is released before returning.
    let coordinate_route_addr = {
        let candidates: Vec<&ActivePeer> = self
            .peers
            .iter()
            .filter(|(addr, peer)| {
                payload_candidate_can_send(addr, peer) && peer.may_reach(dest_node_addr)
            })
            .map(|(_, peer)| peer)
            .collect();
        if !candidates.is_empty() {
            self.select_best_candidate(&candidates, &dest_coords)
                .map(|peer| *peer.node_addr())
        } else {
            None
        }
    };
    if let Some(next_hop_addr) = coordinate_route_addr
        && fallback_beats_direct(self, next_hop_addr)
    {
        return self.peers.get(&next_hop_addr);
    }

    // Step 4: tree-distance-based selection.
    let tree_route_addr = self.select_tree_payload_candidate(
        &dest_coords,
        dest_node_addr,
        direct_payload_eligible,
    );
    if let Some(next_hop_addr) = tree_route_addr
        && fallback_beats_direct(self, next_hop_addr)
    {
        return self.peers.get(&next_hop_addr);
    }

    // Step 5: exploration.
    // NOTE(review): this returns unconditionally, so a `None` from
    // `select_next_hop` yields `None` even when `healthy_direct_route` is
    // set; the no-coordinates branch above falls through to the direct
    // route instead — confirm this difference is intended.
    if explore_fallback {
        return sendable_learned_peers.as_ref().and_then(|sendable| {
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
        });
    }

    // Step 6: direct route.
    if let Some(direct_addr) = healthy_direct_route {
        return self.peers.get(&direct_addr);
    }

    // Step 7: last resort, any learned route over a sendable peer.
    if let Some(sendable) = &sendable_learned_peers
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    None
}
2695
2696 pub(in crate::node) fn find_transit_next_hop(
2697 &mut self,
2698 dest_node_addr: &NodeAddr,
2699 previous_hop: &NodeAddr,
2700 ) -> Option<NodeAddr> {
2701 if dest_node_addr == self.node_addr() {
2702 return None;
2703 }
2704
2705 if dest_node_addr != previous_hop
2706 && self
2707 .peers
2708 .get(dest_node_addr)
2709 .is_some_and(|peer| peer.is_healthy())
2710 {
2711 return Some(*dest_node_addr);
2712 }
2713
2714 let next_hop_addr = *self.find_next_hop(dest_node_addr)?.node_addr();
2715 if &next_hop_addr == previous_hop {
2716 self.record_route_failure(*dest_node_addr, next_hop_addr);
2717 return None;
2718 }
2719 Some(next_hop_addr)
2720 }
2721
2722 fn route_candidate_beats_direct(
2723 &self,
2724 healthy_direct_route: Option<NodeAddr>,
2725 candidate_addr: NodeAddr,
2726 ) -> bool {
2727 let Some(direct_addr) = healthy_direct_route else {
2728 return true;
2729 };
2730 if candidate_addr == direct_addr {
2731 return false;
2732 }
2733
2734 let Some(direct) = self.peers.get(&direct_addr) else {
2735 return true;
2736 };
2737 let Some(candidate) = self.peers.get(&candidate_addr) else {
2738 return false;
2739 };
2740 if !candidate.is_healthy() {
2741 return false;
2742 }
2743
2744 let direct_cost = direct.link_cost();
2745 let candidate_cost = candidate.link_cost();
2746 candidate_cost + ROUTING_FALLBACK_MIN_COST_ADVANTAGE < direct_cost
2747 }
2748
2749 fn select_tree_payload_candidate(
2750 &self,
2751 dest_coords: &crate::tree::TreeCoordinate,
2752 direct_dest: &NodeAddr,
2753 direct_payload_eligible: bool,
2754 ) -> Option<NodeAddr> {
2755 if self.tree_state.my_coords().root_id() != dest_coords.root_id() {
2756 return None;
2757 }
2758
2759 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2760 let mut best: Option<(NodeAddr, usize)> = None;
2761
2762 for (peer_addr, peer) in &self.peers {
2763 if peer_addr == direct_dest {
2764 if !direct_payload_eligible {
2765 continue;
2766 }
2767 } else if !peer.is_healthy() {
2768 continue;
2769 }
2770
2771 let Some(peer_coords) = self.tree_state.peer_coords(peer_addr) else {
2772 continue;
2773 };
2774 let distance = peer_coords.distance_to(dest_coords);
2775 if distance >= my_distance {
2776 continue;
2777 }
2778
2779 let dominated = match &best {
2780 None => true,
2781 Some((best_id, best_dist)) => {
2782 distance < *best_dist || (distance == *best_dist && peer_addr < best_id)
2783 }
2784 };
2785 if dominated {
2786 best = Some((*peer_addr, distance));
2787 }
2788 }
2789
2790 best.map(|(peer_addr, _)| peer_addr)
2791 }
2792
2793 pub(in crate::node) fn session_direct_path_is_degraded(
2794 &mut self,
2795 dest: &NodeAddr,
2796 now_ms: u64,
2797 ) -> bool {
2798 match self.session_direct_degraded_until_ms.get(dest).copied() {
2799 Some(until_ms) if until_ms > now_ms => true,
2800 Some(_) => {
2801 self.session_direct_degraded_until_ms.remove(dest);
2802 false
2803 }
2804 None => false,
2805 }
2806 }
2807
2808 pub(in crate::node) fn session_direct_path_blocks_direct_payload(
2809 &mut self,
2810 dest: &NodeAddr,
2811 now_ms: u64,
2812 ) -> bool {
2813 self.session_direct_path_is_degraded(dest, now_ms)
2814 && !self.active_peer_uses_configured_static_udp_path(dest)
2815 }
2816
2817 pub(in crate::node) fn mark_session_direct_path_degraded(
2818 &mut self,
2819 dest: NodeAddr,
2820 now_ms: u64,
2821 ) -> bool {
2822 let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2823 let entry = self
2824 .session_direct_degraded_until_ms
2825 .entry(dest)
2826 .or_insert(0);
2827 let was_degraded = *entry > now_ms;
2828 *entry = (*entry).max(until_ms);
2829 !was_degraded
2830 }
2831
2832 pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
2833 self.session_direct_degraded_until_ms.remove(dest).is_some()
2834 }
2835
2836 pub(in crate::node) fn learn_reverse_route(
2837 &mut self,
2838 destination: NodeAddr,
2839 next_hop: NodeAddr,
2840 ) {
2841 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2842 || destination == *self.node_addr()
2843 {
2844 return;
2845 }
2846 let now_ms = Self::now_ms();
2847 self.learned_routes.learn(
2848 destination,
2849 next_hop,
2850 now_ms,
2851 self.config.node.routing.learned_ttl_secs,
2852 self.config.node.routing.max_learned_routes_per_dest,
2853 );
2854 }
2855
2856 pub(in crate::node) fn record_route_failure(
2857 &mut self,
2858 destination: NodeAddr,
2859 next_hop: NodeAddr,
2860 ) {
2861 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2862 return;
2863 }
2864 self.learned_routes.record_failure(&destination, &next_hop);
2865 }
2866
2867 pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2868 self.learned_routes.snapshot(now_ms)
2869 }
2870
2871 pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2872 self.learned_routes.purge_expired(now_ms);
2873 }
2874
2875 fn select_best_candidate<'a>(
2884 &'a self,
2885 candidates: &[&'a ActivePeer],
2886 dest_coords: &crate::tree::TreeCoordinate,
2887 ) -> Option<&'a ActivePeer> {
2888 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2889
2890 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2891
2892 for &candidate in candidates {
2893 if !candidate.can_send() {
2894 continue;
2895 }
2896
2897 let cost = candidate.link_cost();
2898
2899 let dist = self
2900 .tree_state
2901 .peer_coords(candidate.node_addr())
2902 .map(|pc| pc.distance_to(dest_coords))
2903 .unwrap_or(usize::MAX);
2904
2905 if dist >= my_distance {
2908 continue;
2909 }
2910
2911 let dominated = match &best {
2912 None => true,
2913 Some((_, best_cost, best_dist)) => {
2914 cost < *best_cost
2915 || (cost == *best_cost && dist < *best_dist)
2916 || (cost == *best_cost
2917 && dist == *best_dist
2918 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2919 }
2920 };
2921
2922 if dominated {
2923 best = Some((candidate, cost, dist));
2924 }
2925 }
2926
2927 best.map(|(peer, _, _)| peer)
2928 }
2929
2930 pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2932 self.peers.values().filter(|p| p.may_reach(dest)).collect()
2933 }
2934
2935 pub fn tun_tx(&self) -> Option<&TunTx> {
2939 self.tun_tx.as_ref()
2940 }
2941
2942 pub fn attach_external_packet_io(
2949 &mut self,
2950 capacity: usize,
2951 ) -> Result<ExternalPacketIo, NodeError> {
2952 if self.state != NodeState::Created {
2953 return Err(NodeError::Config(ConfigError::Validation(
2954 "external packet I/O must be attached before node start".to_string(),
2955 )));
2956 }
2957 if self.config.tun.enabled {
2958 return Err(NodeError::Config(ConfigError::Validation(
2959 "external packet I/O requires tun.enabled=false".to_string(),
2960 )));
2961 }
2962
2963 let capacity = capacity.max(1);
2964 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2965 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2966 self.tun_outbound_rx = Some(outbound_rx);
2967 self.external_packet_tx = Some(inbound_tx);
2968
2969 Ok(ExternalPacketIo {
2970 outbound_tx,
2971 inbound_rx,
2972 })
2973 }
2974
2975 pub(crate) fn attach_endpoint_data_io(
2980 &mut self,
2981 capacity: usize,
2982 ) -> Result<EndpointDataIo, NodeError> {
2983 if self.state != NodeState::Created {
2984 return Err(NodeError::Config(ConfigError::Validation(
2985 "endpoint data I/O must be attached before node start".to_string(),
2986 )));
2987 }
2988
2989 let command_capacity = endpoint_data_command_capacity(capacity);
2990 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2991 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2996 self.endpoint_command_rx = Some(command_rx);
2997 self.endpoint_event_tx = Some(event_tx.clone());
2998
2999 Ok(EndpointDataIo {
3000 command_tx,
3001 event_rx,
3002 event_tx,
3003 })
3004 }
3005
3006 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
3007 let mut prefix = [0u8; 15];
3008 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3009 self.identity_cache
3010 .get(&prefix)
3011 .filter(|entry| &entry.node_addr == addr)
3012 .map(|entry| entry.pubkey)
3013 }
3014
3015 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
3016 let mut prefix = [0u8; 15];
3017 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3018 self.identity_cache
3019 .get(&prefix)
3020 .filter(|entry| &entry.node_addr == addr)
3021 .map(|entry| entry.npub.clone())
3022 }
3023
3024 pub(in crate::node) fn deliver_external_ipv6_packet(
3025 &self,
3026 src_addr: &NodeAddr,
3027 packet: Vec<u8>,
3028 ) {
3029 let Some(external_packet_tx) = &self.external_packet_tx else {
3030 return;
3031 };
3032 if packet.len() < 40 {
3033 return;
3034 }
3035 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
3036 return;
3037 };
3038 let delivered = NodeDeliveredPacket {
3039 source_node_addr: *src_addr,
3040 source_npub: self.npub_for_node_addr(src_addr),
3041 destination,
3042 packet,
3043 };
3044 if let Err(error) = external_packet_tx.try_send(delivered) {
3045 debug!(error = %error, "Failed to deliver packet to external app sink");
3046 }
3047 }
3048
3049 pub(super) async fn send_encrypted_link_message(
3063 &mut self,
3064 node_addr: &NodeAddr,
3065 plaintext: &[u8],
3066 ) -> Result<(), NodeError> {
3067 self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
3068 .await
3069 }
3070
3071 pub(in crate::node) fn note_local_send_outcome(
3077 &mut self,
3078 node_addr: &NodeAddr,
3079 result: &Result<usize, TransportError>,
3080 ) {
3081 match result {
3082 Ok(_) => {
3083 self.local_send_failure_at_by_peer.remove(node_addr);
3084 }
3085 Err(error) if error.is_local_route_unavailable() => {
3086 self.local_send_failure_at_by_peer
3087 .insert(*node_addr, std::time::Instant::now());
3088 }
3089 Err(_) => {}
3090 }
3091 }
3092
3093 pub(in crate::node) fn local_send_failure_dead_timeout_for_peer(
3099 &self,
3100 node_addr: &NodeAddr,
3101 now: std::time::Instant,
3102 dead_timeout: std::time::Duration,
3103 fast_dead_timeout: std::time::Duration,
3104 ) -> std::time::Duration {
3105 match self.local_send_failure_at_by_peer.get(node_addr).copied() {
3106 Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
3107 fast_dead_timeout.min(dead_timeout)
3108 }
3109 None => dead_timeout,
3110 Some(_) => dead_timeout,
3111 }
3112 }
3113
3114 pub(in crate::node) fn purge_expired_local_send_failures(&mut self, now: std::time::Instant) {
3115 self.local_send_failure_at_by_peer
3116 .retain(|_, at| now.duration_since(*at) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW);
3117 }
3118
3119 pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
3120 self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
3121 }
3122
3123 pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
3124 let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
3125 return false;
3126 };
3127 let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
3128 std::time::Instant::now().duration_since(t) <= grace
3129 }
3130
3131 pub(super) async fn send_encrypted_link_message_with_ce(
3135 &mut self,
3136 node_addr: &NodeAddr,
3137 plaintext: &[u8],
3138 ce_flag: bool,
3139 ) -> Result<(), NodeError> {
3140 let peer = self
3141 .peers
3142 .get_mut(node_addr)
3143 .ok_or(NodeError::PeerNotFound(*node_addr))?;
3144
3145 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
3146 node_addr: *node_addr,
3147 reason: "no their_index".into(),
3148 })?;
3149 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
3150 node_addr: *node_addr,
3151 reason: "no transport_id".into(),
3152 })?;
3153 let remote_addr = peer
3154 .current_addr()
3155 .cloned()
3156 .ok_or_else(|| NodeError::SendFailed {
3157 node_addr: *node_addr,
3158 reason: "no current_addr".into(),
3159 })?;
3160 #[cfg(any(target_os = "linux", target_os = "macos"))]
3161 let connected_socket = peer.connected_udp();
3162
3163 let timestamp_ms = peer.session_elapsed_ms();
3165
3166 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
3168 let mut flags = if sp_flag { FLAG_SP } else { 0 };
3169 if ce_flag {
3170 flags |= FLAG_CE;
3171 }
3172 if peer.current_k_bit() {
3173 flags |= FLAG_KEY_EPOCH;
3174 }
3175
3176 let session = peer
3177 .noise_session_mut()
3178 .ok_or_else(|| NodeError::SendFailed {
3179 node_addr: *node_addr,
3180 reason: "no noise session".into(),
3181 })?;
3182
3183 const INNER_TS_LEN: usize = 4;
3191 let counter = session.current_send_counter();
3192 let inner_len = INNER_TS_LEN + plaintext.len();
3193 let payload_len = inner_len as u16;
3194 let header = build_established_header(their_index, counter, flags, payload_len);
3195
3196 let transport_for_send = self
3215 .transports
3216 .get(&transport_id)
3217 .ok_or(NodeError::TransportNotFound(transport_id))?;
3218 match transport_for_send.connection_state(&remote_addr) {
3219 ConnectionState::Connected => {}
3220 other => {
3221 if matches!(other, ConnectionState::None) {
3222 let _ = transport_for_send.connect(&remote_addr).await;
3223 }
3224 return Err(NodeError::SendFailed {
3225 node_addr: *node_addr,
3226 reason: format!("transport connection not ready: {:?}", other),
3227 });
3228 }
3229 }
3230 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3231 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3232 && is_udp
3233 && let Some(cipher_clone) = session.send_cipher_clone()
3234 {
3235 {
3236 let reserved_counter =
3240 session
3241 .take_send_counter()
3242 .map_err(|e| NodeError::SendFailed {
3243 node_addr: *node_addr,
3244 reason: format!("counter reservation failed: {}", e),
3245 })?;
3246 debug_assert_eq!(reserved_counter, counter);
3247 let header =
3251 build_established_header(their_index, reserved_counter, flags, payload_len);
3252 let transport = transport_for_send;
3253 let send_target = {
3260 if let TransportHandle::Udp(udp) = transport {
3261 let socket_addr = {
3262 #[cfg(any(target_os = "linux", target_os = "macos"))]
3263 {
3264 match connected_socket.as_ref() {
3265 Some(socket) => Some(socket.peer_addr()),
3266 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3267 }
3268 }
3269 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3270 {
3271 udp.resolve_for_off_task(&remote_addr).await.ok()
3272 }
3273 };
3274 match (udp.async_socket(), socket_addr) {
3275 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3276 _ => None,
3277 }
3278 } else {
3279 None
3280 }
3281 };
3282 if let Some((socket, socket_addr)) = send_target {
3283 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3299 let mut wire_buf = Vec::with_capacity(wire_capacity);
3300 wire_buf.extend_from_slice(&header);
3301 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
3302 wire_buf.extend_from_slice(plaintext);
3303 let predicted_bytes = wire_capacity;
3304 if let Some(peer) = self.peers.get_mut(node_addr) {
3311 peer.link_stats_mut().record_sent(predicted_bytes);
3312 if let Some(mmp) = peer.mmp_mut() {
3313 mmp.sender
3314 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3315 }
3316 }
3317 let scheduling_weight = self.send_weight_for_peer(node_addr);
3318 let bulk_endpoint_data = fmp_plaintext_is_bulk_session_datagram(plaintext);
3319 workers.dispatch(self::encrypt_worker::FmpSendJob {
3320 cipher: cipher_clone,
3321 counter: reserved_counter,
3322 wire_buf,
3323 fsp_seal: None,
3324 socket,
3325 dest_addr: socket_addr,
3326 #[cfg(any(target_os = "linux", target_os = "macos"))]
3327 connected_socket,
3328 bulk_endpoint_data,
3329 drop_on_backpressure: bulk_endpoint_data,
3330 scheduling_weight,
3331 queued_at: crate::perf_profile::stamp(),
3332 });
3333 return Ok(());
3334 }
3335 }
3336 }
3337
3338 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3343 let ciphertext = {
3345 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3346 session
3347 .encrypt_with_aad(&inner_plaintext, &header)
3348 .map_err(|e| NodeError::SendFailed {
3349 node_addr: *node_addr,
3350 reason: format!("encryption failed: {}", e),
3351 })?
3352 };
3353
3354 let wire_packet = build_encrypted(&header, &ciphertext);
3355
3356 let send_result = {
3358 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3359 let transport = self
3360 .transports
3361 .get(&transport_id)
3362 .ok_or(NodeError::TransportNotFound(transport_id))?;
3363 transport.send(&remote_addr, &wire_packet).await
3364 };
3365 self.note_local_send_outcome(node_addr, &send_result);
3366 let bytes_sent = send_result.map_err(|e| match e {
3367 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3368 node_addr: *node_addr,
3369 packet_size,
3370 mtu,
3371 },
3372 other => NodeError::SendFailed {
3373 node_addr: *node_addr,
3374 reason: format!("transport send: {}", other),
3375 },
3376 })?;
3377
3378 if let Some(peer) = self.peers.get_mut(node_addr) {
3380 peer.link_stats_mut().record_sent(bytes_sent);
3381 if let Some(mmp) = peer.mmp_mut() {
3383 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3384 }
3385 }
3386
3387 Ok(())
3388 }
3389}
3390
3391impl fmt::Debug for Node {
3392 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3393 f.debug_struct("Node")
3394 .field("node_addr", self.node_addr())
3395 .field("state", &self.state)
3396 .field("is_leaf_only", &self.is_leaf_only)
3397 .field("connections", &self.connection_count())
3398 .field("peers", &self.peer_count())
3399 .field("links", &self.link_count())
3400 .field("transports", &self.transport_count())
3401 .finish()
3402 }
3403}