1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
40use crate::peer::{ActivePeer, PeerConnection};
41#[cfg(any(target_os = "linux", target_os = "macos"))]
42use crate::transport::ethernet::EthernetTransport;
43use crate::transport::tcp::TcpTransport;
44use crate::transport::tor::TorTransport;
45use crate::transport::udp::UdpTransport;
46#[cfg(feature = "webrtc-transport")]
47use crate::transport::webrtc::WebRtcTransport;
48use crate::transport::{
49 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
50 TransportHandle, TransportId,
51};
52use crate::tree::TreeState;
53use crate::upper::hosts::HostMap;
54use crate::upper::icmp_rate_limit::IcmpRateLimiter;
55use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
56use crate::utils::index::IndexAllocator;
57use crate::{
58 Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
59 PeerIdentity, encode_npub,
60};
61use rand::Rng;
62use std::collections::{HashMap, HashSet, VecDeque};
63use std::fmt;
64use std::sync::Arc;
65use std::thread::JoinHandle;
66use thiserror::Error;
67use tracing::{debug, warn};
68
/// Three-second window associated with local send failures.
///
/// NOTE(review): presumably a peer whose sends keep failing locally within
/// this window is declared dead on a fast path — confirm at the usage sites.
const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
70
71fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
72 if !plaintext
73 .first()
74 .is_some_and(|ty| *ty == LinkMessageType::SessionDatagram.to_byte())
75 {
76 return false;
77 }
78 let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
79 return false;
80 };
81 FspCommonPrefix::parse(fsp_payload)
82 .is_some_and(|prefix| prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted())
83}
84
/// Jitter bound, in seconds, associated with rekeying.
///
/// NOTE(review): presumably rekey timers are offset by up to this many
/// seconds so both ends of a link do not rekey simultaneously — confirm at
/// the usage sites.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
92
/// Errors produced by node operations.
///
/// The `Display` text of each variant is given by its `#[error(...)]`
/// attribute. `Config`, `Identity` and `Tun` convert automatically from their
/// source error types via `#[from]`.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The operation requires a started node.
    #[error("node not started")]
    NotStarted,

    /// The node has already been started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node has already been stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given id.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No usable transport for the requested transport type; the payload is
    /// a free-form description.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link is registered under the given id.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection is registered for the given link id.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// The given node address is not a known peer.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with this node address is already present.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection is already registered for this link.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A configured peer npub could not be used.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// Discovery failure with a free-form description.
    #[error("discovery error: {0}")]
    Discovery(String),

    /// Access was refused; the payload describes why.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` would be exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` would be exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` would be exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available on the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion of the given link failed.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet being forwarded to `node_addr` is larger than the MTU.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Configuration error (converted via `From<ConfigError>`).
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Identity error (converted via `From<IdentityError>`).
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// TUN device error (converted via `From<TunError>`).
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Index allocation failed; the payload describes why.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// Handshake failure with a free-form description.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// Stringified transport error that is not a local-route failure
    /// (see `NodeError::from_transport_error`).
    #[error("transport error: {0}")]
    TransportError(String),

    /// Stringified transport error for which
    /// `TransportError::is_local_route_unavailable` returned `true`.
    #[error("local route unavailable: {0}")]
    LocalRouteUnavailable(String),

    /// Bootstrap handoff failure with a free-form description.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
187
188impl NodeError {
189 pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
190 if error.is_local_route_unavailable() {
191 Self::LocalRouteUnavailable(error.to_string())
192 } else {
193 Self::TransportError(error.to_string())
194 }
195 }
196
197 pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
198 matches!(self, Self::LocalRouteUnavailable(_))
199 }
200}
201
/// A packet delivered by the node, together with its source and destination.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// Node address of the sender.
    pub source_node_addr: NodeAddr,
    /// Sender's npub string, when one is available.
    pub source_npub: Option<String>,
    /// Address the packet was sent to.
    pub destination: FipsAddress,
    /// Raw packet bytes.
    pub packet: Vec<u8>,
}
214
/// One entry of the node's identity cache: a node address with its public
/// key, npub string, and a last-seen timestamp.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address this entry describes.
    node_addr: NodeAddr,
    /// secp256k1 public key stored for the node.
    pubkey: secp256k1::PublicKey,
    /// npub string stored for the node.
    npub: String,
    /// Timestamp in milliseconds.
    /// NOTE(review): presumably the last time this identity was observed —
    /// confirm the clock source at the call sites.
    last_seen_ms: u64,
}
222
223impl IdentityCacheEntry {
224 fn new(
225 node_addr: NodeAddr,
226 pubkey: secp256k1::PublicKey,
227 npub: String,
228 last_seen_ms: u64,
229 ) -> Self {
230 Self {
231 node_addr,
232 pubkey,
233 npub,
234 last_seen_ms,
235 }
236 }
237}
238
/// Channel pair for exchanging packets with the node from outside.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Sender half for outbound packets (the node holds a `TunOutboundRx`).
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receiver for packets the node delivers.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
247
/// Channel handles for the endpoint data path: a bounded command channel
/// into the node and an unbounded event channel out of it.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Bounded sender for [`NodeEndpointCommand`]s.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Unbounded receiver for [`NodeEndpointEvent`]s.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Unbounded sender for [`NodeEndpointEvent`]s.
    /// NOTE(review): presumably the sending side of `event_rx`, exposed so
    /// events can also be injected from outside the node — confirm.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
277
/// Choose the capacity of the endpoint data command queue.
///
/// If the environment variable `FIPS_ENDPOINT_DATA_QUEUE_CAP` holds a
/// positive integer (surrounding whitespace is ignored), that value is
/// returned. Otherwise the result is `requested`, raised to a floor of
/// 32 768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    let env_override = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&value| value > 0);

    match env_override {
        Some(capacity) => capacity,
        // The 32_768 floor also covers the `requested == 0` case.
        None => requested.max(32_768),
    }
}
288
/// Commands sent to the node over the endpoint command channel.
///
/// Variants that carry a `response_tx` report their outcome through that
/// oneshot channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send `payload` to `remote`; the result is reported on `response_tx`.
    Send {
        remote: PeerIdentity,
        payload: Vec<u8>,
        /// Optional instant recorded for the command.
        /// NOTE(review): presumably the enqueue time, used for queue-latency
        /// measurement — confirm.
        queued_at: Option<std::time::Instant>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Send `payload` to `remote` without any response channel.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        queued_at: Option<std::time::Instant>,
    },
    /// Request a list of [`NodeEndpointPeer`] entries.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request a list of [`NodeEndpointRelayStatus`] entries.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Supply new advert and DM relay lists.
    UpdateRelays {
        advert_relays: Vec<String>,
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Supply a new peer configuration list; the reply carries an
    /// [`UpdatePeersOutcome`] on success.
    UpdatePeers {
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
332
/// Counters returned in response to `NodeEndpointCommand::UpdatePeers`.
///
/// NOTE(review): each field presumably counts the peers that fell into that
/// category during the update — confirm against the handler.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    pub(crate) added: usize,
    pub(crate) removed: usize,
    pub(crate) updated: usize,
    pub(crate) unchanged: usize,
}
341
/// Events emitted by the node over the endpoint event channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A data payload together with its source.
    Data {
        /// Node address of the sender.
        source_node_addr: NodeAddr,
        /// Sender's npub string, when one is available.
        source_npub: Option<String>,
        /// Payload bytes.
        payload: Vec<u8>,
        /// Optional instant recorded for the event.
        /// NOTE(review): presumably the enqueue time — confirm.
        queued_at: Option<std::time::Instant>,
    },
}
352
/// Per-peer entry returned for `NodeEndpointCommand::PeerSnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// Peer's npub string.
    pub(crate) npub: String,
    /// Transport address rendered as a string, if any.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, if any.
    pub(crate) transport_type: Option<String>,
    /// Numeric link id.
    pub(crate) link_id: u64,
    /// Round-trip time in milliseconds, if available.
    /// NOTE(review): "srtt" presumably means smoothed RTT — confirm.
    pub(crate) srtt_ms: Option<u64>,
    /// Packet and byte counters for each direction.
    pub(crate) packets_sent: u64,
    pub(crate) packets_recv: u64,
    pub(crate) bytes_sent: u64,
    pub(crate) bytes_recv: u64,
}
366
/// Per-relay entry returned for `NodeEndpointCommand::RelaySnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Relay status as free-form text.
    pub(crate) status: String,
}
373
/// Lifecycle state of a node.
///
/// `Display` renders each variant as its lowercase name. A node may be
/// started from `Created` or `Stopped`, and stopped only from `Running`
/// (see `can_start` / `can_stop`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Initial state set by the constructors.
    Created,
    /// Between `Created`/`Stopped` and `Running`.
    Starting,
    /// The only state that `is_operational` reports as operational.
    Running,
    /// Between `Running` and `Stopped`.
    Stopping,
    /// Stopped; `can_start` permits starting again from here.
    Stopped,
}
388
389impl NodeState {
390 pub fn is_operational(&self) -> bool {
392 matches!(self, NodeState::Running)
393 }
394
395 pub fn can_start(&self) -> bool {
397 matches!(self, NodeState::Created | NodeState::Stopped)
398 }
399
400 pub fn can_stop(&self) -> bool {
402 matches!(self, NodeState::Running)
403 }
404}
405
406impl fmt::Display for NodeState {
407 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
408 let s = match self {
409 NodeState::Created => "created",
410 NodeState::Starting => "starting",
411 NodeState::Running => "running",
412 NodeState::Stopping => "stopping",
413 NodeState::Stopped => "stopped",
414 };
415 write!(f, "{}", s)
416 }
417}
418
/// Record of a recently seen request, stored in `Node::recent_requests`
/// under a `u64` key.
#[derive(Clone, Debug)]
pub(crate) struct RecentRequest {
    /// Peer the request came from.
    pub(crate) from_peer: NodeAddr,
    /// Timestamp of the request in milliseconds (compared against the
    /// current time in `is_expired`).
    pub(crate) timestamp_ms: u64,
    /// Starts as `false` in `RecentRequest::new`.
    /// NOTE(review): presumably set once a response has been forwarded back,
    /// so later responses are not forwarded again — confirm.
    pub(crate) response_forwarded: bool,
}
436
437impl RecentRequest {
438 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
439 Self {
440 from_peer,
441 timestamp_ms,
442 response_forwarded: false,
443 }
444 }
445
446 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
448 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
449 }
450}
451
/// Key of `Node::addr_to_link`: a transport id paired with an address on
/// that transport.
type AddrKey = (TransportId, TransportAddr);
454
/// Per-transport drop-tracking state, stored in `Node::transport_drops`.
///
/// NOTE(review): presumably used to detect changes in a transport's drop
/// counter between samples — confirm against the code that updates it.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Previously recorded drop count (starts at 0 via `Default`).
    prev_drops: u64,
    /// Whether the transport is currently considered to be dropping
    /// (starts as `false` via `Default`).
    dropping: bool,
}
466
/// An entry of `Node::pending_connects`: the link, transport, remote address
/// and peer identity of a connect that has not completed yet.
///
/// NOTE(review): presumably used for connection-oriented transports whose
/// connect finishes asynchronously — confirm.
struct PendingConnect {
    /// Link allocated for the connect.
    link_id: LinkId,
    /// Transport the connect runs on.
    transport_id: TransportId,
    /// Remote address being connected to.
    remote_addr: TransportAddr,
    /// Identity of the peer being connected to.
    peer_identity: PeerIdentity,
}
482
/// Central runtime state of a node: identity, configuration, routing state,
/// transports, links, peers, sessions, and handles to I/O tasks.
///
/// Build one with `Node::new`, `Node::with_identity` or `Node::leaf_only`;
/// a freshly constructed node is in `NodeState::Created` with all maps empty
/// and all optional handles `None`.
pub struct Node {
    // --- Identity and lifecycle ---
    /// This node's identity.
    identity: Identity,

    /// Eight random bytes generated when the node is constructed.
    startup_epoch: [u8; 8],

    /// Instant at which the node value was constructed.
    started_at: std::time::Instant,

    /// Configuration the node was built from.
    config: Config,

    /// Current lifecycle state.
    state: NodeState,

    /// Whether the node runs in leaf-only mode.
    is_leaf_only: bool,

    // --- Routing state ---
    /// Spanning-tree state; the constructors sign the node's own declaration.
    tree_state: TreeState,

    /// Bloom-filter state (a leaf-only variant is used in leaf-only mode).
    bloom_state: BloomState,

    /// Coordinate cache sized and aged from `config.node.cache`.
    coord_cache: CoordCache,
    /// Table of learned routes.
    learned_routes: LearnedRouteTable,
    /// Recently seen requests, keyed by a `u64`.
    /// NOTE(review): presumably the request id — confirm.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Shared, lock-protected map from address to a `u16` MTU value.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // --- Transports and links ---
    /// Registered transports by id.
    transports: HashMap<TransportId, TransportHandle>,
    /// Drop-tracking state per transport.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Links by id.
    links: HashMap<LinkId, Link>,
    /// Lookup from (transport, address) to link id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    /// Packet channel sender, `None` until set up.
    packet_tx: Option<PacketTx>,
    /// Packet channel receiver, `None` until set up.
    packet_rx: Option<PacketRx>,

    // --- Connections, peers and sessions ---
    /// Peer connections keyed by link id.
    connections: HashMap<LinkId, PeerConnection>,

    /// Active peers keyed by node address.
    peers: HashMap<NodeAddr, ActivePeer>,

    /// Session entries keyed by node address.
    sessions: HashMap<NodeAddr, SessionEntry>,

    /// Identity cache keyed by a 15-byte array.
    /// NOTE(review): presumably an address prefix — confirm.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    // --- Queued outbound data ---
    /// Queued TUN packets per destination node.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Queued endpoint data payloads per destination node.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// In-progress discovery lookups per target node.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // --- Limits (copied from `config.node.limits` at construction) ---
    max_connections: usize,
    max_peers: usize,
    max_links: usize,

    // --- Id counters (both start at 1) ---
    next_link_id: u64,
    next_transport_id: u32,

    // --- Statistics ---
    stats: stats::NodeStats,

    stats_history: stats_history::StatsHistory,

    // --- TUN, external packet I/O and worker pools ---
    /// `Configured` when `config.tun.enabled`, otherwise `Disabled`, at
    /// construction.
    tun_state: TunState,
    tun_name: Option<String>,
    tun_tx: Option<TunTx>,
    tun_outbound_rx: Option<TunOutboundRx>,
    /// Sender for packets delivered to an external consumer.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Receiver for endpoint commands.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Sender for endpoint events.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// (transport, index) pairs tracked for the decrypt workers.
    /// NOTE(review): presumably the sessions registered with the pool — confirm.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Receiving half of the decrypt fallback channel created in the
    /// constructors; wrapped in `Option`.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sending half of the decrypt fallback channel.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// OS thread handles for the TUN reader and writer.
    tun_reader_handle: Option<JoinHandle<()>>,
    tun_writer_handle: Option<JoinHandle<()>>,
    /// macOS only: raw file descriptor related to TUN shutdown.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // --- DNS ---
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // --- Session index bookkeeping ---
    index_allocator: IndexAllocator,
    /// Lookup from (transport, index) to peer address.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Lookup from (transport, index) to the link of a pending outbound
    /// handshake.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // --- Rate limiters ---
    /// Handshake rate limiter configured from `config.node.rate_limit` and
    /// `config.node.limits.max_pending_inbound`.
    msg1_rate_limiter: HandshakeRateLimiter,
    icmp_rate_limiter: IcmpRateLimiter,
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Uses `config.node.session.coords_response_interval_ms` as interval.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    discovery_backoff: DiscoveryBackoff,
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // --- Connection attempts ---
    /// Connects that have not completed yet.
    pending_connects: Vec<PendingConnect>,

    /// Retry state per peer.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    // --- Discovery ---
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
    local_instance_started_at_ms: Option<u64>,
    last_local_instance_publish_ms: Option<u64>,
    last_local_instance_scan_ms: Option<u64>,
    nostr_discovery_started_at_ms: Option<u64>,
    /// Starts as `false`.
    startup_open_discovery_sweep_done: bool,
    /// Transports excluded by `find_transport_for_type`.
    bootstrap_transports: HashSet<TransportId>,
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    // --- Timestamps of periodic work and rate-limited logs ---
    last_parent_reeval: Option<crate::time::Instant>,

    last_congestion_log: Option<std::time::Instant>,

    estimated_mesh_size: Option<u64>,
    last_mesh_size_log: Option<std::time::Instant>,

    last_self_warn: Option<std::time::Instant>,

    last_local_send_failure_at: Option<std::time::Instant>,
    last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,

    // --- Peer metadata ---
    /// Display aliases per peer.
    peer_aliases: HashMap<NodeAddr, String>,
    /// Send weights for peers listed in the configuration
    /// (see `configured_peer_send_weights`).
    configured_peer_send_weights: HashMap<NodeAddr, u8>,

    /// Peer access-control reloader built by `host_map_and_peer_acl`.
    peer_acl: acl::PeerAclReloader,

    /// Shared host map built by `host_map_and_peer_acl`.
    host_map: Arc<HostMap>,
}
789
790impl Node {
791 pub fn new(config: Config) -> Result<Self, NodeError> {
793 config.validate()?;
794 let identity = config.create_identity()?;
795 let node_addr = *identity.node_addr();
796 let is_leaf_only = config.is_leaf_only();
797
798 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
799 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
800
801 let mut startup_epoch = [0u8; 8];
802 rand::rng().fill_bytes(&mut startup_epoch);
803
804 let mut bloom_state = if is_leaf_only {
805 BloomState::leaf_only(node_addr)
806 } else {
807 BloomState::new(node_addr)
808 };
809 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
810
811 let tun_state = if config.tun.enabled {
812 TunState::Configured
813 } else {
814 TunState::Disabled
815 };
816
817 let mut tree_state = TreeState::new(node_addr);
819 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
820 tree_state.set_hold_down(config.node.tree.hold_down_secs);
821 tree_state.set_flap_dampening(
822 config.node.tree.flap_threshold,
823 config.node.tree.flap_window_secs,
824 config.node.tree.flap_dampening_secs,
825 );
826 tree_state
827 .sign_declaration(&identity)
828 .expect("signing own declaration should never fail");
829
830 let coord_cache = CoordCache::new(
831 config.node.cache.coord_size,
832 config.node.cache.coord_ttl_secs * 1000,
833 );
834 let rl = &config.node.rate_limit;
835 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
836 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
837 config.node.limits.max_pending_inbound,
838 );
839
840 let max_connections = config.node.limits.max_connections;
841 let max_peers = config.node.limits.max_peers;
842 let max_links = config.node.limits.max_links;
843 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
844 let backoff_base_secs = config.node.discovery.backoff_base_secs;
845 let backoff_max_secs = config.node.discovery.backoff_max_secs;
846 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
847
848 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
849 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
850
851 Ok(Self {
852 identity,
853 startup_epoch,
854 started_at: std::time::Instant::now(),
855 config,
856 state: NodeState::Created,
857 is_leaf_only,
858 tree_state,
859 bloom_state,
860 coord_cache,
861 learned_routes: LearnedRouteTable::default(),
862 recent_requests: HashMap::new(),
863 transports: HashMap::new(),
864 transport_drops: HashMap::new(),
865 links: HashMap::new(),
866 addr_to_link: HashMap::new(),
867 packet_tx: None,
868 packet_rx: None,
869 connections: HashMap::new(),
870 peers: HashMap::new(),
871 sessions: HashMap::new(),
872 identity_cache: HashMap::new(),
873 pending_tun_packets: HashMap::new(),
874 pending_endpoint_data: HashMap::new(),
875 pending_lookups: HashMap::new(),
876 max_connections,
877 max_peers,
878 max_links,
879 next_link_id: 1,
880 next_transport_id: 1,
881 stats: stats::NodeStats::new(),
882 stats_history: stats_history::StatsHistory::new(),
883 tun_state,
884 tun_name: None,
885 tun_tx: None,
886 tun_outbound_rx: None,
887 external_packet_tx: None,
888 endpoint_command_rx: None,
889 endpoint_event_tx: None,
890 encrypt_workers: None,
891 decrypt_workers: None,
892 decrypt_registered_sessions: std::collections::HashSet::new(),
893 decrypt_fallback_tx,
894 decrypt_fallback_rx,
895 tun_reader_handle: None,
896 tun_writer_handle: None,
897 #[cfg(target_os = "macos")]
898 tun_shutdown_fd: None,
899 dns_identity_rx: None,
900 dns_task: None,
901 index_allocator: IndexAllocator::new(),
902 peers_by_index: HashMap::new(),
903 pending_outbound: HashMap::new(),
904 msg1_rate_limiter,
905 icmp_rate_limiter: IcmpRateLimiter::new(),
906 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
907 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
908 std::time::Duration::from_millis(coords_response_interval_ms),
909 ),
910 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
911 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
912 std::time::Duration::from_secs(forward_min_interval_secs),
913 ),
914 pending_connects: Vec::new(),
915 retry_pending: HashMap::new(),
916 nostr_discovery: None,
917 nostr_discovery_started_at_ms: None,
918 lan_discovery: None,
919 local_instance_registry: None,
920 local_instance_started_at_ms: None,
921 last_local_instance_publish_ms: None,
922 last_local_instance_scan_ms: None,
923 startup_open_discovery_sweep_done: false,
924 bootstrap_transports: HashSet::new(),
925 bootstrap_transport_npubs: HashMap::new(),
926 discovery_fallback_transit_blocked_peers: HashSet::new(),
927 last_parent_reeval: None,
928 last_congestion_log: None,
929 estimated_mesh_size: None,
930 last_mesh_size_log: None,
931 last_self_warn: None,
932 last_local_send_failure_at: None,
933 last_rx_loop_maintenance_timeout_at: None,
934 peer_aliases: HashMap::new(),
935 configured_peer_send_weights,
936 peer_acl,
937 host_map,
938 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
939 })
940 }
941
942 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
947 config.validate()?;
948 let node_addr = *identity.node_addr();
949
950 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
951 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
952
953 let mut startup_epoch = [0u8; 8];
954 rand::rng().fill_bytes(&mut startup_epoch);
955
956 let tun_state = if config.tun.enabled {
957 TunState::Configured
958 } else {
959 TunState::Disabled
960 };
961
962 let mut tree_state = TreeState::new(node_addr);
964 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
965 tree_state.set_hold_down(config.node.tree.hold_down_secs);
966 tree_state.set_flap_dampening(
967 config.node.tree.flap_threshold,
968 config.node.tree.flap_window_secs,
969 config.node.tree.flap_dampening_secs,
970 );
971 tree_state
972 .sign_declaration(&identity)
973 .expect("signing own declaration should never fail");
974
975 let mut bloom_state = BloomState::new(node_addr);
976 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
977
978 let coord_cache = CoordCache::new(
979 config.node.cache.coord_size,
980 config.node.cache.coord_ttl_secs * 1000,
981 );
982 let rl = &config.node.rate_limit;
983 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
984 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
985 config.node.limits.max_pending_inbound,
986 );
987
988 let max_connections = config.node.limits.max_connections;
989 let max_peers = config.node.limits.max_peers;
990 let max_links = config.node.limits.max_links;
991 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
992
993 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
994 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
995
996 Ok(Self {
997 identity,
998 startup_epoch,
999 started_at: std::time::Instant::now(),
1000 config,
1001 state: NodeState::Created,
1002 is_leaf_only: false,
1003 tree_state,
1004 bloom_state,
1005 coord_cache,
1006 learned_routes: LearnedRouteTable::default(),
1007 recent_requests: HashMap::new(),
1008 transports: HashMap::new(),
1009 transport_drops: HashMap::new(),
1010 links: HashMap::new(),
1011 addr_to_link: HashMap::new(),
1012 packet_tx: None,
1013 packet_rx: None,
1014 connections: HashMap::new(),
1015 peers: HashMap::new(),
1016 sessions: HashMap::new(),
1017 identity_cache: HashMap::new(),
1018 pending_tun_packets: HashMap::new(),
1019 pending_endpoint_data: HashMap::new(),
1020 pending_lookups: HashMap::new(),
1021 max_connections,
1022 max_peers,
1023 max_links,
1024 next_link_id: 1,
1025 next_transport_id: 1,
1026 stats: stats::NodeStats::new(),
1027 stats_history: stats_history::StatsHistory::new(),
1028 tun_state,
1029 tun_name: None,
1030 tun_tx: None,
1031 tun_outbound_rx: None,
1032 external_packet_tx: None,
1033 endpoint_command_rx: None,
1034 endpoint_event_tx: None,
1035 encrypt_workers: None,
1036 decrypt_workers: None,
1037 decrypt_registered_sessions: std::collections::HashSet::new(),
1038 decrypt_fallback_tx,
1039 decrypt_fallback_rx,
1040 tun_reader_handle: None,
1041 tun_writer_handle: None,
1042 #[cfg(target_os = "macos")]
1043 tun_shutdown_fd: None,
1044 dns_identity_rx: None,
1045 dns_task: None,
1046 index_allocator: IndexAllocator::new(),
1047 peers_by_index: HashMap::new(),
1048 pending_outbound: HashMap::new(),
1049 msg1_rate_limiter,
1050 icmp_rate_limiter: IcmpRateLimiter::new(),
1051 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1052 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1053 std::time::Duration::from_millis(coords_response_interval_ms),
1054 ),
1055 discovery_backoff: DiscoveryBackoff::new(),
1056 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1057 pending_connects: Vec::new(),
1058 retry_pending: HashMap::new(),
1059 nostr_discovery: None,
1060 nostr_discovery_started_at_ms: None,
1061 lan_discovery: None,
1062 local_instance_registry: None,
1063 local_instance_started_at_ms: None,
1064 last_local_instance_publish_ms: None,
1065 last_local_instance_scan_ms: None,
1066 startup_open_discovery_sweep_done: false,
1067 bootstrap_transports: HashSet::new(),
1068 bootstrap_transport_npubs: HashMap::new(),
1069 discovery_fallback_transit_blocked_peers: HashSet::new(),
1070 last_parent_reeval: None,
1071 last_congestion_log: None,
1072 estimated_mesh_size: None,
1073 last_mesh_size_log: None,
1074 last_self_warn: None,
1075 last_local_send_failure_at: None,
1076 last_rx_loop_maintenance_timeout_at: None,
1077 peer_aliases: HashMap::new(),
1078 configured_peer_send_weights,
1079 peer_acl,
1080 host_map,
1081 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1082 })
1083 }
1084
1085 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1087 let mut node = Self::new(config)?;
1088 node.is_leaf_only = true;
1089 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1090 Ok(node)
1091 }
1092
1093 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1094 let base_host_map = HostMap::from_peer_configs(config.peers());
1095 if !config.node.system_files_enabled {
1096 return (
1097 Arc::new(base_host_map.clone()),
1098 acl::PeerAclReloader::memory_only(base_host_map),
1099 );
1100 }
1101
1102 let mut host_map = base_host_map.clone();
1103 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1104 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1105 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1106 ));
1107 host_map.merge(hosts_file);
1108 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1109 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1110 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1111 base_host_map,
1112 hosts_path,
1113 );
1114 (Arc::new(host_map), peer_acl)
1115 }
1116
1117 fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1118 config
1119 .peers()
1120 .iter()
1121 .filter_map(|peer| {
1122 PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1123 (
1124 *identity.node_addr(),
1125 encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1126 )
1127 })
1128 })
1129 .collect()
1130 }
1131
1132 fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1133 self.configured_peer_send_weights
1134 .get(peer_addr)
1135 .copied()
1136 .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1137 }
1138
    /// Instantiate a transport handle for every transport configured in
    /// `self.config.transports`.
    ///
    /// Each instance receives a freshly allocated transport id and a clone of
    /// `packet_tx`. Transports are created in this order: UDP, sim (feature
    /// `sim-transport`), Ethernet (Linux/macOS), TCP, Tor, WebRTC (feature
    /// `webrtc-transport`), BLE (cfg `bluer_available`). The handles are
    /// returned to the caller; this function does not insert them into
    /// `self.transports`.
    ///
    /// WebRTC and BLE initialization failures are logged and skipped rather
    /// than propagated.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Each instance list is first copied into an owned Vec, so the borrow
        // of `self.config` ends before `allocate_transport_id` is called on
        // `self` (presumably `&mut self` — confirm).
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        // Simulated transport, compiled only with the `sim-transport` feature.
        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        // Raw Ethernet is only available on Linux and macOS.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let mut eth_config = eth_config;
                // An instance without its own discovery scope inherits the
                // node-level LAN discovery scope.
                if eth_config.discovery_scope.is_none() {
                    eth_config.discovery_scope = self.lan_discovery_scope();
                }
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // The WebRTC instance list is collected in every build so that a
        // build without the feature can still warn about ignored config.
        let webrtc_instances: Vec<_> = self
            .config
            .transports
            .webrtc
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        #[cfg(feature = "webrtc-transport")]
        {
            for (name, webrtc_config) in webrtc_instances {
                // The id is allocated before construction, so a failed
                // WebRTC transport still consumes a transport id.
                let transport_id = self.allocate_transport_id();
                match WebRtcTransport::new(
                    transport_id,
                    name,
                    webrtc_config,
                    packet_tx.clone(),
                    &self.identity,
                    &self.config.node.discovery.nostr,
                ) {
                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
                    Err(err) => {
                        warn!(
                            transport_id = %transport_id,
                            error = %err,
                            "failed to initialize WebRTC transport"
                        );
                    }
                }
            }
        }
        #[cfg(not(feature = "webrtc-transport"))]
        if !webrtc_instances.is_empty() {
            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
        }

        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Real BLE adapters are only opened outside of test builds.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // NOTE(review): inside this `#[cfg(bluer_available)]` block,
            // `any(not(bluer_available), test)` can only be satisfied by
            // `test`, and the nested `#[cfg(not(test))]` then removes the
            // warning. The "lacks BlueZ support" message is therefore never
            // compiled in; in test builds this `if` only keeps
            // `ble_instances` used.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1316
1317 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1327 self.transports
1328 .iter()
1329 .filter(|(id, handle)| {
1330 handle.transport_type().name == transport_type
1331 && handle.is_operational()
1332 && !self.bootstrap_transports.contains(id)
1333 })
1334 .min_by_key(|(id, _)| id.as_u32())
1335 .map(|(id, _)| *id)
1336 }
1337
1338 #[allow(unused_variables)]
1344 fn resolve_ethernet_addr(
1345 &self,
1346 addr_str: &str,
1347 ) -> Result<(TransportId, TransportAddr), NodeError> {
1348 #[cfg(any(target_os = "linux", target_os = "macos"))]
1349 {
1350 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1351 NodeError::NoTransportForType(format!(
1352 "invalid Ethernet address format '{}': expected 'interface/mac'",
1353 addr_str
1354 ))
1355 })?;
1356
1357 let transport_id = self
1359 .transports
1360 .iter()
1361 .find(|(_, handle)| {
1362 handle.transport_type().name == "ethernet"
1363 && handle.is_operational()
1364 && handle.interface_name() == Some(iface)
1365 })
1366 .map(|(id, _)| *id)
1367 .ok_or_else(|| {
1368 NodeError::NoTransportForType(format!(
1369 "no operational Ethernet transport for interface '{}'",
1370 iface
1371 ))
1372 })?;
1373
1374 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1375 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1376 })?;
1377
1378 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1379 }
1380 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1381 {
1382 Err(NodeError::NoTransportForType(
1383 "Ethernet transport is not supported on this platform".to_string(),
1384 ))
1385 }
1386 }
1387
/// Resolves an `adapter/mac` string to an operational BLE transport and the
/// corresponding `TransportAddr`.
///
/// # Errors
///
/// Returns `NodeError::NoTransportForType` when no adapter can be extracted
/// from the address, when no operational BLE transport exists, or when
/// `BleAddr::parse` rejects the string.
// NOTE(review): the transport lookup takes the first operational BLE
// transport and does not compare it against the parsed adapter, although the
// error message names the adapter — confirm whether this is intentional.
#[cfg(bluer_available)]
fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
    let parsed = TransportAddr::from_string(addr_str);
    let Some(adapter) = crate::transport::ble::addr::adapter_from_addr(&parsed) else {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address format '{}': expected 'adapter/mac'",
            addr_str
        )));
    };

    let operational_ble = self.transports.iter().find_map(|(id, handle)| {
        (handle.transport_type().name == "ble" && handle.is_operational()).then_some(*id)
    });
    let Some(transport_id) = operational_ble else {
        return Err(NodeError::NoTransportForType(format!(
            "no operational BLE transport for adapter '{}'",
            adapter
        )));
    };

    // Validation only: the parsed value is discarded.
    if let Err(e) = crate::transport::ble::addr::BleAddr::parse(addr_str) {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address '{}': {}",
            addr_str, e
        )));
    }

    Ok((transport_id, TransportAddr::from_string(addr_str)))
}
1421
/// Returns a shared reference to this node's `Identity`.
pub fn identity(&self) -> &Identity {
    &self.identity
}
1428
/// Returns this node's address, as reported by its identity.
pub fn node_addr(&self) -> &NodeAddr {
    self.identity.node_addr()
}
1433
/// Returns this node's npub string, as produced by its identity.
pub fn npub(&self) -> String {
    self.identity.npub()
}
1438
1439 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1448 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1449 return hostname.to_string();
1450 }
1451 if let Some(name) = self.peer_aliases.get(addr) {
1452 return name.clone();
1453 }
1454 if let Some(peer) = self.peers.get(addr) {
1455 return peer.identity().short_npub();
1456 }
1457 if let Some(entry) = self.sessions.get(addr) {
1458 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1459 return PeerIdentity::from_pubkey(xonly).short_npub();
1460 }
1461 addr.short_hex()
1462 }
1463
1464 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1476 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1480 self.peers_by_index.remove(&cache_key);
1481 if self.decrypt_registered_sessions.remove(&cache_key)
1482 && let Some(workers) = self.decrypt_workers.as_ref()
1483 {
1484 workers.unregister_session(cache_key);
1485 }
1486 if let Some(peer_addr) = owning_peer {
1497 let peer_has_other_index = self
1498 .peers_by_index
1499 .values()
1500 .any(|other| *other == peer_addr);
1501 if !peer_has_other_index {
1502 self.clear_connected_udp_for_peer(&peer_addr);
1503 }
1504 }
1505 }
1506
1507 pub(in crate::node) fn ensure_current_session_index_registered(
1516 &mut self,
1517 node_addr: &NodeAddr,
1518 context: &'static str,
1519 ) -> bool {
1520 let Some(peer) = self.peers.get(node_addr) else {
1521 return false;
1522 };
1523 let Some(transport_id) = peer.transport_id() else {
1524 warn!(
1525 peer = %self.peer_display_name(node_addr),
1526 context,
1527 "Cannot register current session index without transport id"
1528 );
1529 return false;
1530 };
1531 let Some(our_index) = peer.our_index() else {
1532 warn!(
1533 peer = %self.peer_display_name(node_addr),
1534 context,
1535 "Cannot register current session index without local index"
1536 );
1537 return false;
1538 };
1539
1540 let cache_key = (transport_id, our_index.as_u32());
1541 match self.peers_by_index.get(&cache_key).copied() {
1542 Some(existing) if existing == *node_addr => true,
1543 Some(existing) => {
1544 warn!(
1545 peer = %self.peer_display_name(node_addr),
1546 previous_owner = %self.peer_display_name(&existing),
1547 transport_id = %transport_id,
1548 our_index = %our_index,
1549 context,
1550 "Repairing current session index with stale owner"
1551 );
1552 self.peers_by_index.insert(cache_key, *node_addr);
1553 true
1554 }
1555 None => {
1556 warn!(
1557 peer = %self.peer_display_name(node_addr),
1558 transport_id = %transport_id,
1559 our_index = %our_index,
1560 context,
1561 "Repairing missing current session index"
1562 );
1563 self.peers_by_index.insert(cache_key, *node_addr);
1564 true
1565 }
1566 }
1567 }
1568
/// Returns a shared reference to the node's configuration.
pub fn config(&self) -> &Config {
    &self.config
}
1575
/// Returns the effective IPv6 MTU, computed by
/// `crate::upper::icmp::effective_ipv6_mtu` from [`Self::transport_mtu`].
pub fn effective_ipv6_mtu(&self) -> u16 {
    crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
}
1584
1585 pub fn transport_mtu(&self) -> u16 {
1602 let min_operational = self
1603 .transports
1604 .values()
1605 .filter(|h| h.is_operational())
1606 .map(|h| h.mtu())
1607 .min();
1608 if let Some(mtu) = min_operational {
1609 return mtu;
1610 }
1611 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1613 return cfg.mtu();
1614 }
1615 1280
1616 }
1617
/// Returns the node's current `NodeState`.
pub fn state(&self) -> NodeState {
    self.state
}
1624
/// Returns the time elapsed since `started_at`.
pub fn uptime(&self) -> std::time::Duration {
    self.started_at.elapsed()
}
1629
/// Returns whether the current state reports itself as operational.
pub fn is_running(&self) -> bool {
    self.state.is_operational()
}
1634
/// Returns the node's `is_leaf_only` flag.
pub fn is_leaf_only(&self) -> bool {
    self.is_leaf_only
}
1639
/// Returns a shared reference to the node's `TreeState`.
pub fn tree_state(&self) -> &TreeState {
    &self.tree_state
}
1646
/// Returns a mutable reference to the node's `TreeState`.
pub fn tree_state_mut(&mut self) -> &mut TreeState {
    &mut self.tree_state
}
1651
/// Returns a shared reference to the node's `BloomState`.
pub fn bloom_state(&self) -> &BloomState {
    &self.bloom_state
}
1658
/// Returns a mutable reference to the node's `BloomState`.
pub fn bloom_state_mut(&mut self) -> &mut BloomState {
    &mut self.bloom_state
}
1663
/// Returns the most recently stored mesh size estimate, or `None` when no
/// estimate is currently held (see [`Self::compute_mesh_size`]).
pub fn estimated_mesh_size(&self) -> Option<u64> {
    self.estimated_mesh_size
}
1670
1671 pub(crate) fn compute_mesh_size(&mut self) {
1677 let my_addr = *self.tree_state.my_node_addr();
1678 let parent_id = *self.tree_state.my_declaration().parent_id();
1679 let is_root = self.tree_state.is_root();
1680
1681 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1682 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1684 let mut has_data = false;
1685
1686 if !is_root
1692 && let Some(parent) = self.peers.get(&parent_id)
1693 && let Some(filter) = parent.inbound_filter()
1694 {
1695 match filter.estimated_count(max_fpr) {
1696 Some(n) => {
1697 total += n;
1698 has_data = true;
1699 }
1700 None => {
1701 self.estimated_mesh_size = None;
1702 return;
1703 }
1704 }
1705 }
1706
1707 for (peer_addr, peer) in &self.peers {
1709 if peer_addr == &parent_id {
1710 continue;
1711 }
1712 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1713 && *decl.parent_id() == my_addr
1714 {
1715 child_count += 1;
1716 if let Some(filter) = peer.inbound_filter() {
1717 match filter.estimated_count(max_fpr) {
1718 Some(n) => {
1719 total += n;
1720 has_data = true;
1721 }
1722 None => {
1723 self.estimated_mesh_size = None;
1724 return;
1725 }
1726 }
1727 }
1728 }
1729 }
1730
1731 if !has_data {
1732 self.estimated_mesh_size = None;
1733 return;
1734 }
1735
1736 let size = total.round() as u64;
1737 self.estimated_mesh_size = Some(size);
1738
1739 let now = std::time::Instant::now();
1741 let should_log = match self.last_mesh_size_log {
1742 None => true,
1743 Some(last) => {
1744 now.duration_since(last)
1745 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1746 }
1747 };
1748 if should_log {
1749 tracing::debug!(
1750 estimated_mesh_size = size,
1751 peers = self.peers.len(),
1752 children = child_count,
1753 "Mesh size estimate"
1754 );
1755 self.last_mesh_size_log = Some(now);
1756 }
1757 }
1758
/// Returns a shared reference to the coordinate cache.
pub fn coord_cache(&self) -> &CoordCache {
    &self.coord_cache
}
1765
/// Returns a mutable reference to the coordinate cache.
pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
    &mut self.coord_cache
}
1770
/// Returns a shared reference to the node's statistics.
pub fn stats(&self) -> &stats::NodeStats {
    &self.stats
}
1777
/// Returns a mutable reference to the node's statistics.
pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
    &mut self.stats
}
1782
/// Returns a shared reference to the recorded statistics history.
pub fn stats_history(&self) -> &stats_history::StatsHistory {
    &self.stats_history
}
1787
1788 pub(crate) fn record_stats_history(&mut self) {
1791 let fwd = &self.stats.forwarding;
1792 let peers_with_mmp: Vec<f64> = self
1793 .peers
1794 .values()
1795 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1796 .collect();
1797 let loss_rate = if peers_with_mmp.is_empty() {
1798 0.0
1799 } else {
1800 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1801 };
1802
1803 let snap = stats_history::Snapshot {
1804 mesh_size: self.estimated_mesh_size,
1805 tree_depth: self.tree_state.my_coords().depth() as u32,
1806 peer_count: self.peers.len() as u64,
1807 parent_switches_total: self.stats.tree.parent_switches,
1808 bytes_in_total: fwd.received_bytes,
1809 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1810 packets_in_total: fwd.received_packets,
1811 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1812 loss_rate,
1813 active_sessions: self.sessions.len() as u64,
1814 };
1815
1816 let now = std::time::Instant::now();
1817 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1818 .peers
1819 .values()
1820 .map(|p| {
1821 let stats = p.link_stats();
1822 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1823 Some(m) => (
1824 m.metrics.srtt_ms(),
1825 Some(m.metrics.loss_rate()),
1826 m.receiver.ecn_ce_count() as u64,
1827 ),
1828 None => (None, None, 0),
1829 };
1830 stats_history::PeerSnapshot {
1831 node_addr: *p.node_addr(),
1832 last_seen: now,
1833 srtt_ms,
1834 loss_rate,
1835 bytes_in_total: stats.bytes_recv,
1836 bytes_out_total: stats.bytes_sent,
1837 packets_in_total: stats.packets_recv,
1838 packets_out_total: stats.packets_sent,
1839 ecn_ce_total: ecn_ce,
1840 }
1841 })
1842 .collect();
1843
1844 self.stats_history.tick(now, &snap, &peer_snaps);
1845 }
1846
/// Returns the current `TunState`.
pub fn tun_state(&self) -> TunState {
    self.tun_state
}
1853
/// Returns the stored TUN name, if one is set.
pub fn tun_name(&self) -> Option<&str> {
    self.tun_name.as_deref()
}
1858
/// Sets the connection limit. The admission checks and `add_connection` in
/// this file treat `0` as "no limit".
pub fn set_max_connections(&mut self, max: usize) {
    self.max_connections = max;
}
1865
/// Sets the peer limit. `outbound_admission_check` treats `0` as "no limit".
pub fn set_max_peers(&mut self, max: usize) {
    self.max_peers = max;
}
1870
1871 pub(crate) fn outbound_admission_check(&self) -> bool {
1874 let connection_used = self
1875 .connections
1876 .len()
1877 .saturating_add(self.pending_connects.len());
1878 let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1879 let connection_allowed =
1880 self.max_connections == 0 || connection_used < self.max_connections;
1881 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1882 peer_allowed && connection_allowed && link_allowed
1883 }
1884
1885 pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1889 if !self.outbound_admission_check() {
1890 return false;
1891 }
1892
1893 let nostr = &self.config.node.discovery.nostr;
1894 if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1895 return true;
1896 }
1897
1898 let configured_npubs = self
1899 .config
1900 .peers()
1901 .iter()
1902 .map(|peer| peer.npub.clone())
1903 .collect::<HashSet<_>>();
1904 self.open_discovery_enqueue_budget(&configured_npubs) > 0
1905 }
1906
1907 pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1911 let connection_used = self
1912 .connections
1913 .len()
1914 .saturating_add(self.pending_connects.len());
1915 let connection_allowed =
1916 self.max_connections == 0 || connection_used < self.max_connections;
1917 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1918 connection_allowed && link_allowed
1919 }
1920
/// Sets the link limit. `add_link` and the admission checks treat `0` as
/// "no limit".
pub fn set_max_links(&mut self, max: usize) {
    self.max_links = max;
}
1925
/// Returns the number of entries in `connections`.
pub fn connection_count(&self) -> usize {
    self.connections.len()
}
1932
/// Returns the number of entries in `peers`.
pub fn peer_count(&self) -> usize {
    self.peers.len()
}
1937
/// Returns the number of entries in `links`.
pub fn link_count(&self) -> usize {
    self.links.len()
}
1942
/// Returns the number of entries in `transports`.
pub fn transport_count(&self) -> usize {
    self.transports.len()
}
1947
1948 pub fn allocate_transport_id(&mut self) -> TransportId {
1952 let id = TransportId::new(self.next_transport_id);
1953 self.next_transport_id += 1;
1954 id
1955 }
1956
/// Looks up a transport handle by id.
pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
    self.transports.get(id)
}
1961
/// Looks up a transport handle by id for mutation.
pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
    self.transports.get_mut(id)
}
1966
/// Iterates over the ids of all entries in `transports`.
pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
    self.transports.keys()
}
1971
/// Returns a mutable reference to the packet receiver, if one is present.
pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
    self.packet_rx.as_mut()
}
1976
1977 pub fn allocate_link_id(&mut self) -> LinkId {
1981 let id = LinkId::new(self.next_link_id);
1982 self.next_link_id += 1;
1983 id
1984 }
1985
1986 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1988 if self.max_links > 0 && self.links.len() >= self.max_links {
1989 return Err(NodeError::MaxLinksExceeded {
1990 max: self.max_links,
1991 });
1992 }
1993 let link_id = link.link_id();
1994 let transport_id = link.transport_id();
1995 let remote_addr = link.remote_addr().clone();
1996
1997 self.links.insert(link_id, link);
1998 self.addr_to_link
1999 .insert((transport_id, remote_addr), link_id);
2000 Ok(())
2001 }
2002
/// Looks up a link by id.
pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
    self.links.get(link_id)
}
2007
/// Looks up a link by id for mutation.
pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
    self.links.get_mut(link_id)
}
2012
/// Returns the id of the link indexed under `(transport_id, addr)`, if any.
pub fn find_link_by_addr(
    &self,
    transport_id: TransportId,
    addr: &TransportAddr,
) -> Option<LinkId> {
    // The index is keyed by an owned tuple, so the address is cloned to
    // build the lookup key.
    self.addr_to_link
        .get(&(transport_id, addr.clone()))
        .copied()
}
2023
2024 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2030 if let Some(link) = self.links.remove(link_id) {
2031 let key = (link.transport_id(), link.remote_addr().clone());
2033 if self.addr_to_link.get(&key) == Some(link_id) {
2034 self.addr_to_link.remove(&key);
2035 }
2036 Some(link)
2037 } else {
2038 None
2039 }
2040 }
2041
2042 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2043 if !self.bootstrap_transports.contains(&transport_id) {
2044 return;
2045 }
2046
2047 let transport_in_use = self
2048 .links
2049 .values()
2050 .any(|link| link.transport_id() == transport_id)
2051 || self
2052 .connections
2053 .values()
2054 .any(|conn| conn.transport_id() == Some(transport_id))
2055 || self
2056 .peers
2057 .values()
2058 .any(|peer| peer.transport_id() == Some(transport_id))
2059 || self
2060 .pending_connects
2061 .iter()
2062 .any(|pending| pending.transport_id == transport_id);
2063
2064 if transport_in_use {
2065 return;
2066 }
2067
2068 tracing::debug!(
2069 transport_id = %transport_id,
2070 "bootstrap transport has no remaining references; dropping"
2071 );
2072
2073 self.bootstrap_transports.remove(&transport_id);
2074 self.bootstrap_transport_npubs.remove(&transport_id);
2075 self.transport_drops.remove(&transport_id);
2076 self.transports.remove(&transport_id);
2077 }
2078
/// Iterates over all registered links.
pub fn links(&self) -> impl Iterator<Item = &Link> {
    self.links.values()
}
2083
2084 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2088 let link_id = connection.link_id();
2089
2090 if self.connections.contains_key(&link_id) {
2091 return Err(NodeError::ConnectionAlreadyExists(link_id));
2092 }
2093
2094 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2095 return Err(NodeError::MaxConnectionsExceeded {
2096 max: self.max_connections,
2097 });
2098 }
2099
2100 self.connections.insert(link_id, connection);
2101 Ok(())
2102 }
2103
/// Looks up a connection by link id.
pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
    self.connections.get(link_id)
}
2108
/// Looks up a connection by link id for mutation.
pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
    self.connections.get_mut(link_id)
}
2113
/// Removes and returns the connection stored for `link_id`, if any.
pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
    self.connections.remove(link_id)
}
2118
/// Iterates over all stored connections.
pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
    self.connections.values()
}
2123
/// Looks up an active peer by node address.
pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
    self.peers.get(node_addr)
}
2130
/// Looks up an active peer by node address for mutation.
pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
    self.peers.get_mut(node_addr)
}
2135
/// Removes and returns the active peer stored for `node_addr`, if any.
///
/// Only the `peers` map is touched here; no other bookkeeping is updated by
/// this method.
pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
    self.peers.remove(node_addr)
}
2140
/// Iterates over all active peers.
pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values()
}
2145
/// Returns the Nostr discovery instance, if one is present.
pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
    self.nostr_discovery.as_deref()
}
2152
/// Iterates over the node addresses of all active peers.
pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
    self.peers.keys()
}
2157
/// Iterates over the active peers whose `can_send()` returns `true`.
pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values().filter(|p| p.can_send())
}
2162
2163 pub fn sendable_peer_count(&self) -> usize {
2165 self.peers.values().filter(|p| p.can_send()).count()
2166 }
2167
2168 pub(crate) fn set_discovery_fallback_transit_allowed(
2169 &mut self,
2170 peer_addr: NodeAddr,
2171 allowed: bool,
2172 ) {
2173 if allowed {
2174 self.discovery_fallback_transit_blocked_peers
2175 .remove(&peer_addr);
2176 } else {
2177 self.discovery_fallback_transit_blocked_peers
2178 .insert(peer_addr);
2179 }
2180 }
2181
2182 pub(crate) fn configured_discovery_fallback_transit(
2183 &self,
2184 peer_addr: &NodeAddr,
2185 ) -> Option<bool> {
2186 self.configured_peer(peer_addr)
2187 .map(|peer| peer.discovery_fallback_transit)
2188 }
2189
2190 pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2191 self.config.peers().iter().find(|peer| {
2192 PeerIdentity::from_npub(&peer.npub)
2193 .ok()
2194 .is_some_and(|identity| identity.node_addr() == peer_addr)
2195 })
2196 }
2197
2198 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2199 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2200 return retry_state.peer_config.discovery_fallback_transit;
2201 }
2202
2203 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2204 return allowed;
2205 }
2206
2207 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2208 }
2209
/// Test-only: sets the discovery forward limiter's interval to zero.
#[cfg(test)]
pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
    self.discovery_forward_limiter
        .set_interval(std::time::Duration::ZERO);
}
2219
/// Test-only: looks up the session entry for `remote`.
#[cfg(test)]
pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
    self.sessions.get(remote)
}
2224
/// Test-only: looks up the session entry for `remote` for mutation.
#[cfg(test)]
pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
    self.sessions.get_mut(remote)
}
2230
/// Test-only: removes and returns the session entry for `remote`, if any.
#[cfg(test)]
pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
    self.sessions.remove(remote)
}
2236
/// Test-only: reads the path MTU stored for `fips_addr`.
///
/// Returns `None` when the lock cannot be acquired for reading or when no
/// entry exists.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
    let table = self.path_mtu_lookup.read().ok()?;
    table.get(fips_addr).copied()
}
2245
/// Test-only: stores `mtu` as the path MTU for `fips_addr`.
///
/// The insert is silently skipped when the write lock cannot be acquired.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
    if let Ok(mut map) = self.path_mtu_lookup.write() {
        map.insert(fips_addr, mtu);
    }
}
2253
/// Returns the number of entries in `sessions`.
pub fn session_count(&self) -> usize {
    self.sessions.len()
}
2258
/// Iterates over `(remote address, session entry)` pairs.
pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
    self.sessions.iter()
}
2263
2264 pub(crate) fn register_identity(
2268 &mut self,
2269 node_addr: NodeAddr,
2270 pubkey: secp256k1::PublicKey,
2271 ) -> bool {
2272 let mut prefix = [0u8; 15];
2273 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2274 if let Some(entry) = self.identity_cache.get(&prefix)
2275 && entry.node_addr == node_addr
2276 && entry.pubkey == pubkey
2277 {
2278 return true;
2282 }
2283
2284 let (xonly, _) = pubkey.x_only_public_key();
2285 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2286 if derived_node_addr != node_addr {
2287 debug!(
2288 claimed_node_addr = %node_addr,
2289 derived_node_addr = %derived_node_addr,
2290 "Rejected identity cache entry with mismatched public key"
2291 );
2292 return false;
2293 }
2294
2295 let now_ms = Self::now_ms();
2296 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2297 && entry.node_addr == node_addr
2298 {
2299 entry.pubkey = pubkey;
2300 entry.last_seen_ms = now_ms;
2301 return true;
2302 }
2303
2304 let npub = encode_npub(&xonly);
2305 self.identity_cache.insert(
2306 prefix,
2307 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2308 );
2309 let max = self.config.node.cache.identity_size;
2311 if self.identity_cache.len() > max
2312 && let Some(oldest_key) = self
2313 .identity_cache
2314 .iter()
2315 .min_by_key(|(_, entry)| entry.last_seen_ms)
2316 .map(|(k, _)| *k)
2317 {
2318 self.identity_cache.remove(&oldest_key);
2319 }
2320 true
2321 }
2322
2323 pub(crate) fn lookup_by_fips_prefix(
2325 &mut self,
2326 prefix: &[u8; 15],
2327 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2328 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2329 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2331 } else {
2332 None
2333 }
2334 }
2335
2336 pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2338 let mut prefix = [0u8; 15];
2339 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2340 self.identity_cache.contains_key(&prefix)
2341 }
2342
/// Returns the number of entries in the identity cache.
pub fn identity_cache_len(&self) -> usize {
    self.identity_cache.len()
}
2347
/// Iterates over identity cache entries as
/// `(node address, public key, last_seen_ms)` tuples.
pub fn identity_cache_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
    self.identity_cache
        .values()
        .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
}
2359
/// Returns the configured identity cache size (`cache.identity_size`).
pub fn identity_cache_max(&self) -> usize {
    self.config.node.cache.identity_size
}
2364
/// Returns the number of entries in `pending_lookups`.
pub fn pending_lookup_count(&self) -> usize {
    self.pending_lookups.len()
}
2369
/// Iterates over `(target address, pending lookup)` pairs.
pub fn pending_lookups_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
    self.pending_lookups.iter()
}
2376
/// Returns the number of entries in `recent_requests`.
pub fn recent_request_count(&self) -> usize {
    self.recent_requests.len()
}
2381
/// Returns the number of keys in `pending_tun_packets` (one per destination
/// with a queue).
pub fn pending_tun_destinations(&self) -> usize {
    self.pending_tun_packets.len()
}
2386
/// Returns the total number of queued packets, summed over every queue in
/// `pending_tun_packets`.
pub fn pending_tun_total_packets(&self) -> usize {
    self.pending_tun_packets.values().map(|q| q.len()).sum()
}
2391
/// Iterates over `(peer address, retry state)` pairs in `retry_pending`.
pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
    self.retry_pending.iter()
}
2396
2397 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2404 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2406 return true;
2407 }
2408 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2410 && decl.parent_id() == self.node_addr()
2411 {
2412 return true;
2413 }
2414 false
2415 }
2416
/// Selects the peer to hand a packet for `dest_node_addr` to, or `None` when
/// no candidate is available.
///
/// Candidates are tried in this order:
/// 1. Nothing, if the destination is this node.
/// 2. The destination itself, if it is a peer reporting `is_healthy()`.
/// 3. In `ReplyLearned` routing mode, a learned route — unless the learned
///    route table asks for a fallback exploration for this destination.
/// 4. If the destination's coordinates are not in the coordinate cache: a
///    learned route, then the destination itself if it is a peer that can
///    send, otherwise `None`.
/// 5. The best candidate among peers whose filters may reach the
///    destination (see `select_best_candidate`).
/// 6. The tree's next hop, provided that peer can send.
/// 7. When exploring a fallback: whatever the learned route lookup yields.
/// 8. A learned route, then the destination itself if it can send.
///
/// Takes `&mut self` because the coordinate cache entry is touched and the
/// learned route table is called through methods that are given `now_ms`.
pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
    // Never route to ourselves.
    if dest_node_addr == self.node_addr() {
        return None;
    }

    // Remember whether the destination is a peer that can send; this is the
    // last-resort fallback further down when it is not healthy.
    let direct_peer_can_send = self
        .peers
        .get(dest_node_addr)
        .is_some_and(|peer| peer.can_send());
    if let Some(peer) = self.peers.get(dest_node_addr)
        && peer.is_healthy()
    {
        return Some(peer);
    }

    let now_ms = Self::now_ms();

    // Only populated in ReplyLearned mode: the set of peers that can send,
    // used to filter learned routes.
    let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
        Some(
            self.peers
                .iter()
                .filter(|(_, peer)| peer.can_send())
                .map(|(addr, _)| *addr)
                .collect::<HashSet<_>>(),
        )
    } else {
        None
    };

    // Always false outside ReplyLearned mode.
    let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
        self.learned_routes.should_explore_fallback(
            dest_node_addr,
            now_ms,
            self.config.node.routing.learned_fallback_explore_interval,
            |addr| sendable.contains(addr),
        )
    });
    // Preferred path in ReplyLearned mode when no exploration is due.
    if let Some(sendable) = &sendable_learned_peers
        && !explore_fallback
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    // Coordinates are cloned so the cache borrow ends before peers are
    // consulted below.
    let Some(dest_coords) = self
        .coord_cache
        .get_and_touch(dest_node_addr, now_ms)
        .cloned()
    else {
        // No coordinates: coordinate and tree routing are impossible, so fall
        // back to a learned route or the direct peer.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }
        if direct_peer_can_send {
            return self.peers.get(dest_node_addr);
        }
        return None;
    };

    // Resolve to an address first so the candidate borrows end before the
    // final lookup.
    let coordinate_route_addr = {
        let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
        if !candidates.is_empty() {
            self.select_best_candidate(&candidates, &dest_coords)
                .map(|peer| *peer.node_addr())
        } else {
            None
        }
    };
    if let Some(next_hop_addr) = coordinate_route_addr {
        return self.peers.get(&next_hop_addr);
    }

    // Tree routing, accepted only if the proposed hop is a peer that can send.
    let tree_route_addr = self
        .tree_state
        .find_next_hop(&dest_coords)
        .filter(|next_hop_id| {
            self.peers
                .get(next_hop_id)
                .is_some_and(|peer| peer.can_send())
        });
    if let Some(next_hop_addr) = tree_route_addr {
        return self.peers.get(&next_hop_addr);
    }
    // NOTE(review): when exploring, this returns the learned-route result even
    // if it is `None`, without trying the `direct_peer_can_send` fallback
    // below — confirm this is intended.
    if explore_fallback {
        return sendable_learned_peers.as_ref().and_then(|sendable| {
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
        });
    }

    if let Some(sendable) = &sendable_learned_peers
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    // Last resort: the destination is a peer that can send but is not healthy.
    if direct_peer_can_send {
        return self.peers.get(dest_node_addr);
    }

    None
}
2563
2564 pub(in crate::node) fn learn_reverse_route(
2565 &mut self,
2566 destination: NodeAddr,
2567 next_hop: NodeAddr,
2568 ) {
2569 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2570 || destination == *self.node_addr()
2571 {
2572 return;
2573 }
2574 let now_ms = Self::now_ms();
2575 self.learned_routes.learn(
2576 destination,
2577 next_hop,
2578 now_ms,
2579 self.config.node.routing.learned_ttl_secs,
2580 self.config.node.routing.max_learned_routes_per_dest,
2581 );
2582 }
2583
2584 pub(in crate::node) fn record_route_failure(
2585 &mut self,
2586 destination: NodeAddr,
2587 next_hop: NodeAddr,
2588 ) {
2589 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2590 return;
2591 }
2592 self.learned_routes.record_failure(&destination, &next_hop);
2593 }
2594
/// Returns a snapshot of the learned route table taken at `now_ms`.
pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
    self.learned_routes.snapshot(now_ms)
}
2598
/// Asks the learned route table to purge entries expired as of `now_ms`.
pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
    self.learned_routes.purge_expired(now_ms);
}
2602
2603 fn select_best_candidate<'a>(
2612 &'a self,
2613 candidates: &[&'a ActivePeer],
2614 dest_coords: &crate::tree::TreeCoordinate,
2615 ) -> Option<&'a ActivePeer> {
2616 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2617
2618 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2619
2620 for &candidate in candidates {
2621 if !candidate.can_send() {
2622 continue;
2623 }
2624
2625 let cost = candidate.link_cost();
2626
2627 let dist = self
2628 .tree_state
2629 .peer_coords(candidate.node_addr())
2630 .map(|pc| pc.distance_to(dest_coords))
2631 .unwrap_or(usize::MAX);
2632
2633 if dist >= my_distance {
2636 continue;
2637 }
2638
2639 let dominated = match &best {
2640 None => true,
2641 Some((_, best_cost, best_dist)) => {
2642 cost < *best_cost
2643 || (cost == *best_cost && dist < *best_dist)
2644 || (cost == *best_cost
2645 && dist == *best_dist
2646 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2647 }
2648 };
2649
2650 if dominated {
2651 best = Some((candidate, cost, dist));
2652 }
2653 }
2654
2655 best.map(|(peer, _, _)| peer)
2656 }
2657
/// Collects every active peer whose `may_reach(dest)` returns `true`.
pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
    self.peers.values().filter(|p| p.may_reach(dest)).collect()
}
2662
/// Returns the TUN sender, if one is present.
pub fn tun_tx(&self) -> Option<&TunTx> {
    self.tun_tx.as_ref()
}
2669
2670 pub fn attach_external_packet_io(
2677 &mut self,
2678 capacity: usize,
2679 ) -> Result<ExternalPacketIo, NodeError> {
2680 if self.state != NodeState::Created {
2681 return Err(NodeError::Config(ConfigError::Validation(
2682 "external packet I/O must be attached before node start".to_string(),
2683 )));
2684 }
2685 if self.config.tun.enabled {
2686 return Err(NodeError::Config(ConfigError::Validation(
2687 "external packet I/O requires tun.enabled=false".to_string(),
2688 )));
2689 }
2690
2691 let capacity = capacity.max(1);
2692 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2693 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2694 self.tun_outbound_rx = Some(outbound_rx);
2695 self.external_packet_tx = Some(inbound_tx);
2696
2697 Ok(ExternalPacketIo {
2698 outbound_tx,
2699 inbound_rx,
2700 })
2701 }
2702
/// Wires up the endpoint data command and event channels.
///
/// The node keeps the command receiver and a clone of the event sender; the
/// returned `EndpointDataIo` holds the command sender, the event receiver and
/// the event sender.
///
/// # Errors
///
/// Returns a `ConfigError::Validation` wrapped in `NodeError::Config` when
/// the node is no longer in the `Created` state.
pub(crate) fn attach_endpoint_data_io(
    &mut self,
    capacity: usize,
) -> Result<EndpointDataIo, NodeError> {
    if self.state != NodeState::Created {
        return Err(NodeError::Config(ConfigError::Validation(
            "endpoint data I/O must be attached before node start".to_string(),
        )));
    }

    // The command channel is bounded (size derived from `capacity`); the
    // event channel is unbounded.
    let command_capacity = endpoint_data_command_capacity(capacity);
    let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
    let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
    self.endpoint_command_rx = Some(command_rx);
    self.endpoint_event_tx = Some(event_tx.clone());

    Ok(EndpointDataIo {
        command_tx,
        event_rx,
        event_tx,
    })
}
2733
2734 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2735 let mut prefix = [0u8; 15];
2736 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2737 self.identity_cache
2738 .get(&prefix)
2739 .filter(|entry| &entry.node_addr == addr)
2740 .map(|entry| entry.pubkey)
2741 }
2742
2743 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2744 let mut prefix = [0u8; 15];
2745 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2746 self.identity_cache
2747 .get(&prefix)
2748 .filter(|entry| &entry.node_addr == addr)
2749 .map(|entry| entry.npub.clone())
2750 }
2751
2752 pub(in crate::node) fn deliver_external_ipv6_packet(
2753 &self,
2754 src_addr: &NodeAddr,
2755 packet: Vec<u8>,
2756 ) {
2757 let Some(external_packet_tx) = &self.external_packet_tx else {
2758 return;
2759 };
2760 if packet.len() < 40 {
2761 return;
2762 }
2763 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2764 return;
2765 };
2766 let delivered = NodeDeliveredPacket {
2767 source_node_addr: *src_addr,
2768 source_npub: self.npub_for_node_addr(src_addr),
2769 destination,
2770 packet,
2771 };
2772 if let Err(error) = external_packet_tx.try_send(delivered) {
2773 debug!(error = %error, "Failed to deliver packet to external app sink");
2774 }
2775 }
2776
2777 pub(super) async fn send_encrypted_link_message(
2791 &mut self,
2792 node_addr: &NodeAddr,
2793 plaintext: &[u8],
2794 ) -> Result<(), NodeError> {
2795 self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2796 .await
2797 }
2798
2799 pub(in crate::node) fn note_local_send_outcome(
2805 &mut self,
2806 result: &Result<usize, TransportError>,
2807 ) {
2808 match result {
2809 Ok(_) => {
2810 if self.last_local_send_failure_at.is_some() {
2811 self.last_local_send_failure_at = None;
2812 }
2813 }
2814 Err(error) if error.is_local_route_unavailable() => {
2815 self.last_local_send_failure_at = Some(std::time::Instant::now());
2816 }
2817 Err(_) => {}
2818 }
2819 }
2820
2821 pub(in crate::node) fn local_send_failure_dead_timeout(
2827 &mut self,
2828 now: std::time::Instant,
2829 dead_timeout: std::time::Duration,
2830 fast_dead_timeout: std::time::Duration,
2831 ) -> std::time::Duration {
2832 match self.last_local_send_failure_at {
2833 Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
2834 fast_dead_timeout.min(dead_timeout)
2835 }
2836 Some(_) => {
2837 self.last_local_send_failure_at = None;
2838 dead_timeout
2839 }
2840 None => dead_timeout,
2841 }
2842 }
2843
2844 pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
2845 self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
2846 }
2847
2848 pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
2849 let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
2850 return false;
2851 };
2852 let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
2853 std::time::Instant::now().duration_since(t) <= grace
2854 }
2855
2856 pub(super) async fn send_encrypted_link_message_with_ce(
2860 &mut self,
2861 node_addr: &NodeAddr,
2862 plaintext: &[u8],
2863 ce_flag: bool,
2864 ) -> Result<(), NodeError> {
2865 let peer = self
2866 .peers
2867 .get_mut(node_addr)
2868 .ok_or(NodeError::PeerNotFound(*node_addr))?;
2869
2870 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2871 node_addr: *node_addr,
2872 reason: "no their_index".into(),
2873 })?;
2874 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2875 node_addr: *node_addr,
2876 reason: "no transport_id".into(),
2877 })?;
2878 let remote_addr = peer
2879 .current_addr()
2880 .cloned()
2881 .ok_or_else(|| NodeError::SendFailed {
2882 node_addr: *node_addr,
2883 reason: "no current_addr".into(),
2884 })?;
2885 #[cfg(any(target_os = "linux", target_os = "macos"))]
2886 let connected_socket = peer.connected_udp();
2887
2888 let timestamp_ms = peer.session_elapsed_ms();
2890
2891 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2893 let mut flags = if sp_flag { FLAG_SP } else { 0 };
2894 if ce_flag {
2895 flags |= FLAG_CE;
2896 }
2897 if peer.current_k_bit() {
2898 flags |= FLAG_KEY_EPOCH;
2899 }
2900
2901 let session = peer
2902 .noise_session_mut()
2903 .ok_or_else(|| NodeError::SendFailed {
2904 node_addr: *node_addr,
2905 reason: "no noise session".into(),
2906 })?;
2907
2908 const INNER_TS_LEN: usize = 4;
2916 let counter = session.current_send_counter();
2917 let inner_len = INNER_TS_LEN + plaintext.len();
2918 let payload_len = inner_len as u16;
2919 let header = build_established_header(their_index, counter, flags, payload_len);
2920
2921 let transport_for_send = self
2940 .transports
2941 .get(&transport_id)
2942 .ok_or(NodeError::TransportNotFound(transport_id))?;
2943 match transport_for_send.connection_state(&remote_addr) {
2944 ConnectionState::Connected => {}
2945 other => {
2946 if matches!(other, ConnectionState::None) {
2947 let _ = transport_for_send.connect(&remote_addr).await;
2948 }
2949 return Err(NodeError::SendFailed {
2950 node_addr: *node_addr,
2951 reason: format!("transport connection not ready: {:?}", other),
2952 });
2953 }
2954 }
2955 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2956 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2957 && is_udp
2958 && let Some(cipher_clone) = session.send_cipher_clone()
2959 {
2960 {
2961 let reserved_counter =
2965 session
2966 .take_send_counter()
2967 .map_err(|e| NodeError::SendFailed {
2968 node_addr: *node_addr,
2969 reason: format!("counter reservation failed: {}", e),
2970 })?;
2971 debug_assert_eq!(reserved_counter, counter);
2972 let header =
2976 build_established_header(their_index, reserved_counter, flags, payload_len);
2977 let transport = transport_for_send;
2978 let send_target = {
2985 if let TransportHandle::Udp(udp) = transport {
2986 let socket_addr = {
2987 #[cfg(any(target_os = "linux", target_os = "macos"))]
2988 {
2989 match connected_socket.as_ref() {
2990 Some(socket) => Some(socket.peer_addr()),
2991 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2992 }
2993 }
2994 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2995 {
2996 udp.resolve_for_off_task(&remote_addr).await.ok()
2997 }
2998 };
2999 match (udp.async_socket(), socket_addr) {
3000 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3001 _ => None,
3002 }
3003 } else {
3004 None
3005 }
3006 };
3007 if let Some((socket, socket_addr)) = send_target {
3008 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3024 let mut wire_buf = Vec::with_capacity(wire_capacity);
3025 wire_buf.extend_from_slice(&header);
3026 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
3027 wire_buf.extend_from_slice(plaintext);
3028 let predicted_bytes = wire_capacity;
3029 if let Some(peer) = self.peers.get_mut(node_addr) {
3036 peer.link_stats_mut().record_sent(predicted_bytes);
3037 if let Some(mmp) = peer.mmp_mut() {
3038 mmp.sender
3039 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3040 }
3041 }
3042 let scheduling_weight = self.send_weight_for_peer(node_addr);
3043 workers.dispatch(self::encrypt_worker::FmpSendJob {
3044 cipher: cipher_clone,
3045 counter: reserved_counter,
3046 wire_buf,
3047 fsp_seal: None,
3048 socket,
3049 dest_addr: socket_addr,
3050 #[cfg(any(target_os = "linux", target_os = "macos"))]
3051 connected_socket,
3052 drop_on_backpressure: fmp_plaintext_is_bulk_session_datagram(plaintext),
3053 scheduling_weight,
3054 queued_at: crate::perf_profile::stamp(),
3055 });
3056 return Ok(());
3057 }
3058 }
3059 }
3060
3061 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3066 let ciphertext = {
3068 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3069 session
3070 .encrypt_with_aad(&inner_plaintext, &header)
3071 .map_err(|e| NodeError::SendFailed {
3072 node_addr: *node_addr,
3073 reason: format!("encryption failed: {}", e),
3074 })?
3075 };
3076
3077 let wire_packet = build_encrypted(&header, &ciphertext);
3078
3079 let send_result = {
3081 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3082 let transport = self
3083 .transports
3084 .get(&transport_id)
3085 .ok_or(NodeError::TransportNotFound(transport_id))?;
3086 transport.send(&remote_addr, &wire_packet).await
3087 };
3088 self.note_local_send_outcome(&send_result);
3089 let bytes_sent = send_result.map_err(|e| match e {
3090 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3091 node_addr: *node_addr,
3092 packet_size,
3093 mtu,
3094 },
3095 other => NodeError::SendFailed {
3096 node_addr: *node_addr,
3097 reason: format!("transport send: {}", other),
3098 },
3099 })?;
3100
3101 if let Some(peer) = self.peers.get_mut(node_addr) {
3103 peer.link_stats_mut().record_sent(bytes_sent);
3104 if let Some(mmp) = peer.mmp_mut() {
3106 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3107 }
3108 }
3109
3110 Ok(())
3111 }
3112}
3113
3114impl fmt::Debug for Node {
3115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3116 f.debug_struct("Node")
3117 .field("node_addr", self.node_addr())
3118 .field("state", &self.state)
3119 .field("is_leaf_only", &self.is_leaf_only)
3120 .field("connections", &self.connection_count())
3121 .field("peers", &self.peer_count())
3122 .field("links", &self.link_count())
3123 .field("transports", &self.transport_count())
3124 .finish()
3125 }
3126}