1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
40use crate::peer::{ActivePeer, PeerConnection};
41#[cfg(any(target_os = "linux", target_os = "macos"))]
42use crate::transport::ethernet::EthernetTransport;
43use crate::transport::tcp::TcpTransport;
44use crate::transport::tor::TorTransport;
45use crate::transport::udp::UdpTransport;
46#[cfg(feature = "webrtc-transport")]
47use crate::transport::webrtc::WebRtcTransport;
48use crate::transport::{
49 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
50 TransportHandle, TransportId,
51};
52use crate::tree::TreeState;
53use crate::upper::hosts::HostMap;
54use crate::upper::icmp_rate_limit::IcmpRateLimiter;
55use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
56use crate::utils::index::IndexAllocator;
57use crate::{
58 Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
59 PeerIdentity, encode_npub,
60};
61use rand::Rng;
62use std::collections::{HashMap, HashSet, VecDeque};
63use std::fmt;
64use std::sync::Arc;
65use std::thread::JoinHandle;
66use thiserror::Error;
67use tracing::{debug, warn};
68
// Tuning constants. Their consumers are not visible in this part of the file,
// so the descriptions below are derived from the names and values only.
// NOTE(review): confirm the exact semantics against the call sites.

/// Three-second window associated with local send failures (presumably how
/// recent a failure must be for fast dead-peer handling — TODO confirm).
const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
/// Hold time, in milliseconds per the name, for the "session direct degraded" state.
const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20_000;
/// Minimum sample count (presumably before the loss thresholds below are evaluated).
const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 8;
/// Loss ratio (0.20) at which a direct session path is presumably marked degraded.
const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.20;
/// Loss ratio (0.05) at which a degraded direct path is presumably considered recovered.
const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.05;
74
75fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
76 if !plaintext
77 .first()
78 .is_some_and(|ty| *ty == LinkMessageType::SessionDatagram.to_byte())
79 {
80 return false;
81 }
82 let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
83 return false;
84 };
85 FspCommonPrefix::parse(fsp_payload)
86 .is_some_and(|prefix| prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted())
87}
88
/// Rekey jitter bound, in seconds per the name.
// NOTE(review): how the jitter is applied (e.g. symmetric ± range) is not
// visible here — confirm against the rekey scheduling code.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
96
/// Errors produced by node operations.
///
/// Display text for each variant comes from its `#[error]` attribute.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The node has not been started.
    #[error("node not started")]
    NotStarted,

    /// The node was already started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node was already stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given id.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No usable transport exists for the named transport type.
    ///
    /// Also used by address resolution for malformed transport addresses.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link exists with the given id.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection exists for the given link id.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// No peer is known under the given node address.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with the given node address already exists.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection already exists for the given link.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A peer npub could not be accepted; `reason` explains why.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// A discovery operation failed.
    #[error("discovery error: {0}")]
    Discovery(String),

    /// The operation was denied.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` would be exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` would be exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` would be exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available for the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion failed for `link_id`; `reason` explains why.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed; `reason` explains why.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet of `packet_size` bytes exceeded the `mtu` while forwarding to `node_addr`.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Wrapped configuration error (converted automatically via `From`).
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Wrapped identity error (converted automatically via `From`).
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// Wrapped TUN error (converted automatically via `From`).
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Index allocation failed.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// A handshake failed.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// A transport error, carried as its rendered message.
    #[error("transport error: {0}")]
    TransportError(String),

    /// A transport error that reported the local route as unavailable.
    #[error("local route unavailable: {0}")]
    LocalRouteUnavailable(String),

    /// A bootstrap handoff failed.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
191
192impl NodeError {
193 pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
194 if error.is_local_route_unavailable() {
195 Self::LocalRouteUnavailable(error.to_string())
196 } else {
197 Self::TransportError(error.to_string())
198 }
199 }
200
201 pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
202 matches!(self, Self::LocalRouteUnavailable(_))
203 }
204}
205
/// A packet handed out by the node together with its source and destination.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// Node address of the sender.
    pub source_node_addr: NodeAddr,
    /// Sender's npub string, when available.
    pub source_npub: Option<String>,
    /// Destination address of the packet.
    pub destination: FipsAddress,
    /// Raw packet bytes.
    pub packet: Vec<u8>,
}
218
/// One entry of the node's identity cache: a node address with its public key
/// and npub string.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address of the cached identity.
    node_addr: NodeAddr,
    /// secp256k1 public key of the cached identity.
    pubkey: secp256k1::PublicKey,
    /// npub string of the cached identity.
    npub: String,
    /// Timestamp in milliseconds (per the name) of when the identity was last seen.
    last_seen_ms: u64,
}
226
227impl IdentityCacheEntry {
228 fn new(
229 node_addr: NodeAddr,
230 pubkey: secp256k1::PublicKey,
231 npub: String,
232 last_seen_ms: u64,
233 ) -> Self {
234 Self {
235 node_addr,
236 pubkey,
237 npub,
238 last_seen_ms,
239 }
240 }
241}
242
/// Channel pair for exchanging packets with the node from outside.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Sender for outbound packets (the same channel type the TUN outbound path uses).
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receiver of packets delivered by the node.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
251
/// Channel endpoints for the endpoint data API: a command sender plus both
/// halves of an unbounded event channel.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Bounded sender for [`NodeEndpointCommand`]s.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Receiving half of the unbounded [`NodeEndpointEvent`] channel.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Sending half of the unbounded [`NodeEndpointEvent`] channel.
    // NOTE(review): why the sender is exposed next to the receiver is not
    // visible in this part of the file — confirm intended use.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
281
/// Chooses the capacity of the endpoint data command queue.
///
/// If the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable is set to a
/// positive integer (surrounding whitespace is ignored), that value is used
/// verbatim. Otherwise — variable unset, not valid Unicode, unparsable, or
/// zero — the result is `requested` raised to a floor of 32 768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    /// Smallest capacity used when no environment override applies.
    const MIN_CAPACITY: usize = 32_768;

    std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        // A zero capacity is not a usable override; fall back to the default.
        .filter(|&value| value > 0)
        // The former `.max(1)` was a no-op: the floor below already exceeds 1.
        .unwrap_or_else(|| requested.max(MIN_CAPACITY))
}
292
/// Commands sent to the node over the endpoint command channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send `payload` to `remote` and report the outcome on `response_tx`.
    Send {
        remote: PeerIdentity,
        payload: Vec<u8>,
        // Optional timestamp; presumably when the command was enqueued — TODO confirm.
        queued_at: Option<std::time::Instant>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Send `payload` to `remote` with no response channel.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        // Optional timestamp; presumably when the command was enqueued — TODO confirm.
        queued_at: Option<std::time::Instant>,
    },
    /// Request a list of [`NodeEndpointPeer`] entries.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request a list of [`NodeEndpointRelayStatus`] entries.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Supply new advert and DM relay lists; the result is reported on `response_tx`.
    UpdateRelays {
        advert_relays: Vec<String>,
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Supply a new peer configuration list; the reply carries an
    /// [`UpdatePeersOutcome`] or an error.
    UpdatePeers {
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
336
/// Counters returned in reply to [`NodeEndpointCommand::UpdatePeers`].
///
/// `Default` yields all-zero counters.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Number of peers added.
    pub(crate) added: usize,
    /// Number of peers removed.
    pub(crate) removed: usize,
    /// Number of peers updated.
    pub(crate) updated: usize,
    /// Number of peers left unchanged.
    pub(crate) unchanged: usize,
}
345
/// Events emitted by the node on the endpoint event channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A data payload together with the address (and npub, when available) of its source.
    Data {
        source_node_addr: NodeAddr,
        source_npub: Option<String>,
        payload: Vec<u8>,
        // Optional timestamp; presumably when the event was enqueued — TODO confirm.
        queued_at: Option<std::time::Instant>,
    },
}
356
/// Per-peer entry returned in reply to [`NodeEndpointCommand::PeerSnapshot`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// Peer's npub string.
    pub(crate) npub: String,
    /// Whether the peer is reported as connected.
    pub(crate) connected: bool,
    /// Transport address rendered as a string, when present.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, when present.
    pub(crate) transport_type: Option<String>,
    /// Numeric link id.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds (per the name), when present.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent counter.
    pub(crate) packets_sent: u64,
    /// Packets received counter.
    pub(crate) packets_recv: u64,
    /// Bytes sent counter.
    pub(crate) bytes_sent: u64,
    /// Bytes received counter.
    pub(crate) bytes_recv: u64,
    /// Whether a direct probe is pending for this peer.
    pub(crate) direct_probe_pending: bool,
    /// Millisecond value tied to the next direct probe.
    // NOTE(review): absolute timestamp vs. remaining delay is not visible here — confirm.
    pub(crate) direct_probe_after_ms: Option<u64>,
}
373
/// Per-relay entry returned in reply to [`NodeEndpointCommand::RelaySnapshot`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Relay status as free-form text.
    pub(crate) status: String,
}
380
/// Lifecycle state of a node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed but never started.
    Created,
    /// Start-up in progress.
    Starting,
    /// Fully running.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// Shut down.
    Stopped,
}

impl NodeState {
    /// `true` only while the node is [`NodeState::Running`].
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// `true` when a start may be attempted: from [`NodeState::Created`] or
    /// [`NodeState::Stopped`].
    pub fn can_start(&self) -> bool {
        match self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// `true` when a stop may be attempted: only from [`NodeState::Running`].
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase state name; width and padding flags are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        })
    }
}
425
/// Record of a recently seen request, stored in `Node::recent_requests`
/// keyed by a `u64`.
#[derive(Clone, Debug)]
pub(crate) struct RecentRequest {
    /// Peer the request was received from.
    pub(crate) from_peer: NodeAddr,
    /// Timestamp of the request in milliseconds; compared against the current
    /// time by [`RecentRequest::is_expired`].
    pub(crate) timestamp_ms: u64,
    /// Whether a response has been forwarded; starts as `false`.
    pub(crate) response_forwarded: bool,
}
443
444impl RecentRequest {
445 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
446 Self {
447 from_peer,
448 timestamp_ms,
449 response_forwarded: false,
450 }
451 }
452
453 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
455 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
456 }
457}
458
/// Key type of `Node::addr_to_link`: a transport id paired with an address on that transport.
type AddrKey = (TransportId, TransportAddr);
461
/// Per-transport drop tracking state, stored in `Node::transport_drops`.
///
/// `Default` yields a zero counter and `dropping == false`.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Previously observed drop count (presumably used to compute deltas — TODO confirm).
    prev_drops: u64,
    /// Whether the transport is currently considered to be dropping.
    dropping: bool,
}
473
/// A connection attempt tracked in `Node::pending_connects`.
struct PendingConnect {
    /// Link associated with the attempt.
    link_id: LinkId,
    /// Transport the attempt uses.
    transport_id: TransportId,
    /// Remote address being connected to.
    remote_addr: TransportAddr,
    /// Identity of the peer being connected to.
    peer_identity: PeerIdentity,
}
489
/// Central node object: owns the identity, configuration, routing state,
/// transports, links, peers, sessions and all auxiliary runtime state.
///
/// Built by `Node::new`, `Node::with_identity` or `Node::leaf_only`.
pub struct Node {
    // --- Identity and lifecycle ---
    /// This node's identity.
    identity: Identity,

    /// Eight random bytes generated at construction time.
    startup_epoch: [u8; 8],

    /// Instant at which this value was constructed.
    started_at: std::time::Instant,

    /// Configuration the node was built from.
    config: Config,

    /// Current lifecycle state; starts as `NodeState::Created`.
    state: NodeState,

    /// Whether the node operates in leaf-only mode.
    is_leaf_only: bool,

    // --- Routing state ---
    /// Spanning-tree state.
    tree_state: TreeState,

    /// Bloom filter state.
    bloom_state: BloomState,

    /// Coordinate cache.
    coord_cache: CoordCache,
    /// Learned route table.
    learned_routes: LearnedRouteTable,
    /// Per-peer millisecond value until which the direct session path is
    /// treated as degraded (per the name — TODO confirm).
    session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
    /// Recently seen requests, keyed by a `u64`.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Shared, lock-protected map from address to a `u16` path MTU.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // --- Transports and links ---
    /// Transports keyed by id.
    transports: HashMap<TransportId, TransportHandle>,
    /// Drop tracking per transport.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Links keyed by id.
    links: HashMap<LinkId, Link>,
    /// Lookup from (transport, remote address) to link id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    /// Packet channel sender; `None` after construction.
    packet_tx: Option<PacketTx>,
    /// Packet channel receiver; `None` after construction.
    packet_rx: Option<PacketRx>,

    // --- Connections, peers and sessions ---
    /// Peer connections keyed by link id.
    connections: HashMap<LinkId, PeerConnection>,

    /// Active peers keyed by node address.
    peers: HashMap<NodeAddr, ActivePeer>,

    /// Sessions keyed by remote node address.
    sessions: HashMap<NodeAddr, SessionEntry>,

    /// Identity cache keyed by a 15-byte value.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    /// Queued TUN packets per destination node address.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Queued endpoint data payloads per destination node address.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Discovery lookups in progress, per target node address.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // --- Limits and id counters ---
    /// Copied from `config.node.limits.max_connections`.
    max_connections: usize,
    /// Copied from `config.node.limits.max_peers`.
    max_peers: usize,
    /// Copied from `config.node.limits.max_links`.
    max_links: usize,

    /// Next link id counter; starts at 1.
    next_link_id: u64,
    /// Next transport id counter; starts at 1.
    next_transport_id: u32,

    // --- Statistics ---
    /// Node statistics.
    stats: stats::NodeStats,

    /// Statistics history.
    stats_history: stats_history::StatsHistory,

    // --- TUN, external packet I/O and worker pools ---
    /// TUN state; `Configured` when `config.tun.enabled`, else `Disabled`, at construction.
    tun_state: TunState,
    /// TUN interface name, once known.
    tun_name: Option<String>,
    /// Sender towards the TUN side.
    tun_tx: Option<TunTx>,
    /// Receiver of outbound packets coming from the TUN side.
    tun_outbound_rx: Option<TunOutboundRx>,
    /// Sender for packets delivered to an external consumer.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Receiver for endpoint commands.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Sender for endpoint events.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Encrypt worker pool, when present.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Decrypt worker pool, when present.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// Set of (transport, index) pairs (presumably sessions registered with
    /// the decrypt workers — TODO confirm).
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Receiving half of the decrypt fallback channel created at construction.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sending half of the decrypt fallback channel created at construction.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// Join handle of the TUN reader thread.
    tun_reader_handle: Option<JoinHandle<()>>,
    /// Join handle of the TUN writer thread.
    tun_writer_handle: Option<JoinHandle<()>>,
    /// Raw file descriptor related to TUN shutdown (macOS only).
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // --- DNS ---
    /// Receiver of identities coming from the DNS component.
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    /// Handle of the DNS task.
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // --- Index bookkeeping ---
    /// Allocator for `u32` indices.
    index_allocator: IndexAllocator,
    /// Lookup from (transport, index) to peer node address.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Lookup from (transport, index) to the link of a pending outbound handshake.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // --- Rate limiting ---
    /// Handshake rate limiter built from `config.node.rate_limit` and
    /// `config.node.limits.max_pending_inbound`.
    msg1_rate_limiter: HandshakeRateLimiter,
    /// ICMP rate limiter.
    icmp_rate_limiter: IcmpRateLimiter,
    /// Routing-error rate limiter with default parameters.
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Limiter whose interval is `config.node.session.coords_response_interval_ms`.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    /// Discovery backoff state.
    discovery_backoff: DiscoveryBackoff,
    /// Discovery forward rate limiter.
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // --- Connection attempts and retries ---
    /// Connection attempts in progress.
    pending_connects: Vec<PendingConnect>,

    /// Retry state per peer.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    // --- Discovery ---
    /// Nostr discovery handle, when present.
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    /// LAN discovery handle, when present.
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    /// Local instance registry, when present.
    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
    local_instance_started_at_ms: Option<u64>,
    last_local_instance_publish_ms: Option<u64>,
    last_local_instance_scan_ms: Option<u64>,
    nostr_discovery_started_at_ms: Option<u64>,
    /// Flag for the startup open-discovery sweep; starts as `false`.
    startup_open_discovery_sweep_done: bool,
    /// Transports excluded from `find_transport_for_type` selection.
    bootstrap_transports: HashSet<TransportId>,
    /// npub string associated with each bootstrap transport.
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    // --- Throttling timestamps for periodic work and logging ---
    last_parent_reeval: Option<crate::time::Instant>,

    last_congestion_log: Option<std::time::Instant>,

    /// Estimated mesh size, when known.
    estimated_mesh_size: Option<u64>,
    last_mesh_size_log: Option<std::time::Instant>,

    last_self_warn: Option<std::time::Instant>,

    last_local_send_failure_at: Option<std::time::Instant>,
    last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,

    // --- Peer naming, weights and access control ---
    /// Alias string per peer.
    peer_aliases: HashMap<NodeAddr, String>,
    /// Send weight per configured peer; consulted by `send_weight_for_peer`.
    configured_peer_send_weights: HashMap<NodeAddr, u8>,

    /// Peer ACL reloader.
    peer_acl: acl::PeerAclReloader,

    /// Shared host map.
    host_map: Arc<HostMap>,
}
799
800impl Node {
    /// Builds a node in the [`NodeState::Created`] state from `config`.
    ///
    /// The identity is obtained from `config.create_identity()`, and leaf-only
    /// mode follows `config.is_leaf_only()`. All runtime collections start
    /// empty and all optional handles start as `None`.
    ///
    /// # Errors
    /// Returns early (via `?`) when `config.validate()` or
    /// `config.create_identity()` fails.
    ///
    /// # Panics
    /// Panics if signing the node's own tree declaration fails.
    pub fn new(config: Config) -> Result<Self, NodeError> {
        config.validate()?;
        let identity = config.create_identity()?;
        let node_addr = *identity.node_addr();
        let is_leaf_only = config.is_leaf_only();

        // Both halves of the decrypt fallback channel are kept on the node.
        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
        let decrypt_fallback_rx = Some(decrypt_fallback_rx);

        // Fresh random epoch for every constructed node.
        let mut startup_epoch = [0u8; 8];
        rand::rng().fill_bytes(&mut startup_epoch);

        let mut bloom_state = if is_leaf_only {
            BloomState::leaf_only(node_addr)
        } else {
            BloomState::new(node_addr)
        };
        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);

        let tun_state = if config.tun.enabled {
            TunState::Configured
        } else {
            TunState::Disabled
        };

        // Tree state is tuned from the config, then the node's own declaration is signed.
        let mut tree_state = TreeState::new(node_addr);
        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
        tree_state.set_hold_down(config.node.tree.hold_down_secs);
        tree_state.set_flap_dampening(
            config.node.tree.flap_threshold,
            config.node.tree.flap_window_secs,
            config.node.tree.flap_dampening_secs,
        );
        tree_state
            .sign_declaration(&identity)
            .expect("signing own declaration should never fail");

        // TTL is configured in seconds and scaled by 1000 (presumably to milliseconds).
        let coord_cache = CoordCache::new(
            config.node.cache.coord_size,
            config.node.cache.coord_ttl_secs * 1000,
        );
        let rl = &config.node.rate_limit;
        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
            config.node.limits.max_pending_inbound,
        );

        // Copy out every config value needed below before `config` is moved into the node.
        let max_connections = config.node.limits.max_connections;
        let max_peers = config.node.limits.max_peers;
        let max_links = config.node.limits.max_links;
        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
        let backoff_base_secs = config.node.discovery.backoff_base_secs;
        let backoff_max_secs = config.node.discovery.backoff_max_secs;
        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;

        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);

        Ok(Self {
            identity,
            startup_epoch,
            started_at: std::time::Instant::now(),
            config,
            state: NodeState::Created,
            is_leaf_only,
            tree_state,
            bloom_state,
            coord_cache,
            learned_routes: LearnedRouteTable::default(),
            session_direct_degraded_until_ms: HashMap::new(),
            recent_requests: HashMap::new(),
            transports: HashMap::new(),
            transport_drops: HashMap::new(),
            links: HashMap::new(),
            addr_to_link: HashMap::new(),
            packet_tx: None,
            packet_rx: None,
            connections: HashMap::new(),
            peers: HashMap::new(),
            sessions: HashMap::new(),
            identity_cache: HashMap::new(),
            pending_tun_packets: HashMap::new(),
            pending_endpoint_data: HashMap::new(),
            pending_lookups: HashMap::new(),
            max_connections,
            max_peers,
            max_links,
            // Link and transport ids both start counting at 1.
            next_link_id: 1,
            next_transport_id: 1,
            stats: stats::NodeStats::new(),
            stats_history: stats_history::StatsHistory::new(),
            tun_state,
            tun_name: None,
            tun_tx: None,
            tun_outbound_rx: None,
            external_packet_tx: None,
            endpoint_command_rx: None,
            endpoint_event_tx: None,
            encrypt_workers: None,
            decrypt_workers: None,
            decrypt_registered_sessions: std::collections::HashSet::new(),
            decrypt_fallback_tx,
            decrypt_fallback_rx,
            tun_reader_handle: None,
            tun_writer_handle: None,
            #[cfg(target_os = "macos")]
            tun_shutdown_fd: None,
            dns_identity_rx: None,
            dns_task: None,
            index_allocator: IndexAllocator::new(),
            peers_by_index: HashMap::new(),
            pending_outbound: HashMap::new(),
            msg1_rate_limiter,
            icmp_rate_limiter: IcmpRateLimiter::new(),
            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
                std::time::Duration::from_millis(coords_response_interval_ms),
            ),
            // Discovery limiters take their parameters from `config.node.discovery`.
            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
                std::time::Duration::from_secs(forward_min_interval_secs),
            ),
            pending_connects: Vec::new(),
            retry_pending: HashMap::new(),
            nostr_discovery: None,
            nostr_discovery_started_at_ms: None,
            lan_discovery: None,
            local_instance_registry: None,
            local_instance_started_at_ms: None,
            last_local_instance_publish_ms: None,
            last_local_instance_scan_ms: None,
            startup_open_discovery_sweep_done: false,
            bootstrap_transports: HashSet::new(),
            bootstrap_transport_npubs: HashMap::new(),
            discovery_fallback_transit_blocked_peers: HashSet::new(),
            last_parent_reeval: None,
            last_congestion_log: None,
            estimated_mesh_size: None,
            last_mesh_size_log: None,
            last_self_warn: None,
            last_local_send_failure_at: None,
            last_rx_loop_maintenance_timeout_at: None,
            peer_aliases: HashMap::new(),
            configured_peer_send_weights,
            peer_acl,
            host_map,
            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
        })
    }
952
953 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
958 config.validate()?;
959 let node_addr = *identity.node_addr();
960
961 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
962 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
963
964 let mut startup_epoch = [0u8; 8];
965 rand::rng().fill_bytes(&mut startup_epoch);
966
967 let tun_state = if config.tun.enabled {
968 TunState::Configured
969 } else {
970 TunState::Disabled
971 };
972
973 let mut tree_state = TreeState::new(node_addr);
975 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
976 tree_state.set_hold_down(config.node.tree.hold_down_secs);
977 tree_state.set_flap_dampening(
978 config.node.tree.flap_threshold,
979 config.node.tree.flap_window_secs,
980 config.node.tree.flap_dampening_secs,
981 );
982 tree_state
983 .sign_declaration(&identity)
984 .expect("signing own declaration should never fail");
985
986 let mut bloom_state = BloomState::new(node_addr);
987 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
988
989 let coord_cache = CoordCache::new(
990 config.node.cache.coord_size,
991 config.node.cache.coord_ttl_secs * 1000,
992 );
993 let rl = &config.node.rate_limit;
994 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
995 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
996 config.node.limits.max_pending_inbound,
997 );
998
999 let max_connections = config.node.limits.max_connections;
1000 let max_peers = config.node.limits.max_peers;
1001 let max_links = config.node.limits.max_links;
1002 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1003
1004 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1005 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1006
1007 Ok(Self {
1008 identity,
1009 startup_epoch,
1010 started_at: std::time::Instant::now(),
1011 config,
1012 state: NodeState::Created,
1013 is_leaf_only: false,
1014 tree_state,
1015 bloom_state,
1016 coord_cache,
1017 learned_routes: LearnedRouteTable::default(),
1018 session_direct_degraded_until_ms: HashMap::new(),
1019 recent_requests: HashMap::new(),
1020 transports: HashMap::new(),
1021 transport_drops: HashMap::new(),
1022 links: HashMap::new(),
1023 addr_to_link: HashMap::new(),
1024 packet_tx: None,
1025 packet_rx: None,
1026 connections: HashMap::new(),
1027 peers: HashMap::new(),
1028 sessions: HashMap::new(),
1029 identity_cache: HashMap::new(),
1030 pending_tun_packets: HashMap::new(),
1031 pending_endpoint_data: HashMap::new(),
1032 pending_lookups: HashMap::new(),
1033 max_connections,
1034 max_peers,
1035 max_links,
1036 next_link_id: 1,
1037 next_transport_id: 1,
1038 stats: stats::NodeStats::new(),
1039 stats_history: stats_history::StatsHistory::new(),
1040 tun_state,
1041 tun_name: None,
1042 tun_tx: None,
1043 tun_outbound_rx: None,
1044 external_packet_tx: None,
1045 endpoint_command_rx: None,
1046 endpoint_event_tx: None,
1047 encrypt_workers: None,
1048 decrypt_workers: None,
1049 decrypt_registered_sessions: std::collections::HashSet::new(),
1050 decrypt_fallback_tx,
1051 decrypt_fallback_rx,
1052 tun_reader_handle: None,
1053 tun_writer_handle: None,
1054 #[cfg(target_os = "macos")]
1055 tun_shutdown_fd: None,
1056 dns_identity_rx: None,
1057 dns_task: None,
1058 index_allocator: IndexAllocator::new(),
1059 peers_by_index: HashMap::new(),
1060 pending_outbound: HashMap::new(),
1061 msg1_rate_limiter,
1062 icmp_rate_limiter: IcmpRateLimiter::new(),
1063 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1064 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1065 std::time::Duration::from_millis(coords_response_interval_ms),
1066 ),
1067 discovery_backoff: DiscoveryBackoff::new(),
1068 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1069 pending_connects: Vec::new(),
1070 retry_pending: HashMap::new(),
1071 nostr_discovery: None,
1072 nostr_discovery_started_at_ms: None,
1073 lan_discovery: None,
1074 local_instance_registry: None,
1075 local_instance_started_at_ms: None,
1076 last_local_instance_publish_ms: None,
1077 last_local_instance_scan_ms: None,
1078 startup_open_discovery_sweep_done: false,
1079 bootstrap_transports: HashSet::new(),
1080 bootstrap_transport_npubs: HashMap::new(),
1081 discovery_fallback_transit_blocked_peers: HashSet::new(),
1082 last_parent_reeval: None,
1083 last_congestion_log: None,
1084 estimated_mesh_size: None,
1085 last_mesh_size_log: None,
1086 last_self_warn: None,
1087 last_local_send_failure_at: None,
1088 last_rx_loop_maintenance_timeout_at: None,
1089 peer_aliases: HashMap::new(),
1090 configured_peer_send_weights,
1091 peer_acl,
1092 host_map,
1093 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1094 })
1095 }
1096
1097 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1099 let mut node = Self::new(config)?;
1100 node.is_leaf_only = true;
1101 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1102 Ok(node)
1103 }
1104
1105 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1106 let base_host_map = HostMap::from_peer_configs(config.peers());
1107 if !config.node.system_files_enabled {
1108 return (
1109 Arc::new(base_host_map.clone()),
1110 acl::PeerAclReloader::memory_only(base_host_map),
1111 );
1112 }
1113
1114 let mut host_map = base_host_map.clone();
1115 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1116 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1117 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1118 ));
1119 host_map.merge(hosts_file);
1120 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1121 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1122 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1123 base_host_map,
1124 hosts_path,
1125 );
1126 (Arc::new(host_map), peer_acl)
1127 }
1128
1129 fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1130 config
1131 .peers()
1132 .iter()
1133 .filter_map(|peer| {
1134 PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1135 (
1136 *identity.node_addr(),
1137 encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1138 )
1139 })
1140 })
1141 .collect()
1142 }
1143
1144 fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1145 self.configured_peer_send_weights
1146 .get(peer_addr)
1147 .copied()
1148 .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1149 }
1150
    /// Constructs a transport handle for every transport instance in
    /// `self.config.transports` and returns them.
    ///
    /// Instances are processed per kind in this order: UDP, sim (feature
    /// `sim-transport`), Ethernet (Linux/macOS), TCP, Tor, WebRTC (feature
    /// `webrtc-transport`), BLE (`cfg(bluer_available)`). Each instance gets a
    /// fresh id from `allocate_transport_id` and a clone of `packet_tx`.
    /// Nothing is inserted into `self.transports` here; the caller receives
    /// the handles. WebRTC and BLE instances that fail to initialize are
    /// logged with a warning and skipped.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Each kind is first collected into an owned Vec so that the borrow of
        // `self.config` ends before `allocate_transport_id` needs `&mut self`.
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let mut eth_config = eth_config;
                // An instance without its own discovery scope inherits the node's LAN scope.
                if eth_config.discovery_scope.is_none() {
                    eth_config.discovery_scope = self.lan_discovery_scope();
                }
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // Collected unconditionally so that builds without WebRTC support can
        // still warn about configured instances.
        let webrtc_instances: Vec<_> = self
            .config
            .transports
            .webrtc
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        #[cfg(feature = "webrtc-transport")]
        {
            for (name, webrtc_config) in webrtc_instances {
                // The id is allocated before construction, so a failed
                // initialization still consumes a transport id.
                let transport_id = self.allocate_transport_id();
                match WebRtcTransport::new(
                    transport_id,
                    name,
                    webrtc_config,
                    packet_tx.clone(),
                    &self.identity,
                    &self.config.node.discovery.nostr,
                ) {
                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
                    Err(err) => {
                        warn!(
                            transport_id = %transport_id,
                            error = %err,
                            "failed to initialize WebRTC transport"
                        );
                    }
                }
            }
        }
        #[cfg(not(feature = "webrtc-transport"))]
        if !webrtc_instances.is_empty() {
            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
        }

        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Real BLE adapters are only opened outside of test builds.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // NOTE(review): this branch sits inside the `cfg(bluer_available)`
            // block, so it is only compiled for test builds — where the warning
            // itself is compiled out by `cfg(not(test))`. As written, the
            // "lacks BlueZ support" warning looks unreachable; confirm intent.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1328
1329 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1339 self.transports
1340 .iter()
1341 .filter(|(id, handle)| {
1342 handle.transport_type().name == transport_type
1343 && handle.is_operational()
1344 && !self.bootstrap_transports.contains(id)
1345 })
1346 .min_by_key(|(id, _)| id.as_u32())
1347 .map(|(id, _)| *id)
1348 }
1349
1350 #[allow(unused_variables)]
1356 fn resolve_ethernet_addr(
1357 &self,
1358 addr_str: &str,
1359 ) -> Result<(TransportId, TransportAddr), NodeError> {
1360 #[cfg(any(target_os = "linux", target_os = "macos"))]
1361 {
1362 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1363 NodeError::NoTransportForType(format!(
1364 "invalid Ethernet address format '{}': expected 'interface/mac'",
1365 addr_str
1366 ))
1367 })?;
1368
1369 let transport_id = self
1371 .transports
1372 .iter()
1373 .find(|(_, handle)| {
1374 handle.transport_type().name == "ethernet"
1375 && handle.is_operational()
1376 && handle.interface_name() == Some(iface)
1377 })
1378 .map(|(id, _)| *id)
1379 .ok_or_else(|| {
1380 NodeError::NoTransportForType(format!(
1381 "no operational Ethernet transport for interface '{}'",
1382 iface
1383 ))
1384 })?;
1385
1386 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1387 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1388 })?;
1389
1390 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1391 }
1392 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1393 {
1394 Err(NodeError::NoTransportForType(
1395 "Ethernet transport is not supported on this platform".to_string(),
1396 ))
1397 }
1398 }
1399
/// Resolves a BLE peer address written as `adapter/mac`.
///
/// The adapter name is extracted first, then an operational BLE transport is
/// located, and finally the whole string is validated with `BleAddr::parse`.
/// On success the original string is returned as the `TransportAddr`. Every
/// failure is reported as `NodeError::NoTransportForType`.
///
/// NOTE(review): the transport lookup only checks the type name and the
/// operational flag; it does not filter by the extracted adapter even though
/// the error message names one. Confirm whether that is intended.
#[cfg(bluer_available)]
fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
    let ta = TransportAddr::from_string(addr_str);
    let Some(adapter) = crate::transport::ble::addr::adapter_from_addr(&ta) else {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address format '{}': expected 'adapter/mac'",
            addr_str
        )));
    };

    let matching = self
        .transports
        .iter()
        .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational());
    let Some((found_id, _)) = matching else {
        return Err(NodeError::NoTransportForType(format!(
            "no operational BLE transport for adapter '{}'",
            adapter
        )));
    };
    let transport_id = *found_id;

    // Validation only: the parsed value itself is not needed.
    if let Err(e) = crate::transport::ble::addr::BleAddr::parse(addr_str) {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address '{}': {}",
            addr_str, e
        )));
    }

    Ok((transport_id, TransportAddr::from_string(addr_str)))
}
1433
/// Returns a shared reference to this node's `Identity`.
pub fn identity(&self) -> &Identity {
    &self.identity
}
1440
1441 pub fn node_addr(&self) -> &NodeAddr {
1443 self.identity.node_addr()
1444 }
1445
1446 pub fn npub(&self) -> String {
1448 self.identity.npub()
1449 }
1450
1451 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1460 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1461 return hostname.to_string();
1462 }
1463 if let Some(name) = self.peer_aliases.get(addr) {
1464 return name.clone();
1465 }
1466 if let Some(peer) = self.peers.get(addr) {
1467 return peer.identity().short_npub();
1468 }
1469 if let Some(entry) = self.sessions.get(addr) {
1470 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1471 return PeerIdentity::from_pubkey(xonly).short_npub();
1472 }
1473 addr.short_hex()
1474 }
1475
1476 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1488 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1492 self.peers_by_index.remove(&cache_key);
1493 if self.decrypt_registered_sessions.remove(&cache_key)
1494 && let Some(workers) = self.decrypt_workers.as_ref()
1495 {
1496 workers.unregister_session(cache_key);
1497 }
1498 if let Some(peer_addr) = owning_peer {
1509 let peer_has_other_index = self
1510 .peers_by_index
1511 .values()
1512 .any(|other| *other == peer_addr);
1513 if !peer_has_other_index {
1514 self.clear_connected_udp_for_peer(&peer_addr);
1515 }
1516 }
1517 }
1518
1519 pub(in crate::node) fn ensure_current_session_index_registered(
1528 &mut self,
1529 node_addr: &NodeAddr,
1530 context: &'static str,
1531 ) -> bool {
1532 let Some(peer) = self.peers.get(node_addr) else {
1533 return false;
1534 };
1535 let Some(transport_id) = peer.transport_id() else {
1536 warn!(
1537 peer = %self.peer_display_name(node_addr),
1538 context,
1539 "Cannot register current session index without transport id"
1540 );
1541 return false;
1542 };
1543 let Some(our_index) = peer.our_index() else {
1544 warn!(
1545 peer = %self.peer_display_name(node_addr),
1546 context,
1547 "Cannot register current session index without local index"
1548 );
1549 return false;
1550 };
1551
1552 let cache_key = (transport_id, our_index.as_u32());
1553 match self.peers_by_index.get(&cache_key).copied() {
1554 Some(existing) if existing == *node_addr => true,
1555 Some(existing) => {
1556 warn!(
1557 peer = %self.peer_display_name(node_addr),
1558 previous_owner = %self.peer_display_name(&existing),
1559 transport_id = %transport_id,
1560 our_index = %our_index,
1561 context,
1562 "Repairing current session index with stale owner"
1563 );
1564 self.peers_by_index.insert(cache_key, *node_addr);
1565 true
1566 }
1567 None => {
1568 warn!(
1569 peer = %self.peer_display_name(node_addr),
1570 transport_id = %transport_id,
1571 our_index = %our_index,
1572 context,
1573 "Repairing missing current session index"
1574 );
1575 self.peers_by_index.insert(cache_key, *node_addr);
1576 true
1577 }
1578 }
1579 }
1580
/// Returns a shared reference to the node's configuration.
pub fn config(&self) -> &Config {
    &self.config
}
1587
/// Returns the IPv6 MTU derived from the current transport MTU.
///
/// Delegates to `crate::upper::icmp::effective_ipv6_mtu`, passing the value
/// of [`Self::transport_mtu`].
pub fn effective_ipv6_mtu(&self) -> u16 {
    crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
}
1596
1597 pub fn transport_mtu(&self) -> u16 {
1614 let min_operational = self
1615 .transports
1616 .values()
1617 .filter(|h| h.is_operational())
1618 .map(|h| h.mtu())
1619 .min();
1620 if let Some(mtu) = min_operational {
1621 return mtu;
1622 }
1623 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1625 return cfg.mtu();
1626 }
1627 1280
1628 }
1629
/// Returns the node's current lifecycle state by value.
pub fn state(&self) -> NodeState {
    self.state
}
1636
/// Returns the time elapsed since the instant stored in `started_at`.
pub fn uptime(&self) -> std::time::Duration {
    self.started_at.elapsed()
}
1641
1642 pub fn is_running(&self) -> bool {
1644 self.state.is_operational()
1645 }
1646
/// Returns the node's `is_leaf_only` flag.
pub fn is_leaf_only(&self) -> bool {
    self.is_leaf_only
}
1651
/// Returns a shared reference to the spanning-tree state.
pub fn tree_state(&self) -> &TreeState {
    &self.tree_state
}
1658
/// Returns a mutable reference to the spanning-tree state.
pub fn tree_state_mut(&mut self) -> &mut TreeState {
    &mut self.tree_state
}
1663
/// Returns a shared reference to the bloom-filter state.
pub fn bloom_state(&self) -> &BloomState {
    &self.bloom_state
}
1670
/// Returns a mutable reference to the bloom-filter state.
pub fn bloom_state_mut(&mut self) -> &mut BloomState {
    &mut self.bloom_state
}
1675
/// Returns the most recently stored mesh-size estimate, or `None` when no
/// estimate is currently available.
pub fn estimated_mesh_size(&self) -> Option<u64> {
    self.estimated_mesh_size
}
1682
1683 pub(crate) fn compute_mesh_size(&mut self) {
1689 let my_addr = *self.tree_state.my_node_addr();
1690 let parent_id = *self.tree_state.my_declaration().parent_id();
1691 let is_root = self.tree_state.is_root();
1692
1693 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1694 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1696 let mut has_data = false;
1697
1698 if !is_root
1704 && let Some(parent) = self.peers.get(&parent_id)
1705 && let Some(filter) = parent.inbound_filter()
1706 {
1707 match filter.estimated_count(max_fpr) {
1708 Some(n) => {
1709 total += n;
1710 has_data = true;
1711 }
1712 None => {
1713 self.estimated_mesh_size = None;
1714 return;
1715 }
1716 }
1717 }
1718
1719 for (peer_addr, peer) in &self.peers {
1721 if peer_addr == &parent_id {
1722 continue;
1723 }
1724 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1725 && *decl.parent_id() == my_addr
1726 {
1727 child_count += 1;
1728 if let Some(filter) = peer.inbound_filter() {
1729 match filter.estimated_count(max_fpr) {
1730 Some(n) => {
1731 total += n;
1732 has_data = true;
1733 }
1734 None => {
1735 self.estimated_mesh_size = None;
1736 return;
1737 }
1738 }
1739 }
1740 }
1741 }
1742
1743 if !has_data {
1744 self.estimated_mesh_size = None;
1745 return;
1746 }
1747
1748 let size = total.round() as u64;
1749 self.estimated_mesh_size = Some(size);
1750
1751 let now = std::time::Instant::now();
1753 let should_log = match self.last_mesh_size_log {
1754 None => true,
1755 Some(last) => {
1756 now.duration_since(last)
1757 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1758 }
1759 };
1760 if should_log {
1761 tracing::debug!(
1762 estimated_mesh_size = size,
1763 peers = self.peers.len(),
1764 children = child_count,
1765 "Mesh size estimate"
1766 );
1767 self.last_mesh_size_log = Some(now);
1768 }
1769 }
1770
/// Returns a shared reference to the coordinate cache.
pub fn coord_cache(&self) -> &CoordCache {
    &self.coord_cache
}
1777
/// Returns a mutable reference to the coordinate cache.
pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
    &mut self.coord_cache
}
1782
/// Returns a shared reference to the node's statistics counters.
pub fn stats(&self) -> &stats::NodeStats {
    &self.stats
}
1789
/// Returns a mutable reference to the node's statistics counters.
pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
    &mut self.stats
}
1794
/// Returns a shared reference to the recorded statistics history.
pub fn stats_history(&self) -> &stats_history::StatsHistory {
    &self.stats_history
}
1799
1800 pub(crate) fn record_stats_history(&mut self) {
1803 let fwd = &self.stats.forwarding;
1804 let peers_with_mmp: Vec<f64> = self
1805 .peers
1806 .values()
1807 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1808 .collect();
1809 let loss_rate = if peers_with_mmp.is_empty() {
1810 0.0
1811 } else {
1812 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1813 };
1814
1815 let snap = stats_history::Snapshot {
1816 mesh_size: self.estimated_mesh_size,
1817 tree_depth: self.tree_state.my_coords().depth() as u32,
1818 peer_count: self.peers.len() as u64,
1819 parent_switches_total: self.stats.tree.parent_switches,
1820 bytes_in_total: fwd.received_bytes,
1821 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1822 packets_in_total: fwd.received_packets,
1823 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1824 loss_rate,
1825 active_sessions: self.sessions.len() as u64,
1826 };
1827
1828 let now = std::time::Instant::now();
1829 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1830 .peers
1831 .values()
1832 .map(|p| {
1833 let stats = p.link_stats();
1834 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1835 Some(m) => (
1836 m.metrics.srtt_ms(),
1837 Some(m.metrics.loss_rate()),
1838 m.receiver.ecn_ce_count() as u64,
1839 ),
1840 None => (None, None, 0),
1841 };
1842 stats_history::PeerSnapshot {
1843 node_addr: *p.node_addr(),
1844 last_seen: now,
1845 srtt_ms,
1846 loss_rate,
1847 bytes_in_total: stats.bytes_recv,
1848 bytes_out_total: stats.bytes_sent,
1849 packets_in_total: stats.packets_recv,
1850 packets_out_total: stats.packets_sent,
1851 ecn_ce_total: ecn_ce,
1852 }
1853 })
1854 .collect();
1855
1856 self.stats_history.tick(now, &snap, &peer_snaps);
1857 }
1858
/// Returns the current TUN state by value.
pub fn tun_state(&self) -> TunState {
    self.tun_state
}
1865
/// Returns the stored TUN name as a string slice, or `None` when no name is
/// set.
pub fn tun_name(&self) -> Option<&str> {
    self.tun_name.as_deref()
}
1870
/// Sets the connection limit.
///
/// The limit checks in this file treat a value of `0` as "no limit".
pub fn set_max_connections(&mut self, max: usize) {
    self.max_connections = max;
}
1877
/// Sets the peer limit.
///
/// The outbound admission check treats a value of `0` as "no limit".
pub fn set_max_peers(&mut self, max: usize) {
    self.max_peers = max;
}
1882
1883 pub(crate) fn outbound_admission_check(&self) -> bool {
1886 let connection_used = self
1887 .connections
1888 .len()
1889 .saturating_add(self.pending_connects.len());
1890 let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1891 let connection_allowed =
1892 self.max_connections == 0 || connection_used < self.max_connections;
1893 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1894 peer_allowed && connection_allowed && link_allowed
1895 }
1896
1897 pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1901 if !self.outbound_admission_check() {
1902 return false;
1903 }
1904
1905 let nostr = &self.config.node.discovery.nostr;
1906 if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1907 return true;
1908 }
1909
1910 let configured_npubs = self
1911 .config
1912 .peers()
1913 .iter()
1914 .map(|peer| peer.npub.clone())
1915 .collect::<HashSet<_>>();
1916 self.open_discovery_enqueue_budget(&configured_npubs) > 0
1917 }
1918
1919 pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1923 let connection_used = self
1924 .connections
1925 .len()
1926 .saturating_add(self.pending_connects.len());
1927 let connection_allowed =
1928 self.max_connections == 0 || connection_used < self.max_connections;
1929 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1930 connection_allowed && link_allowed
1931 }
1932
/// Sets the link limit.
///
/// The limit checks in this file treat a value of `0` as "no limit".
pub fn set_max_links(&mut self, max: usize) {
    self.max_links = max;
}
1937
/// Returns the number of entries in the connection table.
pub fn connection_count(&self) -> usize {
    self.connections.len()
}
1944
/// Returns the number of entries in the peer table.
pub fn peer_count(&self) -> usize {
    self.peers.len()
}
1949
/// Returns the number of entries in the link table.
pub fn link_count(&self) -> usize {
    self.links.len()
}
1954
/// Returns the number of registered transports.
pub fn transport_count(&self) -> usize {
    self.transports.len()
}
1959
1960 pub fn allocate_transport_id(&mut self) -> TransportId {
1964 let id = TransportId::new(self.next_transport_id);
1965 self.next_transport_id += 1;
1966 id
1967 }
1968
/// Looks up a transport handle by id.
pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
    self.transports.get(id)
}
1973
/// Looks up a transport handle by id for mutation.
pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
    self.transports.get_mut(id)
}
1978
/// Iterates over the ids of all registered transports.
pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
    self.transports.keys()
}
1983
/// Returns a mutable reference to the packet receiver, or `None` when the
/// node currently holds none.
pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
    self.packet_rx.as_mut()
}
1988
1989 pub fn allocate_link_id(&mut self) -> LinkId {
1993 let id = LinkId::new(self.next_link_id);
1994 self.next_link_id += 1;
1995 id
1996 }
1997
1998 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2000 if self.max_links > 0 && self.links.len() >= self.max_links {
2001 return Err(NodeError::MaxLinksExceeded {
2002 max: self.max_links,
2003 });
2004 }
2005 let link_id = link.link_id();
2006 let transport_id = link.transport_id();
2007 let remote_addr = link.remote_addr().clone();
2008
2009 self.links.insert(link_id, link);
2010 self.addr_to_link
2011 .insert((transport_id, remote_addr), link_id);
2012 Ok(())
2013 }
2014
/// Looks up a link by id.
pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
    self.links.get(link_id)
}
2019
/// Looks up a link by id for mutation.
pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
    self.links.get_mut(link_id)
}
2024
2025 pub fn find_link_by_addr(
2027 &self,
2028 transport_id: TransportId,
2029 addr: &TransportAddr,
2030 ) -> Option<LinkId> {
2031 self.addr_to_link
2032 .get(&(transport_id, addr.clone()))
2033 .copied()
2034 }
2035
2036 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2042 if let Some(link) = self.links.remove(link_id) {
2043 let key = (link.transport_id(), link.remote_addr().clone());
2045 if self.addr_to_link.get(&key) == Some(link_id) {
2046 self.addr_to_link.remove(&key);
2047 }
2048 Some(link)
2049 } else {
2050 None
2051 }
2052 }
2053
2054 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2055 if !self.bootstrap_transports.contains(&transport_id) {
2056 return;
2057 }
2058
2059 let transport_in_use = self
2060 .links
2061 .values()
2062 .any(|link| link.transport_id() == transport_id)
2063 || self
2064 .connections
2065 .values()
2066 .any(|conn| conn.transport_id() == Some(transport_id))
2067 || self
2068 .peers
2069 .values()
2070 .any(|peer| peer.transport_id() == Some(transport_id))
2071 || self
2072 .pending_connects
2073 .iter()
2074 .any(|pending| pending.transport_id == transport_id);
2075
2076 if transport_in_use {
2077 return;
2078 }
2079
2080 tracing::debug!(
2081 transport_id = %transport_id,
2082 "bootstrap transport has no remaining references; dropping"
2083 );
2084
2085 self.bootstrap_transports.remove(&transport_id);
2086 self.bootstrap_transport_npubs.remove(&transport_id);
2087 self.transport_drops.remove(&transport_id);
2088 self.transports.remove(&transport_id);
2089 }
2090
/// Iterates over all registered links.
pub fn links(&self) -> impl Iterator<Item = &Link> {
    self.links.values()
}
2095
2096 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2100 let link_id = connection.link_id();
2101
2102 if self.connections.contains_key(&link_id) {
2103 return Err(NodeError::ConnectionAlreadyExists(link_id));
2104 }
2105
2106 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2107 return Err(NodeError::MaxConnectionsExceeded {
2108 max: self.max_connections,
2109 });
2110 }
2111
2112 self.connections.insert(link_id, connection);
2113 Ok(())
2114 }
2115
/// Looks up a connection by link id.
pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
    self.connections.get(link_id)
}
2120
/// Looks up a connection by link id for mutation.
pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
    self.connections.get_mut(link_id)
}
2125
/// Removes and returns the connection registered for `link_id`, if any.
pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
    self.connections.remove(link_id)
}
2130
/// Iterates over all registered connections.
pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
    self.connections.values()
}
2135
/// Looks up an active peer by node address.
pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
    self.peers.get(node_addr)
}
2142
/// Looks up an active peer by node address for mutation.
pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
    self.peers.get_mut(node_addr)
}
2147
/// Removes and returns the active peer stored under `node_addr`, if any.
///
/// Only the peer table is touched here; no other table is updated by this
/// method.
pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
    self.peers.remove(node_addr)
}
2152
/// Iterates over all active peers.
pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values()
}
2157
/// Returns a reference to the Nostr discovery instance, or `None` when the
/// node holds none.
pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
    self.nostr_discovery.as_deref()
}
2164
/// Iterates over the node addresses of all active peers.
pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
    self.peers.keys()
}
2169
/// Iterates over the active peers for which `can_send()` is true.
pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values().filter(|p| p.can_send())
}
2174
2175 pub fn sendable_peer_count(&self) -> usize {
2177 self.peers.values().filter(|p| p.can_send()).count()
2178 }
2179
2180 pub(crate) fn set_discovery_fallback_transit_allowed(
2181 &mut self,
2182 peer_addr: NodeAddr,
2183 allowed: bool,
2184 ) {
2185 if allowed {
2186 self.discovery_fallback_transit_blocked_peers
2187 .remove(&peer_addr);
2188 } else {
2189 self.discovery_fallback_transit_blocked_peers
2190 .insert(peer_addr);
2191 }
2192 }
2193
2194 pub(crate) fn configured_discovery_fallback_transit(
2195 &self,
2196 peer_addr: &NodeAddr,
2197 ) -> Option<bool> {
2198 self.configured_peer(peer_addr)
2199 .map(|peer| peer.discovery_fallback_transit)
2200 }
2201
2202 pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2203 self.config.peers().iter().find(|peer| {
2204 PeerIdentity::from_npub(&peer.npub)
2205 .ok()
2206 .is_some_and(|identity| identity.node_addr() == peer_addr)
2207 })
2208 }
2209
2210 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2211 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2212 return retry_state.peer_config.discovery_fallback_transit;
2213 }
2214
2215 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2216 return allowed;
2217 }
2218
2219 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2220 }
2221
/// Test helper: sets the discovery forward limiter's interval to zero.
#[cfg(test)]
pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
    self.discovery_forward_limiter
        .set_interval(std::time::Duration::ZERO);
}
2231
/// Test helper: looks up the session entry for `remote`.
#[cfg(test)]
pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
    self.sessions.get(remote)
}
2236
/// Test helper: looks up the session entry for `remote` for mutation.
#[cfg(test)]
pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
    self.sessions.get_mut(remote)
}
2242
/// Test helper: removes and returns the session entry for `remote`, if any.
#[cfg(test)]
pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
    self.sessions.remove(remote)
}
2248
/// Test helper: reads the path MTU recorded for `fips_addr`.
///
/// Returns `None` when the read lock cannot be acquired or when no value is
/// recorded for the address.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
    let table = self.path_mtu_lookup.read().ok()?;
    table.get(fips_addr).copied()
}
2257
/// Test helper: records `mtu` as the path MTU for `fips_addr`.
///
/// The write is silently skipped when the write lock cannot be acquired.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
    if let Ok(mut map) = self.path_mtu_lookup.write() {
        map.insert(fips_addr, mtu);
    }
}
2265
/// Returns the number of entries in the session table.
pub fn session_count(&self) -> usize {
    self.sessions.len()
}
2270
/// Iterates over `(remote address, session entry)` pairs of the session
/// table.
pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
    self.sessions.iter()
}
2275
/// Records the `(node_addr, pubkey)` pair in the identity cache.
///
/// The cache is keyed by the first 15 bytes of the node address. Returns
/// `true` when the pair is (or already was) cached, and `false` when
/// `node_addr` is not the address derived from `pubkey`.
pub(crate) fn register_identity(
    &mut self,
    node_addr: NodeAddr,
    pubkey: secp256k1::PublicKey,
) -> bool {
    let mut prefix = [0u8; 15];
    prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
    // Fast path: an identical entry is already cached. Note that this path
    // returns before any derivation and leaves `last_seen_ms` untouched.
    if let Some(entry) = self.identity_cache.get(&prefix)
        && entry.node_addr == node_addr
        && entry.pubkey == pubkey
    {
        return true;
    }

    // Reject claims where the address does not match the one derived from
    // the key's x-only form.
    let (xonly, _) = pubkey.x_only_public_key();
    let derived_node_addr = NodeAddr::from_pubkey(&xonly);
    if derived_node_addr != node_addr {
        debug!(
            claimed_node_addr = %node_addr,
            derived_node_addr = %derived_node_addr,
            "Rejected identity cache entry with mismatched public key"
        );
        return false;
    }

    // Same node, different stored key: refresh the key and the timestamp in
    // place.
    let now_ms = Self::now_ms();
    if let Some(entry) = self.identity_cache.get_mut(&prefix)
        && entry.node_addr == node_addr
    {
        entry.pubkey = pubkey;
        entry.last_seen_ms = now_ms;
        return true;
    }

    // New entry. This also replaces an entry for a different node that
    // happens to share the same 15-byte prefix.
    let npub = encode_npub(&xonly);
    self.identity_cache.insert(
        prefix,
        IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
    );
    let max = self.config.node.cache.identity_size;
    // Over capacity: evict the single entry with the smallest
    // `last_seen_ms`.
    if self.identity_cache.len() > max
        && let Some(oldest_key) = self
            .identity_cache
            .iter()
            .min_by_key(|(_, entry)| entry.last_seen_ms)
            .map(|(k, _)| *k)
    {
        self.identity_cache.remove(&oldest_key);
    }
    true
}
2334
2335 pub(crate) fn lookup_by_fips_prefix(
2337 &mut self,
2338 prefix: &[u8; 15],
2339 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2340 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2341 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2343 } else {
2344 None
2345 }
2346 }
2347
2348 pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2350 let mut prefix = [0u8; 15];
2351 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2352 self.identity_cache.contains_key(&prefix)
2353 }
2354
/// Returns the number of entries in the identity cache.
pub fn identity_cache_len(&self) -> usize {
    self.identity_cache.len()
}
2359
/// Iterates over the identity cache, yielding each entry's node address,
/// public key and `last_seen_ms` timestamp.
pub fn identity_cache_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
    self.identity_cache
        .values()
        .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
}
2371
/// Returns the configured identity cache capacity
/// (`config.node.cache.identity_size`).
pub fn identity_cache_max(&self) -> usize {
    self.config.node.cache.identity_size
}
2376
/// Returns the number of pending lookups.
pub fn pending_lookup_count(&self) -> usize {
    self.pending_lookups.len()
}
2381
/// Iterates over pending lookups as `(node address, lookup state)` pairs.
pub fn pending_lookups_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
    self.pending_lookups.iter()
}
2388
/// Returns the number of entries in the recent-requests table.
pub fn recent_request_count(&self) -> usize {
    self.recent_requests.len()
}
2393
/// Returns the number of keys in the pending TUN packet table, i.e. how many
/// separate queues currently exist.
pub fn pending_tun_destinations(&self) -> usize {
    self.pending_tun_packets.len()
}
2398
/// Returns the total number of queued packets, summed over every queue in
/// the pending TUN packet table.
pub fn pending_tun_total_packets(&self) -> usize {
    self.pending_tun_packets.values().map(|q| q.len()).sum()
}
2403
/// Iterates over pending retries as `(node address, retry state)` pairs.
pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
    self.retry_pending.iter()
}
2408
2409 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2416 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2418 return true;
2419 }
2420 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2422 && decl.parent_id() == self.node_addr()
2423 {
2424 return true;
2425 }
2426 false
2427 }
2428
/// Chooses the peer to which a packet for `dest_node_addr` should be handed.
///
/// Returns `None` when the destination is this node or when no strategy
/// yields a peer. Strategies are tried in this order:
///
/// 1. the destination itself, when it is a healthy direct peer and the
///    direct session path is not marked degraded;
/// 2. a learned route (`ReplyLearned` routing mode only), unless the learned
///    table asks for a fallback exploration;
/// 3. coordinate-based selection among peers whose filters may reach the
///    destination (requires cached destination coordinates);
/// 4. the tree's next hop, provided that peer can send;
/// 5. a learned route again;
/// 6. the destination as a direct peer that can merely send.
///
/// Takes `&mut self` because the lookup updates bookkeeping: the degraded
/// marker may be expired and the coordinate cache entry is touched.
pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
    // Never route to ourselves.
    if dest_node_addr == self.node_addr() {
        return None;
    }
    let now_ms = Self::now_ms();
    let direct_session_degraded = self.session_direct_path_is_degraded(dest_node_addr, now_ms);

    // Remembered for the last-resort fallbacks further down.
    let direct_peer_can_send = self
        .peers
        .get(dest_node_addr)
        .is_some_and(|peer| peer.can_send());
    // Preferred case: a healthy direct peer whose session path is not
    // degraded.
    if let Some(peer) = self.peers.get(dest_node_addr)
        && peer.is_healthy()
        && !direct_session_degraded
    {
        return Some(peer);
    }

    // Only in ReplyLearned mode: the set of peers that can currently send,
    // used to filter learned next hops. `None` disables every learned-route
    // step below.
    let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
        Some(
            self.peers
                .iter()
                .filter(|(_, peer)| peer.can_send())
                .map(|(addr, _)| *addr)
                .collect::<HashSet<_>>(),
        )
    } else {
        None
    };

    // Ask the learned-route table whether this lookup should bypass learned
    // routes and try the coordinate/tree fallback first.
    let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
        self.learned_routes.should_explore_fallback(
            dest_node_addr,
            now_ms,
            self.config.node.routing.learned_fallback_explore_interval,
            |addr| sendable.contains(addr),
        )
    });
    // Not exploring: a learned next hop wins.
    if let Some(sendable) = &sendable_learned_peers
        && !explore_fallback
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    // Coordinate and tree routing need the destination's coordinates.
    let Some(dest_coords) = self
        .coord_cache
        .get_and_touch(dest_node_addr, now_ms)
        .cloned()
    else {
        // No coordinates cached: fall back to a learned route, then to the
        // direct peer if it can at least send.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }
        if direct_peer_can_send {
            return self.peers.get(dest_node_addr);
        }
        return None;
    };

    // Candidates are peers whose filters may reach the destination; the best
    // one is chosen by `select_best_candidate`. Only the address is kept so
    // the borrow of `self` ends with this block.
    let coordinate_route_addr = {
        let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
        if !candidates.is_empty() {
            self.select_best_candidate(&candidates, &dest_coords)
                .map(|peer| *peer.node_addr())
        } else {
            None
        }
    };
    if let Some(next_hop_addr) = coordinate_route_addr {
        return self.peers.get(&next_hop_addr);
    }

    // Tree routing, accepted only when the suggested peer can send.
    let tree_route_addr = self
        .tree_state
        .find_next_hop(&dest_coords)
        .filter(|next_hop_id| {
            self.peers
                .get(next_hop_id)
                .is_some_and(|peer| peer.can_send())
        });
    if let Some(next_hop_addr) = tree_route_addr {
        return self.peers.get(&next_hop_addr);
    }
    // Exploration found nothing: use the learned route after all.
    // NOTE(review): this branch returns without trying the direct-peer
    // fallback below, even when no learned hop is selected. Confirm that is
    // intended.
    if explore_fallback {
        return sendable_learned_peers.as_ref().and_then(|sendable| {
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
        });
    }

    if let Some(sendable) = &sendable_learned_peers
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    // Last resort: the direct peer, even if unhealthy or degraded, as long as
    // it can send.
    if direct_peer_can_send {
        return self.peers.get(dest_node_addr);
    }

    None
}
2576
2577 pub(in crate::node) fn session_direct_path_is_degraded(
2578 &mut self,
2579 dest: &NodeAddr,
2580 now_ms: u64,
2581 ) -> bool {
2582 match self.session_direct_degraded_until_ms.get(dest).copied() {
2583 Some(until_ms) if until_ms > now_ms => true,
2584 Some(_) => {
2585 self.session_direct_degraded_until_ms.remove(dest);
2586 false
2587 }
2588 None => false,
2589 }
2590 }
2591
2592 pub(in crate::node) fn mark_session_direct_path_degraded(
2593 &mut self,
2594 dest: NodeAddr,
2595 now_ms: u64,
2596 ) -> bool {
2597 let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2598 let entry = self
2599 .session_direct_degraded_until_ms
2600 .entry(dest)
2601 .or_insert(0);
2602 let was_degraded = *entry > now_ms;
2603 *entry = (*entry).max(until_ms);
2604 !was_degraded
2605 }
2606
/// Removes the degraded marker for `dest`.
///
/// Returns `true` when a marker existed, whether or not it had already
/// expired.
pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
    self.session_direct_degraded_until_ms.remove(dest).is_some()
}
2610
2611 pub(in crate::node) fn learn_reverse_route(
2612 &mut self,
2613 destination: NodeAddr,
2614 next_hop: NodeAddr,
2615 ) {
2616 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2617 || destination == *self.node_addr()
2618 {
2619 return;
2620 }
2621 let now_ms = Self::now_ms();
2622 self.learned_routes.learn(
2623 destination,
2624 next_hop,
2625 now_ms,
2626 self.config.node.routing.learned_ttl_secs,
2627 self.config.node.routing.max_learned_routes_per_dest,
2628 );
2629 }
2630
2631 pub(in crate::node) fn record_route_failure(
2632 &mut self,
2633 destination: NodeAddr,
2634 next_hop: NodeAddr,
2635 ) {
2636 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2637 return;
2638 }
2639 self.learned_routes.record_failure(&destination, &next_hop);
2640 }
2641
/// Returns a snapshot of the learned-route table taken at `now_ms`.
pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
    self.learned_routes.snapshot(now_ms)
}
2645
/// Asks the learned-route table to purge entries that are expired at
/// `now_ms`.
pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
    self.learned_routes.purge_expired(now_ms);
}
2649
2650 fn select_best_candidate<'a>(
2659 &'a self,
2660 candidates: &[&'a ActivePeer],
2661 dest_coords: &crate::tree::TreeCoordinate,
2662 ) -> Option<&'a ActivePeer> {
2663 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2664
2665 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2666
2667 for &candidate in candidates {
2668 if !candidate.can_send() {
2669 continue;
2670 }
2671
2672 let cost = candidate.link_cost();
2673
2674 let dist = self
2675 .tree_state
2676 .peer_coords(candidate.node_addr())
2677 .map(|pc| pc.distance_to(dest_coords))
2678 .unwrap_or(usize::MAX);
2679
2680 if dist >= my_distance {
2683 continue;
2684 }
2685
2686 let dominated = match &best {
2687 None => true,
2688 Some((_, best_cost, best_dist)) => {
2689 cost < *best_cost
2690 || (cost == *best_cost && dist < *best_dist)
2691 || (cost == *best_cost
2692 && dist == *best_dist
2693 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2694 }
2695 };
2696
2697 if dominated {
2698 best = Some((candidate, cost, dist));
2699 }
2700 }
2701
2702 best.map(|(peer, _, _)| peer)
2703 }
2704
/// Collects the active peers for which `may_reach(dest)` is true.
pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
    self.peers.values().filter(|p| p.may_reach(dest)).collect()
}
2709
/// Returns a reference to the TUN sender, or `None` when the node holds
/// none.
pub fn tun_tx(&self) -> Option<&TunTx> {
    self.tun_tx.as_ref()
}
2716
2717 pub fn attach_external_packet_io(
2724 &mut self,
2725 capacity: usize,
2726 ) -> Result<ExternalPacketIo, NodeError> {
2727 if self.state != NodeState::Created {
2728 return Err(NodeError::Config(ConfigError::Validation(
2729 "external packet I/O must be attached before node start".to_string(),
2730 )));
2731 }
2732 if self.config.tun.enabled {
2733 return Err(NodeError::Config(ConfigError::Validation(
2734 "external packet I/O requires tun.enabled=false".to_string(),
2735 )));
2736 }
2737
2738 let capacity = capacity.max(1);
2739 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2740 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2741 self.tun_outbound_rx = Some(outbound_rx);
2742 self.external_packet_tx = Some(inbound_tx);
2743
2744 Ok(ExternalPacketIo {
2745 outbound_tx,
2746 inbound_rx,
2747 })
2748 }
2749
2750 pub(crate) fn attach_endpoint_data_io(
2755 &mut self,
2756 capacity: usize,
2757 ) -> Result<EndpointDataIo, NodeError> {
2758 if self.state != NodeState::Created {
2759 return Err(NodeError::Config(ConfigError::Validation(
2760 "endpoint data I/O must be attached before node start".to_string(),
2761 )));
2762 }
2763
2764 let command_capacity = endpoint_data_command_capacity(capacity);
2765 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2766 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2771 self.endpoint_command_rx = Some(command_rx);
2772 self.endpoint_event_tx = Some(event_tx.clone());
2773
2774 Ok(EndpointDataIo {
2775 command_tx,
2776 event_rx,
2777 event_tx,
2778 })
2779 }
2780
2781 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2782 let mut prefix = [0u8; 15];
2783 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2784 self.identity_cache
2785 .get(&prefix)
2786 .filter(|entry| &entry.node_addr == addr)
2787 .map(|entry| entry.pubkey)
2788 }
2789
2790 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2791 let mut prefix = [0u8; 15];
2792 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2793 self.identity_cache
2794 .get(&prefix)
2795 .filter(|entry| &entry.node_addr == addr)
2796 .map(|entry| entry.npub.clone())
2797 }
2798
2799 pub(in crate::node) fn deliver_external_ipv6_packet(
2800 &self,
2801 src_addr: &NodeAddr,
2802 packet: Vec<u8>,
2803 ) {
2804 let Some(external_packet_tx) = &self.external_packet_tx else {
2805 return;
2806 };
2807 if packet.len() < 40 {
2808 return;
2809 }
2810 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2811 return;
2812 };
2813 let delivered = NodeDeliveredPacket {
2814 source_node_addr: *src_addr,
2815 source_npub: self.npub_for_node_addr(src_addr),
2816 destination,
2817 packet,
2818 };
2819 if let Err(error) = external_packet_tx.try_send(delivered) {
2820 debug!(error = %error, "Failed to deliver packet to external app sink");
2821 }
2822 }
2823
2824 pub(super) async fn send_encrypted_link_message(
2838 &mut self,
2839 node_addr: &NodeAddr,
2840 plaintext: &[u8],
2841 ) -> Result<(), NodeError> {
2842 self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2843 .await
2844 }
2845
2846 pub(in crate::node) fn note_local_send_outcome(
2852 &mut self,
2853 result: &Result<usize, TransportError>,
2854 ) {
2855 match result {
2856 Ok(_) => {
2857 if self.last_local_send_failure_at.is_some() {
2858 self.last_local_send_failure_at = None;
2859 }
2860 }
2861 Err(error) if error.is_local_route_unavailable() => {
2862 self.last_local_send_failure_at = Some(std::time::Instant::now());
2863 }
2864 Err(_) => {}
2865 }
2866 }
2867
2868 pub(in crate::node) fn local_send_failure_dead_timeout(
2874 &mut self,
2875 now: std::time::Instant,
2876 dead_timeout: std::time::Duration,
2877 fast_dead_timeout: std::time::Duration,
2878 ) -> std::time::Duration {
2879 match self.last_local_send_failure_at {
2880 Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
2881 fast_dead_timeout.min(dead_timeout)
2882 }
2883 Some(_) => {
2884 self.last_local_send_failure_at = None;
2885 dead_timeout
2886 }
2887 None => dead_timeout,
2888 }
2889 }
2890
2891 pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
2892 self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
2893 }
2894
2895 pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
2896 let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
2897 return false;
2898 };
2899 let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
2900 std::time::Instant::now().duration_since(t) <= grace
2901 }
2902
2903 pub(super) async fn send_encrypted_link_message_with_ce(
2907 &mut self,
2908 node_addr: &NodeAddr,
2909 plaintext: &[u8],
2910 ce_flag: bool,
2911 ) -> Result<(), NodeError> {
2912 let peer = self
2913 .peers
2914 .get_mut(node_addr)
2915 .ok_or(NodeError::PeerNotFound(*node_addr))?;
2916
2917 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2918 node_addr: *node_addr,
2919 reason: "no their_index".into(),
2920 })?;
2921 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2922 node_addr: *node_addr,
2923 reason: "no transport_id".into(),
2924 })?;
2925 let remote_addr = peer
2926 .current_addr()
2927 .cloned()
2928 .ok_or_else(|| NodeError::SendFailed {
2929 node_addr: *node_addr,
2930 reason: "no current_addr".into(),
2931 })?;
2932 #[cfg(any(target_os = "linux", target_os = "macos"))]
2933 let connected_socket = peer.connected_udp();
2934
2935 let timestamp_ms = peer.session_elapsed_ms();
2937
2938 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2940 let mut flags = if sp_flag { FLAG_SP } else { 0 };
2941 if ce_flag {
2942 flags |= FLAG_CE;
2943 }
2944 if peer.current_k_bit() {
2945 flags |= FLAG_KEY_EPOCH;
2946 }
2947
2948 let session = peer
2949 .noise_session_mut()
2950 .ok_or_else(|| NodeError::SendFailed {
2951 node_addr: *node_addr,
2952 reason: "no noise session".into(),
2953 })?;
2954
2955 const INNER_TS_LEN: usize = 4;
2963 let counter = session.current_send_counter();
2964 let inner_len = INNER_TS_LEN + plaintext.len();
2965 let payload_len = inner_len as u16;
2966 let header = build_established_header(their_index, counter, flags, payload_len);
2967
2968 let transport_for_send = self
2987 .transports
2988 .get(&transport_id)
2989 .ok_or(NodeError::TransportNotFound(transport_id))?;
2990 match transport_for_send.connection_state(&remote_addr) {
2991 ConnectionState::Connected => {}
2992 other => {
2993 if matches!(other, ConnectionState::None) {
2994 let _ = transport_for_send.connect(&remote_addr).await;
2995 }
2996 return Err(NodeError::SendFailed {
2997 node_addr: *node_addr,
2998 reason: format!("transport connection not ready: {:?}", other),
2999 });
3000 }
3001 }
3002 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3003 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3004 && is_udp
3005 && let Some(cipher_clone) = session.send_cipher_clone()
3006 {
3007 {
3008 let reserved_counter =
3012 session
3013 .take_send_counter()
3014 .map_err(|e| NodeError::SendFailed {
3015 node_addr: *node_addr,
3016 reason: format!("counter reservation failed: {}", e),
3017 })?;
3018 debug_assert_eq!(reserved_counter, counter);
3019 let header =
3023 build_established_header(their_index, reserved_counter, flags, payload_len);
3024 let transport = transport_for_send;
3025 let send_target = {
3032 if let TransportHandle::Udp(udp) = transport {
3033 let socket_addr = {
3034 #[cfg(any(target_os = "linux", target_os = "macos"))]
3035 {
3036 match connected_socket.as_ref() {
3037 Some(socket) => Some(socket.peer_addr()),
3038 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3039 }
3040 }
3041 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3042 {
3043 udp.resolve_for_off_task(&remote_addr).await.ok()
3044 }
3045 };
3046 match (udp.async_socket(), socket_addr) {
3047 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3048 _ => None,
3049 }
3050 } else {
3051 None
3052 }
3053 };
3054 if let Some((socket, socket_addr)) = send_target {
3055 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3071 let mut wire_buf = Vec::with_capacity(wire_capacity);
3072 wire_buf.extend_from_slice(&header);
3073 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
3074 wire_buf.extend_from_slice(plaintext);
3075 let predicted_bytes = wire_capacity;
3076 if let Some(peer) = self.peers.get_mut(node_addr) {
3083 peer.link_stats_mut().record_sent(predicted_bytes);
3084 if let Some(mmp) = peer.mmp_mut() {
3085 mmp.sender
3086 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3087 }
3088 }
3089 let scheduling_weight = self.send_weight_for_peer(node_addr);
3090 workers.dispatch(self::encrypt_worker::FmpSendJob {
3091 cipher: cipher_clone,
3092 counter: reserved_counter,
3093 wire_buf,
3094 fsp_seal: None,
3095 socket,
3096 dest_addr: socket_addr,
3097 #[cfg(any(target_os = "linux", target_os = "macos"))]
3098 connected_socket,
3099 drop_on_backpressure: fmp_plaintext_is_bulk_session_datagram(plaintext),
3100 scheduling_weight,
3101 queued_at: crate::perf_profile::stamp(),
3102 });
3103 return Ok(());
3104 }
3105 }
3106 }
3107
3108 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3113 let ciphertext = {
3115 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3116 session
3117 .encrypt_with_aad(&inner_plaintext, &header)
3118 .map_err(|e| NodeError::SendFailed {
3119 node_addr: *node_addr,
3120 reason: format!("encryption failed: {}", e),
3121 })?
3122 };
3123
3124 let wire_packet = build_encrypted(&header, &ciphertext);
3125
3126 let send_result = {
3128 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3129 let transport = self
3130 .transports
3131 .get(&transport_id)
3132 .ok_or(NodeError::TransportNotFound(transport_id))?;
3133 transport.send(&remote_addr, &wire_packet).await
3134 };
3135 self.note_local_send_outcome(&send_result);
3136 let bytes_sent = send_result.map_err(|e| match e {
3137 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3138 node_addr: *node_addr,
3139 packet_size,
3140 mtu,
3141 },
3142 other => NodeError::SendFailed {
3143 node_addr: *node_addr,
3144 reason: format!("transport send: {}", other),
3145 },
3146 })?;
3147
3148 if let Some(peer) = self.peers.get_mut(node_addr) {
3150 peer.link_stats_mut().record_sent(bytes_sent);
3151 if let Some(mmp) = peer.mmp_mut() {
3153 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3154 }
3155 }
3156
3157 Ok(())
3158 }
3159}
3160
3161impl fmt::Debug for Node {
3162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3163 f.debug_struct("Node")
3164 .field("node_addr", self.node_addr())
3165 .field("state", &self.state)
3166 .field("is_leaf_only", &self.is_leaf_only)
3167 .field("connections", &self.connection_count())
3168 .field("peers", &self.peer_count())
3169 .field("links", &self.link_count())
3170 .field("transports", &self.transport_count())
3171 .finish()
3172 }
3173}