1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
40use crate::peer::{ActivePeer, PeerConnection};
41#[cfg(any(target_os = "linux", target_os = "macos"))]
42use crate::transport::ethernet::EthernetTransport;
43use crate::transport::tcp::TcpTransport;
44use crate::transport::tor::TorTransport;
45use crate::transport::udp::UdpTransport;
46#[cfg(feature = "webrtc-transport")]
47use crate::transport::webrtc::WebRtcTransport;
48use crate::transport::{
49 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
50 TransportHandle, TransportId,
51};
52use crate::tree::TreeState;
53use crate::upper::hosts::HostMap;
54use crate::upper::icmp_rate_limit::IcmpRateLimiter;
55use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
56use crate::utils::index::IndexAllocator;
57use crate::{
58 Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
59 PeerIdentity, encode_npub,
60};
61use rand::Rng;
62use std::collections::{HashMap, HashSet, VecDeque};
63use std::fmt;
64use std::sync::Arc;
65use std::thread::JoinHandle;
66use thiserror::Error;
67use tracing::{debug, warn};
68
// Tuning constants. NOTE(review): their use sites are outside this excerpt, so
// the descriptions below are inferred from the names and values only — confirm
// against the code that reads them.

/// Three-second window related to local send failures; presumably the period
/// within which such failures trigger fast dead-peer handling — TODO confirm.
const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
/// Hold time in milliseconds (20 s) for the "session direct degraded" state;
/// presumably the value stored into `session_direct_degraded_until_ms` is
/// derived from it — TODO confirm.
const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20_000;
/// Minimum sample count (16); presumably required before a loss ratio is
/// trusted for the degraded decision — TODO confirm.
const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 16;
/// Loss ratio (0.08) that presumably marks a direct session path as degraded.
const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.08;
/// Loss ratio (0.02) that presumably marks a degraded path as recovered; it is
/// lower than the degraded threshold above.
const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.02;
/// Minimum cost advantage (0.25) associated with routing fallback; presumably a
/// fraction by which an alternative must be cheaper — TODO confirm.
const ROUTING_FALLBACK_MIN_COST_ADVANTAGE: f64 = 0.25;
75
76fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
77 if plaintext
78 .first()
79 .is_none_or(|ty| *ty != LinkMessageType::SessionDatagram.to_byte())
80 {
81 return false;
82 }
83 let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
84 return false;
85 };
86 FspCommonPrefix::parse(fsp_payload).is_some_and(|prefix| {
87 prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted() && !prefix.has_coords()
88 })
89}
90
/// Rekey jitter bound in seconds (15).
///
/// NOTE(review): the use site is outside this excerpt; presumably rekey timers
/// are offset by a random amount bounded by this value — confirm whether the
/// range is symmetric (the type is signed).
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
98
/// Errors returned by node operations.
///
/// `Display` text for every variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The node has not been started.
    #[error("node not started")]
    NotStarted,

    /// The node has already been started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node has already been stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport exists with the given id.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No transport is available for the named transport type. The payload is
    /// free-form text and is also used for address-resolution failures.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link exists with the given id.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection exists for the given link id.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// No peer is known under the given node address.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with the given node address already exists.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection already exists for the given link.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A peer npub could not be accepted; `reason` explains why.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// Discovery failed; the payload is a human-readable description.
    #[error("discovery error: {0}")]
    Discovery(String),

    /// Access was denied; the payload is a human-readable description.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` would be exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` would be exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` would be exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available for the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion failed for `link_id`; `reason` explains why.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed; `reason` explains why.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet of `packet_size` bytes exceeds the `mtu` when forwarding to
    /// `node_addr`.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Configuration error; converted automatically from [`ConfigError`].
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Identity error; converted automatically from [`IdentityError`].
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// TUN error; converted automatically from [`TunError`].
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Index allocation failed; the payload is a human-readable description.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// The handshake failed; the payload is a human-readable description.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// A transport error, carried as its rendered text. Built by
    /// `NodeError::from_transport_error` for errors that are not
    /// "local route unavailable".
    #[error("transport error: {0}")]
    TransportError(String),

    /// A transport error that reported itself as "local route unavailable",
    /// carried as its rendered text.
    #[error("local route unavailable: {0}")]
    LocalRouteUnavailable(String),

    /// Bootstrap handoff failed; the payload is a human-readable description.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
193
194impl NodeError {
195 pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
196 if error.is_local_route_unavailable() {
197 Self::LocalRouteUnavailable(error.to_string())
198 } else {
199 Self::TransportError(error.to_string())
200 }
201 }
202
203 pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
204 matches!(self, Self::LocalRouteUnavailable(_))
205 }
206}
207
/// A packet delivered by the node, together with where it came from and where
/// it was addressed.
///
/// Sent over the channel whose receiving half is
/// `ExternalPacketIo::inbound_rx`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// Node address of the sender.
    pub source_node_addr: NodeAddr,
    /// npub string of the sender, when available.
    pub source_npub: Option<String>,
    /// Address the packet was destined for.
    pub destination: FipsAddress,
    /// Raw packet bytes. NOTE(review): presumably a complete IP packet —
    /// confirm against the producer.
    pub packet: Vec<u8>,
}
220
/// One entry of `Node::identity_cache`: the identity material known for a
/// node plus a last-seen timestamp.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address of the cached identity.
    node_addr: NodeAddr,
    /// secp256k1 public key of the cached identity.
    pubkey: secp256k1::PublicKey,
    /// npub string of the cached identity.
    npub: String,
    /// Timestamp in milliseconds of the last sighting. NOTE(review): the
    /// clock/epoch is not visible in this excerpt — presumably used for
    /// eviction.
    last_seen_ms: u64,
}
228
229impl IdentityCacheEntry {
230 fn new(
231 node_addr: NodeAddr,
232 pubkey: secp256k1::PublicKey,
233 npub: String,
234 last_seen_ms: u64,
235 ) -> Self {
236 Self {
237 node_addr,
238 pubkey,
239 npub,
240 last_seen_ms,
241 }
242 }
243}
244
/// Channel endpoints for exchanging packets with the node from outside.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Sender for outbound packets, using the same channel type as the TUN
    /// outbound path (`TunOutboundTx`).
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receiver for packets delivered by the node.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
253
/// Channel endpoints for the endpoint data API: commands go to the node,
/// events come back from it.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Bounded sender for [`NodeEndpointCommand`]s.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Unbounded receiver for [`NodeEndpointEvent`]s.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Unbounded sender for [`NodeEndpointEvent`]s. NOTE(review): presumably
    /// the sending half of the `event_rx` channel, exposed so the holder can
    /// inject events — confirm against the code that builds this struct.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
283
/// Resolves the capacity of the endpoint data command queue.
///
/// If the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable holds a positive
/// integer (surrounding whitespace is ignored), that value is returned as-is,
/// regardless of `requested`. An unset, non-UTF-8, unparsable or zero value is
/// ignored.
///
/// Otherwise the result is `requested` raised to a floor of 32 768 entries, so
/// any request at or below the floor yields exactly 32 768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    /// Smallest capacity handed out when no valid override is present.
    const MIN_CAPACITY: usize = 32_768;

    std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&value| value > 0)
        // A former `requested.max(1)` clamp was dead code: the floor below
        // always dominates it, so it has been dropped without changing results.
        .unwrap_or_else(|| requested.max(MIN_CAPACITY))
}
294
/// Commands sent to the node over the endpoint command channel
/// (`EndpointDataIo::command_tx`).
///
/// Variants that carry a `response_tx` include a one-shot sender for the
/// reply. NOTE(review): the handler is outside this excerpt; per-variant
/// behaviour below is inferred from names and payload types.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send `payload` to `remote` and report a `Result<(), NodeError>`.
    Send {
        /// Destination peer.
        remote: PeerIdentity,
        /// Bytes to send.
        payload: Vec<u8>,
        /// Optional instant; presumably when the command was queued, for
        /// latency accounting — TODO confirm.
        queued_at: Option<std::time::Instant>,
        /// One-shot reply channel for the send result.
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Send `payload` to `remote` without any reply channel.
    SendOneway {
        /// Destination peer.
        remote: PeerIdentity,
        /// Bytes to send.
        payload: Vec<u8>,
        /// Optional instant; presumably when the command was queued.
        queued_at: Option<std::time::Instant>,
    },
    /// Request a list of [`NodeEndpointPeer`] entries.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request a list of [`NodeEndpointRelayStatus`] entries.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Replace/update the relay lists and report a `Result<(), NodeError>`.
    UpdateRelays {
        /// Relay URLs for adverts (by name).
        advert_relays: Vec<String>,
        /// Relay URLs for DMs (by name).
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Apply a new peer configuration list and report an
    /// [`UpdatePeersOutcome`] (or an error).
    UpdatePeers {
        /// Peer configurations to apply.
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
338
/// Counters returned in reply to `NodeEndpointCommand::UpdatePeers`.
///
/// All counters default to zero.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Number of peers added.
    pub(crate) added: usize,
    /// Number of peers removed.
    pub(crate) removed: usize,
    /// Number of peers updated.
    pub(crate) updated: usize,
    /// Number of peers left unchanged.
    pub(crate) unchanged: usize,
}
347
/// Events carried on the endpoint event channel
/// (`EndpointDataIo::event_rx` / `event_tx`).
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A data payload together with its source.
    Data {
        /// Node address of the sender.
        source_node_addr: NodeAddr,
        /// npub string of the sender, when available.
        source_npub: Option<String>,
        /// Payload bytes.
        payload: Vec<u8>,
        /// Optional instant; presumably when the event was queued, for
        /// latency accounting — TODO confirm.
        queued_at: Option<std::time::Instant>,
    },
}
358
/// Per-peer entry returned in reply to `NodeEndpointCommand::PeerSnapshot`.
///
/// NOTE(review): the code that fills this struct is outside this excerpt;
/// field meanings are taken from their names and types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// npub string of the peer.
    pub(crate) npub: String,
    /// Whether the peer is connected.
    pub(crate) connected: bool,
    /// Transport address as text, when known.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, when known.
    pub(crate) transport_type: Option<String>,
    /// Numeric link id.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, when measured.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent.
    pub(crate) packets_sent: u64,
    /// Packets received.
    pub(crate) packets_recv: u64,
    /// Bytes sent.
    pub(crate) bytes_sent: u64,
    /// Bytes received.
    pub(crate) bytes_recv: u64,
    /// Whether a direct probe is pending.
    pub(crate) direct_probe_pending: bool,
    /// Milliseconds value tied to the pending direct probe; presumably a
    /// delay or deadline — TODO confirm which.
    pub(crate) direct_probe_after_ms: Option<u64>,
}
375
/// Per-relay entry returned in reply to `NodeEndpointCommand::RelaySnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Relay status as free-form text; the set of values is not visible in
    /// this excerpt.
    pub(crate) status: String,
}
382
/// Lifecycle state of a node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed but never started.
    Created,
    /// Start-up in progress.
    Starting,
    /// Fully running.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// Shut down.
    Stopped,
}

impl NodeState {
    /// `true` only while the node is `Running`.
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// `true` when a start is allowed: from `Created` or from `Stopped`.
    pub fn can_start(&self) -> bool {
        match self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// `true` when a stop is allowed: only from `Running`.
    pub fn can_stop(&self) -> bool {
        NodeState::Running == *self
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase state name (`"created"`, `"starting"`, ...).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        })
    }
}
427
/// Record of a recently seen request, stored in `Node::recent_requests`
/// keyed by a `u64`.
///
/// NOTE(review): the request kind and the key's meaning are not visible in
/// this excerpt; presumably this remembers where a request came from so the
/// response can be sent back — confirm.
#[derive(Clone, Debug)]
pub(crate) struct RecentRequest {
    /// Peer the request was received from.
    pub(crate) from_peer: NodeAddr,
    /// Timestamp of the request in milliseconds; compared against the
    /// `current_time_ms` passed to `is_expired`.
    pub(crate) timestamp_ms: u64,
    /// Whether a response has been forwarded; starts as `false`.
    pub(crate) response_forwarded: bool,
}
445
446impl RecentRequest {
447 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
448 Self {
449 from_peer,
450 timestamp_ms,
451 response_forwarded: false,
452 }
453 }
454
455 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
457 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
458 }
459}
460
/// Key of `Node::addr_to_link`: a transport id paired with a remote address
/// on that transport.
type AddrKey = (TransportId, TransportAddr);
463
/// Per-transport drop tracking, stored in `Node::transport_drops`.
///
/// NOTE(review): the code that updates this is outside this excerpt; the
/// field descriptions are inferred from their names.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Drop counter value seen previously; presumably compared with the
    /// current counter to detect new drops — TODO confirm.
    prev_drops: u64,
    /// Whether the transport is currently considered to be dropping packets.
    dropping: bool,
}
475
/// An outbound connection attempt tracked in `Node::pending_connects`.
///
/// NOTE(review): presumably used for connection-oriented transports whose
/// connect completes asynchronously — the consumer is outside this excerpt.
struct PendingConnect {
    /// Link allocated for the attempt.
    link_id: LinkId,
    /// Transport the attempt runs on.
    transport_id: TransportId,
    /// Remote address being connected to.
    remote_addr: TransportAddr,
    /// Identity of the peer expected at the remote address.
    peer_identity: PeerIdentity,
}
491
/// A running (or not yet running) node: identity, configuration, transports,
/// links, peers, sessions and all associated bookkeeping.
///
/// Built by `Node::new`, `Node::with_identity` or `Node::leaf_only`. Fields
/// described as "starts as ..." state what those constructors put in place;
/// descriptions marked NOTE(review) are inferred from names and types because
/// the code using the field is outside this excerpt.
pub struct Node {
    /// This node's identity (created from the config or supplied by the caller).
    identity: Identity,

    /// Eight random bytes generated when the node is constructed.
    startup_epoch: [u8; 8],

    /// Instant at which the node was constructed.
    started_at: std::time::Instant,

    /// Configuration the node was built from (validated in the constructor).
    config: Config,

    /// Lifecycle state; starts as `NodeState::Created`.
    state: NodeState,

    /// Whether the node runs in leaf-only mode.
    is_leaf_only: bool,

    /// Spanning-tree state for this node, configured from `config.node.tree`
    /// and holding a declaration signed with `identity`.
    tree_state: TreeState,

    /// Bloom filter state; built via `BloomState::leaf_only` or
    /// `BloomState::new` depending on the mode.
    bloom_state: BloomState,

    /// Coordinate cache sized and aged per `config.node.cache`.
    coord_cache: CoordCache,
    /// Learned route table; starts empty (`Default`).
    learned_routes: LearnedRouteTable,
    /// Per-peer millisecond timestamp until which the direct session path is
    /// treated as degraded. NOTE(review): inferred from the name.
    session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
    /// Recently seen requests keyed by a `u64` id.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Shared, lock-protected map from address to path MTU; starts empty.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    /// Transports by id; starts empty.
    transports: HashMap<TransportId, TransportHandle>,
    /// Drop tracking per transport.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Links by id.
    links: HashMap<LinkId, Link>,
    /// Reverse lookup from (transport, remote address) to link.
    addr_to_link: HashMap<AddrKey, LinkId>,

    /// Sender half of the packet channel; `None` until set up.
    packet_tx: Option<PacketTx>,
    /// Receiver half of the packet channel; `None` until set up.
    packet_rx: Option<PacketRx>,

    /// Peer connections by link. NOTE(review): presumably connections still
    /// handshaking, before promotion to `peers`.
    connections: HashMap<LinkId, PeerConnection>,

    /// Active peers by node address.
    peers: HashMap<NodeAddr, ActivePeer>,

    /// End-to-end sessions by remote node address.
    sessions: HashMap<NodeAddr, SessionEntry>,

    /// Identity cache keyed by a 15-byte value. NOTE(review): presumably an
    /// address prefix/suffix — the key derivation is outside this excerpt.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    /// Queued TUN packets per destination node.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Queued endpoint data payloads per destination node.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// In-flight discovery lookups per target node.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    /// Copy of `config.node.limits.max_connections`.
    max_connections: usize,
    /// Copy of `config.node.limits.max_peers`.
    max_peers: usize,
    /// Copy of `config.node.limits.max_links`.
    max_links: usize,

    /// Next link id to hand out; starts at 1.
    next_link_id: u64,
    /// Next transport id to hand out; starts at 1.
    next_transport_id: u32,

    /// Node statistics.
    stats: stats::NodeStats,

    /// History of statistics.
    stats_history: stats_history::StatsHistory,

    /// TUN state; starts as `Configured` when `config.tun.enabled`, otherwise
    /// `Disabled`.
    tun_state: TunState,
    /// Name of the TUN device, once known.
    tun_name: Option<String>,
    /// Sender towards the TUN writer, once set up.
    tun_tx: Option<TunTx>,
    /// Receiver of packets read from the TUN, once set up.
    tun_outbound_rx: Option<TunOutboundRx>,
    /// Sender for packets delivered to an external consumer, if any.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Receiver for endpoint commands, if any.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Sender for endpoint events, if any.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Encrypt worker pool, when running.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Decrypt worker pool, when running.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// (transport, index) pairs registered with the decrypt workers.
    /// NOTE(review): inferred from the name.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Receiving half of the decrypt fallback channel created in the
    /// constructor; held as `Option`, presumably so it can be taken later.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sending half of the decrypt fallback channel created in the constructor.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// Join handle of the TUN reader thread, when running.
    tun_reader_handle: Option<JoinHandle<()>>,
    /// Join handle of the TUN writer thread, when running.
    tun_writer_handle: Option<JoinHandle<()>>,
    /// macOS only: raw file descriptor related to TUN shutdown.
    /// NOTE(review): presumably used to wake the reader thread on stop.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    /// Receiver of identities from the DNS component, once set up.
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    /// Handle of the DNS task, when running.
    dns_task: Option<tokio::task::JoinHandle<()>>,

    /// Allocator for session/peer indices.
    index_allocator: IndexAllocator,
    /// Lookup from (transport, index) to peer node address.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Lookup from (transport, index) to the link of a pending outbound
    /// handshake. NOTE(review): inferred from the name.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    /// Handshake rate limiter built from `config.node.rate_limit` and
    /// `config.node.limits.max_pending_inbound`.
    msg1_rate_limiter: HandshakeRateLimiter,
    /// ICMP rate limiter.
    icmp_rate_limiter: IcmpRateLimiter,
    /// Rate limiter for routing errors (default interval).
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Rate limiter with interval
    /// `config.node.session.coords_response_interval_ms`.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    /// Backoff state for discovery.
    discovery_backoff: DiscoveryBackoff,
    /// Rate limiter for forwarding discovery messages.
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    /// Outbound connection attempts in progress.
    pending_connects: Vec<PendingConnect>,

    /// Retry state per peer.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    /// Nostr discovery service, when enabled.
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    /// LAN discovery service, when enabled.
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    /// Local instance registry, when enabled.
    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
    /// Millisecond timestamp at which the local instance registry started.
    local_instance_started_at_ms: Option<u64>,
    /// Millisecond timestamp of the last local instance publish.
    last_local_instance_publish_ms: Option<u64>,
    /// Millisecond timestamp of the last local instance scan.
    last_local_instance_scan_ms: Option<u64>,
    /// Millisecond timestamp at which Nostr discovery started.
    nostr_discovery_started_at_ms: Option<u64>,
    /// Whether the startup open-discovery sweep has run; starts as `false`.
    startup_open_discovery_sweep_done: bool,
    /// Transports marked as bootstrap transports; these are skipped by
    /// `find_transport_for_type`.
    bootstrap_transports: HashSet<TransportId>,
    /// npub associated with each bootstrap transport.
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    /// Peers blocked as transit for discovery fallback.
    /// NOTE(review): inferred from the name.
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    /// Time of the last parent re-evaluation (uses `crate::time::Instant`).
    last_parent_reeval: Option<crate::time::Instant>,

    /// Time of the last congestion log line; presumably for log throttling.
    last_congestion_log: Option<std::time::Instant>,

    /// Latest estimate of the mesh size, if any.
    estimated_mesh_size: Option<u64>,
    /// Time of the last mesh-size log line; presumably for log throttling.
    last_mesh_size_log: Option<std::time::Instant>,

    /// Time of the last "self" warning; presumably for log throttling.
    last_self_warn: Option<std::time::Instant>,

    /// Time of the last local send failure.
    last_local_send_failure_at: Option<std::time::Instant>,
    /// Time of the last RX-loop maintenance timeout.
    last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,

    /// Human-readable aliases per peer; starts empty.
    peer_aliases: HashMap<NodeAddr, String>,
    /// Send weight for each configured peer whose npub parses; consulted by
    /// `send_weight_for_peer`.
    configured_peer_send_weights: HashMap<NodeAddr, u8>,

    /// Peer ACL reloader built by `host_map_and_peer_acl`.
    peer_acl: acl::PeerAclReloader,

    /// Host map built by `host_map_and_peer_acl`.
    host_map: Arc<HostMap>,
}
801
802impl Node {
803 pub fn new(config: Config) -> Result<Self, NodeError> {
805 config.validate()?;
806 let identity = config.create_identity()?;
807 let node_addr = *identity.node_addr();
808 let is_leaf_only = config.is_leaf_only();
809
810 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
811 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
812
813 let mut startup_epoch = [0u8; 8];
814 rand::rng().fill_bytes(&mut startup_epoch);
815
816 let mut bloom_state = if is_leaf_only {
817 BloomState::leaf_only(node_addr)
818 } else {
819 BloomState::new(node_addr)
820 };
821 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
822
823 let tun_state = if config.tun.enabled {
824 TunState::Configured
825 } else {
826 TunState::Disabled
827 };
828
829 let mut tree_state = TreeState::new(node_addr);
831 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
832 tree_state.set_hold_down(config.node.tree.hold_down_secs);
833 tree_state.set_flap_dampening(
834 config.node.tree.flap_threshold,
835 config.node.tree.flap_window_secs,
836 config.node.tree.flap_dampening_secs,
837 );
838 tree_state
839 .sign_declaration(&identity)
840 .expect("signing own declaration should never fail");
841
842 let coord_cache = CoordCache::new(
843 config.node.cache.coord_size,
844 config.node.cache.coord_ttl_secs * 1000,
845 );
846 let rl = &config.node.rate_limit;
847 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
848 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
849 config.node.limits.max_pending_inbound,
850 );
851
852 let max_connections = config.node.limits.max_connections;
853 let max_peers = config.node.limits.max_peers;
854 let max_links = config.node.limits.max_links;
855 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
856 let backoff_base_secs = config.node.discovery.backoff_base_secs;
857 let backoff_max_secs = config.node.discovery.backoff_max_secs;
858 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
859
860 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
861 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
862
863 Ok(Self {
864 identity,
865 startup_epoch,
866 started_at: std::time::Instant::now(),
867 config,
868 state: NodeState::Created,
869 is_leaf_only,
870 tree_state,
871 bloom_state,
872 coord_cache,
873 learned_routes: LearnedRouteTable::default(),
874 session_direct_degraded_until_ms: HashMap::new(),
875 recent_requests: HashMap::new(),
876 transports: HashMap::new(),
877 transport_drops: HashMap::new(),
878 links: HashMap::new(),
879 addr_to_link: HashMap::new(),
880 packet_tx: None,
881 packet_rx: None,
882 connections: HashMap::new(),
883 peers: HashMap::new(),
884 sessions: HashMap::new(),
885 identity_cache: HashMap::new(),
886 pending_tun_packets: HashMap::new(),
887 pending_endpoint_data: HashMap::new(),
888 pending_lookups: HashMap::new(),
889 max_connections,
890 max_peers,
891 max_links,
892 next_link_id: 1,
893 next_transport_id: 1,
894 stats: stats::NodeStats::new(),
895 stats_history: stats_history::StatsHistory::new(),
896 tun_state,
897 tun_name: None,
898 tun_tx: None,
899 tun_outbound_rx: None,
900 external_packet_tx: None,
901 endpoint_command_rx: None,
902 endpoint_event_tx: None,
903 encrypt_workers: None,
904 decrypt_workers: None,
905 decrypt_registered_sessions: std::collections::HashSet::new(),
906 decrypt_fallback_tx,
907 decrypt_fallback_rx,
908 tun_reader_handle: None,
909 tun_writer_handle: None,
910 #[cfg(target_os = "macos")]
911 tun_shutdown_fd: None,
912 dns_identity_rx: None,
913 dns_task: None,
914 index_allocator: IndexAllocator::new(),
915 peers_by_index: HashMap::new(),
916 pending_outbound: HashMap::new(),
917 msg1_rate_limiter,
918 icmp_rate_limiter: IcmpRateLimiter::new(),
919 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
920 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
921 std::time::Duration::from_millis(coords_response_interval_ms),
922 ),
923 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
924 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
925 std::time::Duration::from_secs(forward_min_interval_secs),
926 ),
927 pending_connects: Vec::new(),
928 retry_pending: HashMap::new(),
929 nostr_discovery: None,
930 nostr_discovery_started_at_ms: None,
931 lan_discovery: None,
932 local_instance_registry: None,
933 local_instance_started_at_ms: None,
934 last_local_instance_publish_ms: None,
935 last_local_instance_scan_ms: None,
936 startup_open_discovery_sweep_done: false,
937 bootstrap_transports: HashSet::new(),
938 bootstrap_transport_npubs: HashMap::new(),
939 discovery_fallback_transit_blocked_peers: HashSet::new(),
940 last_parent_reeval: None,
941 last_congestion_log: None,
942 estimated_mesh_size: None,
943 last_mesh_size_log: None,
944 last_self_warn: None,
945 last_local_send_failure_at: None,
946 last_rx_loop_maintenance_timeout_at: None,
947 peer_aliases: HashMap::new(),
948 configured_peer_send_weights,
949 peer_acl,
950 host_map,
951 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
952 })
953 }
954
955 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
960 config.validate()?;
961 let node_addr = *identity.node_addr();
962
963 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
964 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
965
966 let mut startup_epoch = [0u8; 8];
967 rand::rng().fill_bytes(&mut startup_epoch);
968
969 let tun_state = if config.tun.enabled {
970 TunState::Configured
971 } else {
972 TunState::Disabled
973 };
974
975 let mut tree_state = TreeState::new(node_addr);
977 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
978 tree_state.set_hold_down(config.node.tree.hold_down_secs);
979 tree_state.set_flap_dampening(
980 config.node.tree.flap_threshold,
981 config.node.tree.flap_window_secs,
982 config.node.tree.flap_dampening_secs,
983 );
984 tree_state
985 .sign_declaration(&identity)
986 .expect("signing own declaration should never fail");
987
988 let mut bloom_state = BloomState::new(node_addr);
989 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
990
991 let coord_cache = CoordCache::new(
992 config.node.cache.coord_size,
993 config.node.cache.coord_ttl_secs * 1000,
994 );
995 let rl = &config.node.rate_limit;
996 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
997 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
998 config.node.limits.max_pending_inbound,
999 );
1000
1001 let max_connections = config.node.limits.max_connections;
1002 let max_peers = config.node.limits.max_peers;
1003 let max_links = config.node.limits.max_links;
1004 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1005
1006 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1007 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1008
1009 Ok(Self {
1010 identity,
1011 startup_epoch,
1012 started_at: std::time::Instant::now(),
1013 config,
1014 state: NodeState::Created,
1015 is_leaf_only: false,
1016 tree_state,
1017 bloom_state,
1018 coord_cache,
1019 learned_routes: LearnedRouteTable::default(),
1020 session_direct_degraded_until_ms: HashMap::new(),
1021 recent_requests: HashMap::new(),
1022 transports: HashMap::new(),
1023 transport_drops: HashMap::new(),
1024 links: HashMap::new(),
1025 addr_to_link: HashMap::new(),
1026 packet_tx: None,
1027 packet_rx: None,
1028 connections: HashMap::new(),
1029 peers: HashMap::new(),
1030 sessions: HashMap::new(),
1031 identity_cache: HashMap::new(),
1032 pending_tun_packets: HashMap::new(),
1033 pending_endpoint_data: HashMap::new(),
1034 pending_lookups: HashMap::new(),
1035 max_connections,
1036 max_peers,
1037 max_links,
1038 next_link_id: 1,
1039 next_transport_id: 1,
1040 stats: stats::NodeStats::new(),
1041 stats_history: stats_history::StatsHistory::new(),
1042 tun_state,
1043 tun_name: None,
1044 tun_tx: None,
1045 tun_outbound_rx: None,
1046 external_packet_tx: None,
1047 endpoint_command_rx: None,
1048 endpoint_event_tx: None,
1049 encrypt_workers: None,
1050 decrypt_workers: None,
1051 decrypt_registered_sessions: std::collections::HashSet::new(),
1052 decrypt_fallback_tx,
1053 decrypt_fallback_rx,
1054 tun_reader_handle: None,
1055 tun_writer_handle: None,
1056 #[cfg(target_os = "macos")]
1057 tun_shutdown_fd: None,
1058 dns_identity_rx: None,
1059 dns_task: None,
1060 index_allocator: IndexAllocator::new(),
1061 peers_by_index: HashMap::new(),
1062 pending_outbound: HashMap::new(),
1063 msg1_rate_limiter,
1064 icmp_rate_limiter: IcmpRateLimiter::new(),
1065 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1066 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1067 std::time::Duration::from_millis(coords_response_interval_ms),
1068 ),
1069 discovery_backoff: DiscoveryBackoff::new(),
1070 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1071 pending_connects: Vec::new(),
1072 retry_pending: HashMap::new(),
1073 nostr_discovery: None,
1074 nostr_discovery_started_at_ms: None,
1075 lan_discovery: None,
1076 local_instance_registry: None,
1077 local_instance_started_at_ms: None,
1078 last_local_instance_publish_ms: None,
1079 last_local_instance_scan_ms: None,
1080 startup_open_discovery_sweep_done: false,
1081 bootstrap_transports: HashSet::new(),
1082 bootstrap_transport_npubs: HashMap::new(),
1083 discovery_fallback_transit_blocked_peers: HashSet::new(),
1084 last_parent_reeval: None,
1085 last_congestion_log: None,
1086 estimated_mesh_size: None,
1087 last_mesh_size_log: None,
1088 last_self_warn: None,
1089 last_local_send_failure_at: None,
1090 last_rx_loop_maintenance_timeout_at: None,
1091 peer_aliases: HashMap::new(),
1092 configured_peer_send_weights,
1093 peer_acl,
1094 host_map,
1095 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1096 })
1097 }
1098
1099 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1101 let mut node = Self::new(config)?;
1102 node.is_leaf_only = true;
1103 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1104 Ok(node)
1105 }
1106
1107 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1108 let base_host_map = HostMap::from_peer_configs(config.peers());
1109 if !config.node.system_files_enabled {
1110 return (
1111 Arc::new(base_host_map.clone()),
1112 acl::PeerAclReloader::memory_only(base_host_map),
1113 );
1114 }
1115
1116 let mut host_map = base_host_map.clone();
1117 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1118 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1119 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1120 ));
1121 host_map.merge(hosts_file);
1122 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1123 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1124 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1125 base_host_map,
1126 hosts_path,
1127 );
1128 (Arc::new(host_map), peer_acl)
1129 }
1130
1131 fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1132 config
1133 .peers()
1134 .iter()
1135 .filter_map(|peer| {
1136 PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1137 (
1138 *identity.node_addr(),
1139 encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1140 )
1141 })
1142 })
1143 .collect()
1144 }
1145
1146 fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1147 self.configured_peer_send_weights
1148 .get(peer_addr)
1149 .copied()
1150 .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1151 }
1152
    /// Builds a transport handle for every transport instance in the config
    /// and returns them.
    ///
    /// Instances are created in a fixed order — UDP, sim (feature
    /// `sim-transport`), Ethernet (Linux/macOS), TCP, Tor, WebRTC (feature
    /// `webrtc-transport`), BLE (cfg `bluer_available`) — and each one takes
    /// the next id from `allocate_transport_id`, so this order determines the
    /// transport ids. Every transport gets a clone of `packet_tx`.
    ///
    /// The handles are only returned; this function does not insert them into
    /// `self.transports`. WebRTC and BLE instances that fail to initialise
    /// are logged and skipped (their transport id has already been
    /// allocated).
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Each config section is copied into an owned Vec first so the borrow
        // of `self.config` ends before `allocate_transport_id` needs
        // `&mut self`.
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        // Simulated transports, only in builds with the `sim-transport` feature.
        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        // Ethernet transports, only on Linux and macOS.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let mut eth_config = eth_config;
                // An instance without its own discovery scope inherits the
                // node-level LAN discovery scope.
                if eth_config.discovery_scope.is_none() {
                    eth_config.discovery_scope = self.lan_discovery_scope();
                }
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // The WebRTC config is read in every build so that a build without
        // WebRTC support can still warn about configured instances.
        let webrtc_instances: Vec<_> = self
            .config
            .transports
            .webrtc
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        #[cfg(feature = "webrtc-transport")]
        {
            for (name, webrtc_config) in webrtc_instances {
                let transport_id = self.allocate_transport_id();
                // Construction is fallible; a failed instance is logged and
                // skipped rather than aborting the whole set-up.
                match WebRtcTransport::new(
                    transport_id,
                    name,
                    webrtc_config,
                    packet_tx.clone(),
                    &self.identity,
                    &self.config.node.discovery.nostr,
                ) {
                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
                    Err(err) => {
                        warn!(
                            transport_id = %transport_id,
                            error = %err,
                            "failed to initialize WebRTC transport"
                        );
                    }
                }
            }
        }
        #[cfg(not(feature = "webrtc-transport"))]
        if !webrtc_instances.is_empty() {
            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
        }

        // BLE transports, only when the build detected BlueZ support.
        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Real adapters are only opened outside of test builds.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // NOTE(review): this `if` sits inside the `#[cfg(bluer_available)]`
            // block, so the `not(bluer_available)` half of its cfg can never
            // apply here. It is compiled only under `test`, where the warning
            // itself is compiled out — so the "lacks BlueZ support" warning
            // is never emitted by any build. Confirm whether the outer cfg
            // was meant to be broader.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1330
1331 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1341 self.transports
1342 .iter()
1343 .filter(|(id, handle)| {
1344 handle.transport_type().name == transport_type
1345 && handle.is_operational()
1346 && !self.bootstrap_transports.contains(id)
1347 })
1348 .min_by_key(|(id, _)| id.as_u32())
1349 .map(|(id, _)| *id)
1350 }
1351
1352 #[allow(unused_variables)]
1358 fn resolve_ethernet_addr(
1359 &self,
1360 addr_str: &str,
1361 ) -> Result<(TransportId, TransportAddr), NodeError> {
1362 #[cfg(any(target_os = "linux", target_os = "macos"))]
1363 {
1364 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1365 NodeError::NoTransportForType(format!(
1366 "invalid Ethernet address format '{}': expected 'interface/mac'",
1367 addr_str
1368 ))
1369 })?;
1370
1371 let transport_id = self
1373 .transports
1374 .iter()
1375 .find(|(_, handle)| {
1376 handle.transport_type().name == "ethernet"
1377 && handle.is_operational()
1378 && handle.interface_name() == Some(iface)
1379 })
1380 .map(|(id, _)| *id)
1381 .ok_or_else(|| {
1382 NodeError::NoTransportForType(format!(
1383 "no operational Ethernet transport for interface '{}'",
1384 iface
1385 ))
1386 })?;
1387
1388 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1389 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1390 })?;
1391
1392 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1393 }
1394 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1395 {
1396 Err(NodeError::NoTransportForType(
1397 "Ethernet transport is not supported on this platform".to_string(),
1398 ))
1399 }
1400 }
1401
/// Resolves a BLE address string to a BLE transport id and transport address.
///
/// The adapter component is extracted first, then an operational `"ble"`
/// transport is located, and finally the full string is validated with
/// `BleAddr::parse`. The returned address is built from `addr_str` verbatim.
///
/// # Errors
///
/// Every failure is reported as `NodeError::NoTransportForType` with a
/// descriptive message.
// NOTE(review): the transport lookup accepts any operational BLE transport;
// it does not compare against the extracted adapter, which is only used in
// the error message. Confirm this is intended when several adapters exist.
#[cfg(bluer_available)]
fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
    let parsed_addr = TransportAddr::from_string(addr_str);
    let Some(adapter) = crate::transport::ble::addr::adapter_from_addr(&parsed_addr) else {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address format '{}': expected 'adapter/mac'",
            addr_str
        )));
    };

    let operational_ble = self.transports.iter().find_map(|(id, handle)| {
        (handle.transport_type().name == "ble" && handle.is_operational()).then_some(*id)
    });
    let Some(transport_id) = operational_ble else {
        return Err(NodeError::NoTransportForType(format!(
            "no operational BLE transport for adapter '{}'",
            adapter
        )));
    };

    // Validation only: the parsed value itself is not needed.
    if let Err(e) = crate::transport::ble::addr::BleAddr::parse(addr_str) {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address '{}': {}",
            addr_str, e
        )));
    }

    Ok((transport_id, TransportAddr::from_string(addr_str)))
}
1435
/// Returns a reference to this node's [`Identity`].
pub fn identity(&self) -> &Identity {
    &self.identity
}
1442
/// Returns this node's address, as reported by its identity.
pub fn node_addr(&self) -> &NodeAddr {
    self.identity.node_addr()
}
1447
/// Returns this node's npub string, as produced by its identity.
pub fn npub(&self) -> String {
    self.identity.npub()
}
1452
1453 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1462 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1463 return hostname.to_string();
1464 }
1465 if let Some(name) = self.peer_aliases.get(addr) {
1466 return name.clone();
1467 }
1468 if let Some(peer) = self.peers.get(addr) {
1469 return peer.identity().short_npub();
1470 }
1471 if let Some(entry) = self.sessions.get(addr) {
1472 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1473 return PeerIdentity::from_pubkey(xonly).short_npub();
1474 }
1475 addr.short_hex()
1476 }
1477
1478 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1490 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1494 self.peers_by_index.remove(&cache_key);
1495 if self.decrypt_registered_sessions.remove(&cache_key)
1496 && let Some(workers) = self.decrypt_workers.as_ref()
1497 {
1498 workers.unregister_session(cache_key);
1499 }
1500 if let Some(peer_addr) = owning_peer {
1511 let peer_has_other_index = self
1512 .peers_by_index
1513 .values()
1514 .any(|other| *other == peer_addr);
1515 if !peer_has_other_index {
1516 self.clear_connected_udp_for_peer(&peer_addr);
1517 }
1518 }
1519 }
1520
1521 pub(in crate::node) fn ensure_current_session_index_registered(
1530 &mut self,
1531 node_addr: &NodeAddr,
1532 context: &'static str,
1533 ) -> bool {
1534 let Some(peer) = self.peers.get(node_addr) else {
1535 return false;
1536 };
1537 let Some(transport_id) = peer.transport_id() else {
1538 warn!(
1539 peer = %self.peer_display_name(node_addr),
1540 context,
1541 "Cannot register current session index without transport id"
1542 );
1543 return false;
1544 };
1545 let Some(our_index) = peer.our_index() else {
1546 warn!(
1547 peer = %self.peer_display_name(node_addr),
1548 context,
1549 "Cannot register current session index without local index"
1550 );
1551 return false;
1552 };
1553
1554 let cache_key = (transport_id, our_index.as_u32());
1555 match self.peers_by_index.get(&cache_key).copied() {
1556 Some(existing) if existing == *node_addr => true,
1557 Some(existing) => {
1558 warn!(
1559 peer = %self.peer_display_name(node_addr),
1560 previous_owner = %self.peer_display_name(&existing),
1561 transport_id = %transport_id,
1562 our_index = %our_index,
1563 context,
1564 "Repairing current session index with stale owner"
1565 );
1566 self.peers_by_index.insert(cache_key, *node_addr);
1567 true
1568 }
1569 None => {
1570 warn!(
1571 peer = %self.peer_display_name(node_addr),
1572 transport_id = %transport_id,
1573 our_index = %our_index,
1574 context,
1575 "Repairing missing current session index"
1576 );
1577 self.peers_by_index.insert(cache_key, *node_addr);
1578 true
1579 }
1580 }
1581 }
1582
/// Returns a reference to the node's configuration.
pub fn config(&self) -> &Config {
    &self.config
}
1589
/// Returns the effective IPv6 MTU, computed by
/// `crate::upper::icmp::effective_ipv6_mtu` from [`Self::transport_mtu`].
pub fn effective_ipv6_mtu(&self) -> u16 {
    crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
}
1598
1599 pub fn transport_mtu(&self) -> u16 {
1616 let min_operational = self
1617 .transports
1618 .values()
1619 .filter(|h| h.is_operational())
1620 .map(|h| h.mtu())
1621 .min();
1622 if let Some(mtu) = min_operational {
1623 return mtu;
1624 }
1625 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1627 return cfg.mtu();
1628 }
1629 1280
1630 }
1631
/// Returns the node's current [`NodeState`] by value.
pub fn state(&self) -> NodeState {
    self.state
}
1638
/// Returns the time elapsed since the instant stored in `started_at`.
pub fn uptime(&self) -> std::time::Duration {
    self.started_at.elapsed()
}
1643
/// Returns `true` when the node's state reports itself operational.
pub fn is_running(&self) -> bool {
    self.state.is_operational()
}
1648
/// Returns the node's `is_leaf_only` flag.
pub fn is_leaf_only(&self) -> bool {
    self.is_leaf_only
}
1653
/// Returns a shared reference to the node's [`TreeState`].
pub fn tree_state(&self) -> &TreeState {
    &self.tree_state
}
1660
/// Returns a mutable reference to the node's [`TreeState`].
pub fn tree_state_mut(&mut self) -> &mut TreeState {
    &mut self.tree_state
}
1665
/// Returns a shared reference to the node's [`BloomState`].
pub fn bloom_state(&self) -> &BloomState {
    &self.bloom_state
}
1672
/// Returns a mutable reference to the node's [`BloomState`].
pub fn bloom_state_mut(&mut self) -> &mut BloomState {
    &mut self.bloom_state
}
1677
/// Returns the most recently stored mesh-size estimate, or `None` when no
/// estimate is currently held (see `compute_mesh_size`).
pub fn estimated_mesh_size(&self) -> Option<u64> {
    self.estimated_mesh_size
}
1684
1685 pub(crate) fn compute_mesh_size(&mut self) {
1691 let my_addr = *self.tree_state.my_node_addr();
1692 let parent_id = *self.tree_state.my_declaration().parent_id();
1693 let is_root = self.tree_state.is_root();
1694
1695 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1696 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1698 let mut has_data = false;
1699
1700 if !is_root
1706 && let Some(parent) = self.peers.get(&parent_id)
1707 && let Some(filter) = parent.inbound_filter()
1708 {
1709 match filter.estimated_count(max_fpr) {
1710 Some(n) => {
1711 total += n;
1712 has_data = true;
1713 }
1714 None => {
1715 self.estimated_mesh_size = None;
1716 return;
1717 }
1718 }
1719 }
1720
1721 for (peer_addr, peer) in &self.peers {
1723 if peer_addr == &parent_id {
1724 continue;
1725 }
1726 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1727 && *decl.parent_id() == my_addr
1728 {
1729 child_count += 1;
1730 if let Some(filter) = peer.inbound_filter() {
1731 match filter.estimated_count(max_fpr) {
1732 Some(n) => {
1733 total += n;
1734 has_data = true;
1735 }
1736 None => {
1737 self.estimated_mesh_size = None;
1738 return;
1739 }
1740 }
1741 }
1742 }
1743 }
1744
1745 if !has_data {
1746 self.estimated_mesh_size = None;
1747 return;
1748 }
1749
1750 let size = total.round() as u64;
1751 self.estimated_mesh_size = Some(size);
1752
1753 let now = std::time::Instant::now();
1755 let should_log = match self.last_mesh_size_log {
1756 None => true,
1757 Some(last) => {
1758 now.duration_since(last)
1759 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1760 }
1761 };
1762 if should_log {
1763 tracing::debug!(
1764 estimated_mesh_size = size,
1765 peers = self.peers.len(),
1766 children = child_count,
1767 "Mesh size estimate"
1768 );
1769 self.last_mesh_size_log = Some(now);
1770 }
1771 }
1772
/// Returns a shared reference to the node's [`CoordCache`].
pub fn coord_cache(&self) -> &CoordCache {
    &self.coord_cache
}
1779
/// Returns a mutable reference to the node's [`CoordCache`].
pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
    &mut self.coord_cache
}
1784
/// Returns a shared reference to the node's statistics.
pub fn stats(&self) -> &stats::NodeStats {
    &self.stats
}
1791
/// Returns a mutable reference to the node's statistics.
pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
    &mut self.stats
}
1796
/// Returns a shared reference to the node's statistics history.
pub fn stats_history(&self) -> &stats_history::StatsHistory {
    &self.stats_history
}
1801
1802 pub(crate) fn record_stats_history(&mut self) {
1805 let fwd = &self.stats.forwarding;
1806 let peers_with_mmp: Vec<f64> = self
1807 .peers
1808 .values()
1809 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1810 .collect();
1811 let loss_rate = if peers_with_mmp.is_empty() {
1812 0.0
1813 } else {
1814 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1815 };
1816
1817 let snap = stats_history::Snapshot {
1818 mesh_size: self.estimated_mesh_size,
1819 tree_depth: self.tree_state.my_coords().depth() as u32,
1820 peer_count: self.peers.len() as u64,
1821 parent_switches_total: self.stats.tree.parent_switches,
1822 bytes_in_total: fwd.received_bytes,
1823 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1824 packets_in_total: fwd.received_packets,
1825 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1826 loss_rate,
1827 active_sessions: self.sessions.len() as u64,
1828 };
1829
1830 let now = std::time::Instant::now();
1831 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1832 .peers
1833 .values()
1834 .map(|p| {
1835 let stats = p.link_stats();
1836 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1837 Some(m) => (
1838 m.metrics.srtt_ms(),
1839 Some(m.metrics.loss_rate()),
1840 m.receiver.ecn_ce_count() as u64,
1841 ),
1842 None => (None, None, 0),
1843 };
1844 stats_history::PeerSnapshot {
1845 node_addr: *p.node_addr(),
1846 last_seen: now,
1847 srtt_ms,
1848 loss_rate,
1849 bytes_in_total: stats.bytes_recv,
1850 bytes_out_total: stats.bytes_sent,
1851 packets_in_total: stats.packets_recv,
1852 packets_out_total: stats.packets_sent,
1853 ecn_ce_total: ecn_ce,
1854 }
1855 })
1856 .collect();
1857
1858 self.stats_history.tick(now, &snap, &peer_snaps);
1859 }
1860
/// Returns the current [`TunState`] by value.
pub fn tun_state(&self) -> TunState {
    self.tun_state
}
1867
/// Returns the stored TUN device name, or `None` when none is set.
pub fn tun_name(&self) -> Option<&str> {
    self.tun_name.as_deref()
}
1872
/// Sets the connection limit. The admission checks in this type treat `0`
/// as "no limit".
pub fn set_max_connections(&mut self, max: usize) {
    self.max_connections = max;
}
1879
/// Sets the peer limit. The admission checks in this type treat `0` as
/// "no limit".
pub fn set_max_peers(&mut self, max: usize) {
    self.max_peers = max;
}
1884
1885 pub(crate) fn outbound_admission_check(&self) -> bool {
1888 let connection_used = self
1889 .connections
1890 .len()
1891 .saturating_add(self.pending_connects.len());
1892 let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1893 let connection_allowed =
1894 self.max_connections == 0 || connection_used < self.max_connections;
1895 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1896 peer_allowed && connection_allowed && link_allowed
1897 }
1898
1899 pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1903 if !self.outbound_admission_check() {
1904 return false;
1905 }
1906
1907 let nostr = &self.config.node.discovery.nostr;
1908 if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1909 return true;
1910 }
1911
1912 let configured_npubs = self
1913 .config
1914 .peers()
1915 .iter()
1916 .map(|peer| peer.npub.clone())
1917 .collect::<HashSet<_>>();
1918 self.open_discovery_enqueue_budget(&configured_npubs) > 0
1919 }
1920
1921 pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1925 let connection_used = self
1926 .connections
1927 .len()
1928 .saturating_add(self.pending_connects.len());
1929 let connection_allowed =
1930 self.max_connections == 0 || connection_used < self.max_connections;
1931 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1932 connection_allowed && link_allowed
1933 }
1934
/// Sets the link limit. `add_link` and the admission checks treat `0` as
/// "no limit".
pub fn set_max_links(&mut self, max: usize) {
    self.max_links = max;
}
1939
/// Returns the number of entries in the connection table.
pub fn connection_count(&self) -> usize {
    self.connections.len()
}
1946
/// Returns the number of entries in the peer table.
pub fn peer_count(&self) -> usize {
    self.peers.len()
}
1951
/// Returns the number of entries in the link table.
pub fn link_count(&self) -> usize {
    self.links.len()
}
1956
/// Returns the number of registered transports.
pub fn transport_count(&self) -> usize {
    self.transports.len()
}
1961
1962 pub fn allocate_transport_id(&mut self) -> TransportId {
1966 let id = TransportId::new(self.next_transport_id);
1967 self.next_transport_id += 1;
1968 id
1969 }
1970
/// Looks up a transport handle by id; `None` if no such transport exists.
pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
    self.transports.get(id)
}
1975
/// Looks up a transport handle by id for mutation; `None` if no such
/// transport exists.
pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
    self.transports.get_mut(id)
}
1980
/// Iterates over the ids of all registered transports.
pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
    self.transports.keys()
}
1985
/// Returns a mutable reference to the packet receiver, or `None` when the
/// node does not currently hold one.
pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
    self.packet_rx.as_mut()
}
1990
1991 pub fn allocate_link_id(&mut self) -> LinkId {
1995 let id = LinkId::new(self.next_link_id);
1996 self.next_link_id += 1;
1997 id
1998 }
1999
2000 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2002 if self.max_links > 0 && self.links.len() >= self.max_links {
2003 return Err(NodeError::MaxLinksExceeded {
2004 max: self.max_links,
2005 });
2006 }
2007 let link_id = link.link_id();
2008 let transport_id = link.transport_id();
2009 let remote_addr = link.remote_addr().clone();
2010
2011 self.links.insert(link_id, link);
2012 self.addr_to_link
2013 .insert((transport_id, remote_addr), link_id);
2014 Ok(())
2015 }
2016
/// Looks up a link by id; `None` if it is not in the link table.
pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
    self.links.get(link_id)
}
2021
/// Looks up a link by id for mutation; `None` if it is not in the link table.
pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
    self.links.get_mut(link_id)
}
2026
/// Returns the id of the link indexed under `(transport_id, addr)`, if any.
pub fn find_link_by_addr(
    &self,
    transport_id: TransportId,
    addr: &TransportAddr,
) -> Option<LinkId> {
    // The index is keyed by an owned tuple, so the address is cloned to
    // build the lookup key.
    self.addr_to_link
        .get(&(transport_id, addr.clone()))
        .copied()
}
2037
2038 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2044 if let Some(link) = self.links.remove(link_id) {
2045 let key = (link.transport_id(), link.remote_addr().clone());
2047 if self.addr_to_link.get(&key) == Some(link_id) {
2048 self.addr_to_link.remove(&key);
2049 }
2050 Some(link)
2051 } else {
2052 None
2053 }
2054 }
2055
2056 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2057 if !self.bootstrap_transports.contains(&transport_id) {
2058 return;
2059 }
2060
2061 let transport_in_use = self
2062 .links
2063 .values()
2064 .any(|link| link.transport_id() == transport_id)
2065 || self
2066 .connections
2067 .values()
2068 .any(|conn| conn.transport_id() == Some(transport_id))
2069 || self
2070 .peers
2071 .values()
2072 .any(|peer| peer.transport_id() == Some(transport_id))
2073 || self
2074 .pending_connects
2075 .iter()
2076 .any(|pending| pending.transport_id == transport_id);
2077
2078 if transport_in_use {
2079 return;
2080 }
2081
2082 tracing::debug!(
2083 transport_id = %transport_id,
2084 "bootstrap transport has no remaining references; dropping"
2085 );
2086
2087 self.bootstrap_transports.remove(&transport_id);
2088 self.bootstrap_transport_npubs.remove(&transport_id);
2089 self.transport_drops.remove(&transport_id);
2090 self.transports.remove(&transport_id);
2091 }
2092
/// Iterates over all links in the link table.
pub fn links(&self) -> impl Iterator<Item = &Link> {
    self.links.values()
}
2097
2098 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2102 let link_id = connection.link_id();
2103
2104 if self.connections.contains_key(&link_id) {
2105 return Err(NodeError::ConnectionAlreadyExists(link_id));
2106 }
2107
2108 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2109 return Err(NodeError::MaxConnectionsExceeded {
2110 max: self.max_connections,
2111 });
2112 }
2113
2114 self.connections.insert(link_id, connection);
2115 Ok(())
2116 }
2117
/// Looks up the connection registered for `link_id`, if any.
pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
    self.connections.get(link_id)
}
2122
/// Looks up the connection registered for `link_id` for mutation, if any.
pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
    self.connections.get_mut(link_id)
}
2127
/// Removes and returns the connection registered for `link_id`, if any.
pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
    self.connections.remove(link_id)
}
2132
/// Iterates over all registered connections.
pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
    self.connections.values()
}
2137
/// Looks up the active peer with the given node address, if any.
pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
    self.peers.get(node_addr)
}
2144
/// Looks up the active peer with the given node address for mutation, if any.
pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
    self.peers.get_mut(node_addr)
}
2149
/// Removes and returns the active peer with the given node address, if any.
///
/// Only the peer table entry is removed here; no other table is touched by
/// this method.
pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
    self.peers.remove(node_addr)
}
2154
/// Iterates over all active peers.
pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values()
}
2159
/// Returns the Nostr discovery handle, or `None` when the node holds none.
pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
    self.nostr_discovery.as_deref()
}
2166
/// Iterates over the node addresses of all active peers.
pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
    self.peers.keys()
}
2171
/// Iterates over the active peers for which `can_send()` is true.
pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values().filter(|p| p.can_send())
}
2176
2177 pub fn sendable_peer_count(&self) -> usize {
2179 self.peers.values().filter(|p| p.can_send()).count()
2180 }
2181
2182 pub(crate) fn set_discovery_fallback_transit_allowed(
2183 &mut self,
2184 peer_addr: NodeAddr,
2185 allowed: bool,
2186 ) {
2187 if allowed {
2188 self.discovery_fallback_transit_blocked_peers
2189 .remove(&peer_addr);
2190 } else {
2191 self.discovery_fallback_transit_blocked_peers
2192 .insert(peer_addr);
2193 }
2194 }
2195
/// Returns the `discovery_fallback_transit` setting of the configured peer
/// matching `peer_addr`, or `None` when no configured peer matches.
pub(crate) fn configured_discovery_fallback_transit(
    &self,
    peer_addr: &NodeAddr,
) -> Option<bool> {
    self.configured_peer(peer_addr)
        .map(|peer| peer.discovery_fallback_transit)
}
2203
2204 pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2205 self.config.peers().iter().find(|peer| {
2206 PeerIdentity::from_npub(&peer.npub)
2207 .ok()
2208 .is_some_and(|identity| identity.node_addr() == peer_addr)
2209 })
2210 }
2211
2212 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2213 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2214 return retry_state.peer_config.discovery_fallback_transit;
2215 }
2216
2217 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2218 return allowed;
2219 }
2220
2221 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2222 }
2223
/// Test helper: sets the discovery forward limiter's interval to zero.
#[cfg(test)]
pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
    self.discovery_forward_limiter
        .set_interval(std::time::Duration::ZERO);
}
2233
/// Test helper: looks up the session entry for `remote`, if any.
#[cfg(test)]
pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
    self.sessions.get(remote)
}
2238
/// Test helper: looks up the session entry for `remote` for mutation, if any.
#[cfg(test)]
pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
    self.sessions.get_mut(remote)
}
2244
/// Test helper: removes and returns the session entry for `remote`, if any.
#[cfg(test)]
pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
    self.sessions.remove(remote)
}
2250
/// Test helper: reads the path MTU recorded for `fips_addr`.
///
/// Returns `None` when the lock cannot be acquired for reading (poisoned)
/// or when no entry exists for the address.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
    let table = self.path_mtu_lookup.read().ok()?;
    table.get(fips_addr).copied()
}
2259
/// Test helper: records `mtu` as the path MTU for `fips_addr`.
///
/// The insert is silently skipped when the write lock cannot be acquired.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
    if let Ok(mut map) = self.path_mtu_lookup.write() {
        map.insert(fips_addr, mtu);
    }
}
2267
/// Returns the number of entries in the session table.
pub fn session_count(&self) -> usize {
    self.sessions.len()
}
2272
/// Iterates over `(remote address, session entry)` pairs in the session table.
pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
    self.sessions.iter()
}
2277
2278 pub(crate) fn register_identity(
2282 &mut self,
2283 node_addr: NodeAddr,
2284 pubkey: secp256k1::PublicKey,
2285 ) -> bool {
2286 let mut prefix = [0u8; 15];
2287 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2288 if let Some(entry) = self.identity_cache.get(&prefix)
2289 && entry.node_addr == node_addr
2290 && entry.pubkey == pubkey
2291 {
2292 return true;
2296 }
2297
2298 let (xonly, _) = pubkey.x_only_public_key();
2299 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2300 if derived_node_addr != node_addr {
2301 debug!(
2302 claimed_node_addr = %node_addr,
2303 derived_node_addr = %derived_node_addr,
2304 "Rejected identity cache entry with mismatched public key"
2305 );
2306 return false;
2307 }
2308
2309 let now_ms = Self::now_ms();
2310 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2311 && entry.node_addr == node_addr
2312 {
2313 entry.pubkey = pubkey;
2314 entry.last_seen_ms = now_ms;
2315 return true;
2316 }
2317
2318 let npub = encode_npub(&xonly);
2319 self.identity_cache.insert(
2320 prefix,
2321 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2322 );
2323 let max = self.config.node.cache.identity_size;
2325 if self.identity_cache.len() > max
2326 && let Some(oldest_key) = self
2327 .identity_cache
2328 .iter()
2329 .min_by_key(|(_, entry)| entry.last_seen_ms)
2330 .map(|(k, _)| *k)
2331 {
2332 self.identity_cache.remove(&oldest_key);
2333 }
2334 true
2335 }
2336
2337 pub(crate) fn lookup_by_fips_prefix(
2339 &mut self,
2340 prefix: &[u8; 15],
2341 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2342 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2343 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2345 } else {
2346 None
2347 }
2348 }
2349
2350 pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2352 let mut prefix = [0u8; 15];
2353 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2354 self.identity_cache.contains_key(&prefix)
2355 }
2356
/// Returns the number of entries currently in the identity cache.
pub fn identity_cache_len(&self) -> usize {
    self.identity_cache.len()
}
2361
/// Iterates over the identity cache, yielding each entry's node address,
/// public key and `last_seen_ms` value.
pub fn identity_cache_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
    self.identity_cache
        .values()
        .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
}
2373
/// Returns the configured identity cache size (`config.node.cache.identity_size`).
pub fn identity_cache_max(&self) -> usize {
    self.config.node.cache.identity_size
}
2378
/// Returns the number of entries in `pending_lookups`.
pub fn pending_lookup_count(&self) -> usize {
    self.pending_lookups.len()
}
2383
/// Iterates over `(node address, pending lookup)` pairs in `pending_lookups`.
pub fn pending_lookups_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
    self.pending_lookups.iter()
}
2390
/// Returns the number of entries in `recent_requests`.
pub fn recent_request_count(&self) -> usize {
    self.recent_requests.len()
}
2395
/// Returns the number of keys in `pending_tun_packets`, i.e. how many
/// distinct queues of pending TUN packets exist.
pub fn pending_tun_destinations(&self) -> usize {
    self.pending_tun_packets.len()
}
2400
/// Returns the total number of queued packets, summed over every queue in
/// `pending_tun_packets`.
pub fn pending_tun_total_packets(&self) -> usize {
    self.pending_tun_packets.values().map(|q| q.len()).sum()
}
2405
/// Iterates over `(node address, retry state)` pairs in `retry_pending`.
pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
    self.retry_pending.iter()
}
2410
2411 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2418 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2420 return true;
2421 }
2422 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2424 && decl.parent_id() == self.node_addr()
2425 {
2426 return true;
2427 }
2428 false
2429 }
2430
/// Chooses the peer to which a payload for `dest_node_addr` should be sent.
///
/// Returns `None` when the destination is this node or when no candidate
/// is found. Candidate sources are consulted in this order, the first one
/// to produce a peer wins:
///
/// 1. (reply-learned mode, not exploring) a learned route through a peer
///    that `route_candidate_beats_direct` accepts;
/// 2. if the destination's coordinates are not cached: a learned route
///    (only when there is no healthy direct route or exploration is due),
///    then the healthy direct route, else `None`;
/// 3. the best coordinate candidate among peers that may reach the
///    destination, if it beats the direct route;
/// 4. the tree candidate, if it beats the direct route;
/// 5. when exploration is due: whatever the learned-route table selects,
///    including `None`;
/// 6. the healthy direct route;
/// 7. any learned route through a sendable peer.
///
/// Takes `&mut self` because the degraded-path check, the coordinate cache
/// lookup (`get_and_touch`) and the learned-route table are accessed through
/// mutable state.
pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
    // Never route to ourselves.
    if dest_node_addr == self.node_addr() {
        return None;
    }
    let now_ms = Self::now_ms();
    let direct_session_degraded = self.session_direct_path_is_degraded(dest_node_addr, now_ms);

    // `Some(dest)` only when the destination is a healthy peer AND its
    // direct session path is not currently marked degraded.
    let healthy_direct_route = self
        .peers
        .get(dest_node_addr)
        .filter(|peer| peer.is_healthy() && !direct_session_degraded)
        .map(|_| *dest_node_addr);
    let direct_payload_eligible = healthy_direct_route.is_some();
    // Eligibility of a peer as a payload next hop: the destination itself
    // is judged by the direct-route verdict above, everyone else by health.
    let payload_candidate_can_send = |addr: &NodeAddr, peer: &ActivePeer| {
        if addr == dest_node_addr {
            direct_payload_eligible
        } else {
            peer.is_healthy()
        }
    };

    // Takes the node explicitly so the closure does not hold a borrow of
    // `self` across the mutable calls below.
    let fallback_beats_direct = |node: &Self, fallback_addr: NodeAddr| {
        node.route_candidate_beats_direct(healthy_direct_route, fallback_addr)
    };

    // Snapshot of eligible peers; only built in reply-learned routing mode.
    let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
        Some(
            self.peers
                .iter()
                .filter(|(addr, peer)| payload_candidate_can_send(addr, peer))
                .map(|(addr, _)| *addr)
                .collect::<HashSet<_>>(),
        )
    } else {
        None
    };

    // Ask the learned-route table whether a fallback exploration is due
    // for this destination (always false outside reply-learned mode).
    let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
        self.learned_routes.should_explore_fallback(
            dest_node_addr,
            now_ms,
            self.config.node.routing.learned_fallback_explore_interval,
            |addr| sendable.contains(addr),
        )
    });
    // Stage 1: learned route, restricted to peers that beat the direct route.
    if let Some(sendable) = &sendable_learned_peers
        && !explore_fallback
    {
        let eligible = sendable
            .iter()
            .copied()
            .filter(|addr| fallback_beats_direct(self, *addr))
            .collect::<HashSet<_>>();
        if !eligible.is_empty()
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| eligible.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }
    }

    // Stage 2: without cached destination coordinates, coordinate and tree
    // routing are impossible; fall back to learned routes or the direct peer.
    let Some(dest_coords) = self
        .coord_cache
        .get_and_touch(dest_node_addr, now_ms)
        .cloned()
    else {
        if (healthy_direct_route.is_none() || explore_fallback)
            && let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }
        if let Some(direct_addr) = healthy_direct_route {
            return self.peers.get(&direct_addr);
        }
        return None;
    };

    // Stage 3: best candidate among eligible peers that may reach the
    // destination, as ranked by `select_best_candidate`.
    let coordinate_route_addr = {
        let candidates: Vec<&ActivePeer> = self
            .peers
            .iter()
            .filter(|(addr, peer)| {
                payload_candidate_can_send(addr, peer) && peer.may_reach(dest_node_addr)
            })
            .map(|(_, peer)| peer)
            .collect();
        if !candidates.is_empty() {
            self.select_best_candidate(&candidates, &dest_coords)
                .map(|peer| *peer.node_addr())
        } else {
            None
        }
    };
    if let Some(next_hop_addr) = coordinate_route_addr
        && fallback_beats_direct(self, next_hop_addr)
    {
        return self.peers.get(&next_hop_addr);
    }

    // Stage 4: tree-distance candidate.
    let tree_route_addr = self.select_tree_payload_candidate(
        &dest_coords,
        dest_node_addr,
        direct_payload_eligible,
    );
    if let Some(next_hop_addr) = tree_route_addr
        && fallback_beats_direct(self, next_hop_addr)
    {
        return self.peers.get(&next_hop_addr);
    }

    // Stage 5: exploration is due. Note this returns the learned-route
    // result unconditionally, even when that result is `None`.
    if explore_fallback {
        return sendable_learned_peers.as_ref().and_then(|sendable| {
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
        });
    }

    // Stage 6: healthy direct route.
    if let Some(direct_addr) = healthy_direct_route {
        return self.peers.get(&direct_addr);
    }

    // Stage 7: last resort, any learned route through a sendable peer.
    if let Some(sendable) = &sendable_learned_peers
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    None
}
2605
2606 pub(in crate::node) fn find_transit_next_hop(
2607 &mut self,
2608 dest_node_addr: &NodeAddr,
2609 previous_hop: &NodeAddr,
2610 ) -> Option<NodeAddr> {
2611 if dest_node_addr == self.node_addr() {
2612 return None;
2613 }
2614
2615 if dest_node_addr != previous_hop
2616 && self
2617 .peers
2618 .get(dest_node_addr)
2619 .is_some_and(|peer| peer.is_healthy())
2620 {
2621 return Some(*dest_node_addr);
2622 }
2623
2624 let next_hop_addr = *self.find_next_hop(dest_node_addr)?.node_addr();
2625 if &next_hop_addr == previous_hop {
2626 self.record_route_failure(*dest_node_addr, next_hop_addr);
2627 return None;
2628 }
2629 Some(next_hop_addr)
2630 }
2631
2632 fn route_candidate_beats_direct(
2633 &self,
2634 healthy_direct_route: Option<NodeAddr>,
2635 candidate_addr: NodeAddr,
2636 ) -> bool {
2637 let Some(direct_addr) = healthy_direct_route else {
2638 return true;
2639 };
2640 if candidate_addr == direct_addr {
2641 return false;
2642 }
2643
2644 let Some(direct) = self.peers.get(&direct_addr) else {
2645 return true;
2646 };
2647 let Some(candidate) = self.peers.get(&candidate_addr) else {
2648 return false;
2649 };
2650 if !candidate.is_healthy() {
2651 return false;
2652 }
2653
2654 let direct_cost = direct.link_cost();
2655 let candidate_cost = candidate.link_cost();
2656 candidate_cost + ROUTING_FALLBACK_MIN_COST_ADVANTAGE < direct_cost
2657 }
2658
2659 fn select_tree_payload_candidate(
2660 &self,
2661 dest_coords: &crate::tree::TreeCoordinate,
2662 direct_dest: &NodeAddr,
2663 direct_payload_eligible: bool,
2664 ) -> Option<NodeAddr> {
2665 if self.tree_state.my_coords().root_id() != dest_coords.root_id() {
2666 return None;
2667 }
2668
2669 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2670 let mut best: Option<(NodeAddr, usize)> = None;
2671
2672 for (peer_addr, peer) in &self.peers {
2673 if peer_addr == direct_dest {
2674 if !direct_payload_eligible {
2675 continue;
2676 }
2677 } else if !peer.is_healthy() {
2678 continue;
2679 }
2680
2681 let Some(peer_coords) = self.tree_state.peer_coords(peer_addr) else {
2682 continue;
2683 };
2684 let distance = peer_coords.distance_to(dest_coords);
2685 if distance >= my_distance {
2686 continue;
2687 }
2688
2689 let dominated = match &best {
2690 None => true,
2691 Some((best_id, best_dist)) => {
2692 distance < *best_dist || (distance == *best_dist && peer_addr < best_id)
2693 }
2694 };
2695 if dominated {
2696 best = Some((*peer_addr, distance));
2697 }
2698 }
2699
2700 best.map(|(peer_addr, _)| peer_addr)
2701 }
2702
2703 pub(in crate::node) fn session_direct_path_is_degraded(
2704 &mut self,
2705 dest: &NodeAddr,
2706 now_ms: u64,
2707 ) -> bool {
2708 match self.session_direct_degraded_until_ms.get(dest).copied() {
2709 Some(until_ms) if until_ms > now_ms => true,
2710 Some(_) => {
2711 self.session_direct_degraded_until_ms.remove(dest);
2712 false
2713 }
2714 None => false,
2715 }
2716 }
2717
2718 pub(in crate::node) fn mark_session_direct_path_degraded(
2719 &mut self,
2720 dest: NodeAddr,
2721 now_ms: u64,
2722 ) -> bool {
2723 let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2724 let entry = self
2725 .session_direct_degraded_until_ms
2726 .entry(dest)
2727 .or_insert(0);
2728 let was_degraded = *entry > now_ms;
2729 *entry = (*entry).max(until_ms);
2730 !was_degraded
2731 }
2732
2733 pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
2734 self.session_direct_degraded_until_ms.remove(dest).is_some()
2735 }
2736
2737 pub(in crate::node) fn learn_reverse_route(
2738 &mut self,
2739 destination: NodeAddr,
2740 next_hop: NodeAddr,
2741 ) {
2742 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2743 || destination == *self.node_addr()
2744 {
2745 return;
2746 }
2747 let now_ms = Self::now_ms();
2748 self.learned_routes.learn(
2749 destination,
2750 next_hop,
2751 now_ms,
2752 self.config.node.routing.learned_ttl_secs,
2753 self.config.node.routing.max_learned_routes_per_dest,
2754 );
2755 }
2756
2757 pub(in crate::node) fn record_route_failure(
2758 &mut self,
2759 destination: NodeAddr,
2760 next_hop: NodeAddr,
2761 ) {
2762 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2763 return;
2764 }
2765 self.learned_routes.record_failure(&destination, &next_hop);
2766 }
2767
2768 pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2769 self.learned_routes.snapshot(now_ms)
2770 }
2771
2772 pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2773 self.learned_routes.purge_expired(now_ms);
2774 }
2775
2776 fn select_best_candidate<'a>(
2785 &'a self,
2786 candidates: &[&'a ActivePeer],
2787 dest_coords: &crate::tree::TreeCoordinate,
2788 ) -> Option<&'a ActivePeer> {
2789 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2790
2791 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2792
2793 for &candidate in candidates {
2794 if !candidate.can_send() {
2795 continue;
2796 }
2797
2798 let cost = candidate.link_cost();
2799
2800 let dist = self
2801 .tree_state
2802 .peer_coords(candidate.node_addr())
2803 .map(|pc| pc.distance_to(dest_coords))
2804 .unwrap_or(usize::MAX);
2805
2806 if dist >= my_distance {
2809 continue;
2810 }
2811
2812 let dominated = match &best {
2813 None => true,
2814 Some((_, best_cost, best_dist)) => {
2815 cost < *best_cost
2816 || (cost == *best_cost && dist < *best_dist)
2817 || (cost == *best_cost
2818 && dist == *best_dist
2819 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2820 }
2821 };
2822
2823 if dominated {
2824 best = Some((candidate, cost, dist));
2825 }
2826 }
2827
2828 best.map(|(peer, _, _)| peer)
2829 }
2830
2831 pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2833 self.peers.values().filter(|p| p.may_reach(dest)).collect()
2834 }
2835
2836 pub fn tun_tx(&self) -> Option<&TunTx> {
2840 self.tun_tx.as_ref()
2841 }
2842
2843 pub fn attach_external_packet_io(
2850 &mut self,
2851 capacity: usize,
2852 ) -> Result<ExternalPacketIo, NodeError> {
2853 if self.state != NodeState::Created {
2854 return Err(NodeError::Config(ConfigError::Validation(
2855 "external packet I/O must be attached before node start".to_string(),
2856 )));
2857 }
2858 if self.config.tun.enabled {
2859 return Err(NodeError::Config(ConfigError::Validation(
2860 "external packet I/O requires tun.enabled=false".to_string(),
2861 )));
2862 }
2863
2864 let capacity = capacity.max(1);
2865 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2866 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2867 self.tun_outbound_rx = Some(outbound_rx);
2868 self.external_packet_tx = Some(inbound_tx);
2869
2870 Ok(ExternalPacketIo {
2871 outbound_tx,
2872 inbound_rx,
2873 })
2874 }
2875
2876 pub(crate) fn attach_endpoint_data_io(
2881 &mut self,
2882 capacity: usize,
2883 ) -> Result<EndpointDataIo, NodeError> {
2884 if self.state != NodeState::Created {
2885 return Err(NodeError::Config(ConfigError::Validation(
2886 "endpoint data I/O must be attached before node start".to_string(),
2887 )));
2888 }
2889
2890 let command_capacity = endpoint_data_command_capacity(capacity);
2891 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2892 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2897 self.endpoint_command_rx = Some(command_rx);
2898 self.endpoint_event_tx = Some(event_tx.clone());
2899
2900 Ok(EndpointDataIo {
2901 command_tx,
2902 event_rx,
2903 event_tx,
2904 })
2905 }
2906
2907 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2908 let mut prefix = [0u8; 15];
2909 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2910 self.identity_cache
2911 .get(&prefix)
2912 .filter(|entry| &entry.node_addr == addr)
2913 .map(|entry| entry.pubkey)
2914 }
2915
2916 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2917 let mut prefix = [0u8; 15];
2918 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2919 self.identity_cache
2920 .get(&prefix)
2921 .filter(|entry| &entry.node_addr == addr)
2922 .map(|entry| entry.npub.clone())
2923 }
2924
2925 pub(in crate::node) fn deliver_external_ipv6_packet(
2926 &self,
2927 src_addr: &NodeAddr,
2928 packet: Vec<u8>,
2929 ) {
2930 let Some(external_packet_tx) = &self.external_packet_tx else {
2931 return;
2932 };
2933 if packet.len() < 40 {
2934 return;
2935 }
2936 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2937 return;
2938 };
2939 let delivered = NodeDeliveredPacket {
2940 source_node_addr: *src_addr,
2941 source_npub: self.npub_for_node_addr(src_addr),
2942 destination,
2943 packet,
2944 };
2945 if let Err(error) = external_packet_tx.try_send(delivered) {
2946 debug!(error = %error, "Failed to deliver packet to external app sink");
2947 }
2948 }
2949
2950 pub(super) async fn send_encrypted_link_message(
2964 &mut self,
2965 node_addr: &NodeAddr,
2966 plaintext: &[u8],
2967 ) -> Result<(), NodeError> {
2968 self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2969 .await
2970 }
2971
2972 pub(in crate::node) fn note_local_send_outcome(
2978 &mut self,
2979 result: &Result<usize, TransportError>,
2980 ) {
2981 match result {
2982 Ok(_) => {
2983 if self.last_local_send_failure_at.is_some() {
2984 self.last_local_send_failure_at = None;
2985 }
2986 }
2987 Err(error) if error.is_local_route_unavailable() => {
2988 self.last_local_send_failure_at = Some(std::time::Instant::now());
2989 }
2990 Err(_) => {}
2991 }
2992 }
2993
2994 pub(in crate::node) fn local_send_failure_dead_timeout(
3000 &mut self,
3001 now: std::time::Instant,
3002 dead_timeout: std::time::Duration,
3003 fast_dead_timeout: std::time::Duration,
3004 ) -> std::time::Duration {
3005 match self.last_local_send_failure_at {
3006 Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
3007 fast_dead_timeout.min(dead_timeout)
3008 }
3009 Some(_) => {
3010 self.last_local_send_failure_at = None;
3011 dead_timeout
3012 }
3013 None => dead_timeout,
3014 }
3015 }
3016
3017 pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
3018 self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
3019 }
3020
3021 pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
3022 let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
3023 return false;
3024 };
3025 let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
3026 std::time::Instant::now().duration_since(t) <= grace
3027 }
3028
3029 pub(super) async fn send_encrypted_link_message_with_ce(
3033 &mut self,
3034 node_addr: &NodeAddr,
3035 plaintext: &[u8],
3036 ce_flag: bool,
3037 ) -> Result<(), NodeError> {
3038 let peer = self
3039 .peers
3040 .get_mut(node_addr)
3041 .ok_or(NodeError::PeerNotFound(*node_addr))?;
3042
3043 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
3044 node_addr: *node_addr,
3045 reason: "no their_index".into(),
3046 })?;
3047 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
3048 node_addr: *node_addr,
3049 reason: "no transport_id".into(),
3050 })?;
3051 let remote_addr = peer
3052 .current_addr()
3053 .cloned()
3054 .ok_or_else(|| NodeError::SendFailed {
3055 node_addr: *node_addr,
3056 reason: "no current_addr".into(),
3057 })?;
3058 #[cfg(any(target_os = "linux", target_os = "macos"))]
3059 let connected_socket = peer.connected_udp();
3060
3061 let timestamp_ms = peer.session_elapsed_ms();
3063
3064 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
3066 let mut flags = if sp_flag { FLAG_SP } else { 0 };
3067 if ce_flag {
3068 flags |= FLAG_CE;
3069 }
3070 if peer.current_k_bit() {
3071 flags |= FLAG_KEY_EPOCH;
3072 }
3073
3074 let session = peer
3075 .noise_session_mut()
3076 .ok_or_else(|| NodeError::SendFailed {
3077 node_addr: *node_addr,
3078 reason: "no noise session".into(),
3079 })?;
3080
3081 const INNER_TS_LEN: usize = 4;
3089 let counter = session.current_send_counter();
3090 let inner_len = INNER_TS_LEN + plaintext.len();
3091 let payload_len = inner_len as u16;
3092 let header = build_established_header(their_index, counter, flags, payload_len);
3093
3094 let transport_for_send = self
3113 .transports
3114 .get(&transport_id)
3115 .ok_or(NodeError::TransportNotFound(transport_id))?;
3116 match transport_for_send.connection_state(&remote_addr) {
3117 ConnectionState::Connected => {}
3118 other => {
3119 if matches!(other, ConnectionState::None) {
3120 let _ = transport_for_send.connect(&remote_addr).await;
3121 }
3122 return Err(NodeError::SendFailed {
3123 node_addr: *node_addr,
3124 reason: format!("transport connection not ready: {:?}", other),
3125 });
3126 }
3127 }
3128 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3129 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3130 && is_udp
3131 && let Some(cipher_clone) = session.send_cipher_clone()
3132 {
3133 {
3134 let reserved_counter =
3138 session
3139 .take_send_counter()
3140 .map_err(|e| NodeError::SendFailed {
3141 node_addr: *node_addr,
3142 reason: format!("counter reservation failed: {}", e),
3143 })?;
3144 debug_assert_eq!(reserved_counter, counter);
3145 let header =
3149 build_established_header(their_index, reserved_counter, flags, payload_len);
3150 let transport = transport_for_send;
3151 let send_target = {
3158 if let TransportHandle::Udp(udp) = transport {
3159 let socket_addr = {
3160 #[cfg(any(target_os = "linux", target_os = "macos"))]
3161 {
3162 match connected_socket.as_ref() {
3163 Some(socket) => Some(socket.peer_addr()),
3164 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3165 }
3166 }
3167 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3168 {
3169 udp.resolve_for_off_task(&remote_addr).await.ok()
3170 }
3171 };
3172 match (udp.async_socket(), socket_addr) {
3173 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3174 _ => None,
3175 }
3176 } else {
3177 None
3178 }
3179 };
3180 if let Some((socket, socket_addr)) = send_target {
3181 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3197 let mut wire_buf = Vec::with_capacity(wire_capacity);
3198 wire_buf.extend_from_slice(&header);
3199 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
3200 wire_buf.extend_from_slice(plaintext);
3201 let predicted_bytes = wire_capacity;
3202 if let Some(peer) = self.peers.get_mut(node_addr) {
3209 peer.link_stats_mut().record_sent(predicted_bytes);
3210 if let Some(mmp) = peer.mmp_mut() {
3211 mmp.sender
3212 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3213 }
3214 }
3215 let scheduling_weight = self.send_weight_for_peer(node_addr);
3216 workers.dispatch(self::encrypt_worker::FmpSendJob {
3217 cipher: cipher_clone,
3218 counter: reserved_counter,
3219 wire_buf,
3220 fsp_seal: None,
3221 socket,
3222 dest_addr: socket_addr,
3223 #[cfg(any(target_os = "linux", target_os = "macos"))]
3224 connected_socket,
3225 drop_on_backpressure: fmp_plaintext_is_bulk_session_datagram(plaintext),
3226 scheduling_weight,
3227 queued_at: crate::perf_profile::stamp(),
3228 });
3229 return Ok(());
3230 }
3231 }
3232 }
3233
3234 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3239 let ciphertext = {
3241 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3242 session
3243 .encrypt_with_aad(&inner_plaintext, &header)
3244 .map_err(|e| NodeError::SendFailed {
3245 node_addr: *node_addr,
3246 reason: format!("encryption failed: {}", e),
3247 })?
3248 };
3249
3250 let wire_packet = build_encrypted(&header, &ciphertext);
3251
3252 let send_result = {
3254 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3255 let transport = self
3256 .transports
3257 .get(&transport_id)
3258 .ok_or(NodeError::TransportNotFound(transport_id))?;
3259 transport.send(&remote_addr, &wire_packet).await
3260 };
3261 self.note_local_send_outcome(&send_result);
3262 let bytes_sent = send_result.map_err(|e| match e {
3263 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3264 node_addr: *node_addr,
3265 packet_size,
3266 mtu,
3267 },
3268 other => NodeError::SendFailed {
3269 node_addr: *node_addr,
3270 reason: format!("transport send: {}", other),
3271 },
3272 })?;
3273
3274 if let Some(peer) = self.peers.get_mut(node_addr) {
3276 peer.link_stats_mut().record_sent(bytes_sent);
3277 if let Some(mmp) = peer.mmp_mut() {
3279 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3280 }
3281 }
3282
3283 Ok(())
3284 }
3285}
3286
3287impl fmt::Debug for Node {
3288 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3289 f.debug_struct("Node")
3290 .field("node_addr", self.node_addr())
3291 .field("state", &self.state)
3292 .field("is_leaf_only", &self.is_leaf_only)
3293 .field("connections", &self.connection_count())
3294 .field("peers", &self.peer_count())
3295 .field("links", &self.link_count())
3296 .field("transports", &self.transport_count())
3297 .finish()
3298 }
3299}