1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
40use crate::peer::{ActivePeer, PeerConnection};
41#[cfg(any(target_os = "linux", target_os = "macos"))]
42use crate::transport::ethernet::EthernetTransport;
43use crate::transport::tcp::TcpTransport;
44use crate::transport::tor::TorTransport;
45use crate::transport::udp::UdpTransport;
46#[cfg(feature = "webrtc-transport")]
47use crate::transport::webrtc::WebRtcTransport;
48use crate::transport::{
49 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
50 TransportHandle, TransportId,
51};
52use crate::tree::TreeState;
53use crate::upper::hosts::HostMap;
54use crate::upper::icmp_rate_limit::IcmpRateLimiter;
55use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
56use crate::utils::index::IndexAllocator;
57use crate::{
58 Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
59 PeerIdentity, encode_npub,
60};
61use rand::Rng;
62use std::collections::{HashMap, HashSet, VecDeque};
63use std::fmt;
64use std::sync::Arc;
65use std::thread::JoinHandle;
66use thiserror::Error;
67use tracing::{debug, warn};
68
/// Three-second window associated with local send failures.
///
/// NOTE(review): presumably bounds how long after a local send failure the
/// "fast dead" handling stays in effect; the consumer is outside this view.
const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration =
    std::time::Duration::from_millis(3_000);
70
71fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
72 if !plaintext
73 .first()
74 .is_some_and(|ty| *ty == LinkMessageType::SessionDatagram.to_byte())
75 {
76 return false;
77 }
78 let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
79 return false;
80 };
81 FspCommonPrefix::parse(fsp_payload)
82 .is_some_and(|prefix| prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted())
83}
84
/// Rekey jitter, in seconds.
///
/// NOTE(review): presumably the bound on the random offset applied to rekey
/// timing; the code that consumes it is outside this view.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
92
93#[derive(Debug, Error)]
95pub enum NodeError {
96 #[error("node not started")]
97 NotStarted,
98
99 #[error("node already started")]
100 AlreadyStarted,
101
102 #[error("node already stopped")]
103 AlreadyStopped,
104
105 #[error("transport not found: {0}")]
106 TransportNotFound(TransportId),
107
108 #[error("no transport available for type: {0}")]
109 NoTransportForType(String),
110
111 #[error("link not found: {0}")]
112 LinkNotFound(LinkId),
113
114 #[error("connection not found: {0}")]
115 ConnectionNotFound(LinkId),
116
117 #[error("peer not found: {0:?}")]
118 PeerNotFound(NodeAddr),
119
120 #[error("peer already exists: {0:?}")]
121 PeerAlreadyExists(NodeAddr),
122
123 #[error("connection already exists for link: {0}")]
124 ConnectionAlreadyExists(LinkId),
125
126 #[error("invalid peer npub '{npub}': {reason}")]
127 InvalidPeerNpub { npub: String, reason: String },
128
129 #[error("discovery error: {0}")]
130 Discovery(String),
131
132 #[error("access denied: {0}")]
133 AccessDenied(String),
134
135 #[error("max connections exceeded: {max}")]
136 MaxConnectionsExceeded { max: usize },
137
138 #[error("max peers exceeded: {max}")]
139 MaxPeersExceeded { max: usize },
140
141 #[error("max links exceeded: {max}")]
142 MaxLinksExceeded { max: usize },
143
144 #[error("handshake incomplete for link {0}")]
145 HandshakeIncomplete(LinkId),
146
147 #[error("no session available for link {0}")]
148 NoSession(LinkId),
149
150 #[error("promotion failed for link {link_id}: {reason}")]
151 PromotionFailed { link_id: LinkId, reason: String },
152
153 #[error("send failed to {node_addr}: {reason}")]
154 SendFailed { node_addr: NodeAddr, reason: String },
155
156 #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
157 MtuExceeded {
158 node_addr: NodeAddr,
159 packet_size: usize,
160 mtu: u16,
161 },
162
163 #[error("config error: {0}")]
164 Config(#[from] ConfigError),
165
166 #[error("identity error: {0}")]
167 Identity(#[from] IdentityError),
168
169 #[error("TUN error: {0}")]
170 Tun(#[from] TunError),
171
172 #[error("index allocation failed: {0}")]
173 IndexAllocationFailed(String),
174
175 #[error("handshake failed: {0}")]
176 HandshakeFailed(String),
177
178 #[error("transport error: {0}")]
179 TransportError(String),
180
181 #[error("local route unavailable: {0}")]
182 LocalRouteUnavailable(String),
183
184 #[error("bootstrap handoff failed: {0}")]
185 BootstrapHandoff(String),
186}
187
188impl NodeError {
189 pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
190 if error.is_local_route_unavailable() {
191 Self::LocalRouteUnavailable(error.to_string())
192 } else {
193 Self::TransportError(error.to_string())
194 }
195 }
196
197 pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
198 matches!(self, Self::LocalRouteUnavailable(_))
199 }
200}
201
202#[derive(Debug, Clone, PartialEq, Eq)]
204pub struct NodeDeliveredPacket {
205 pub source_node_addr: NodeAddr,
207 pub source_npub: Option<String>,
209 pub destination: FipsAddress,
211 pub packet: Vec<u8>,
213}
214
215#[derive(Debug, Clone)]
216struct IdentityCacheEntry {
217 node_addr: NodeAddr,
218 pubkey: secp256k1::PublicKey,
219 npub: String,
220 last_seen_ms: u64,
221}
222
223impl IdentityCacheEntry {
224 fn new(
225 node_addr: NodeAddr,
226 pubkey: secp256k1::PublicKey,
227 npub: String,
228 last_seen_ms: u64,
229 ) -> Self {
230 Self {
231 node_addr,
232 pubkey,
233 npub,
234 last_seen_ms,
235 }
236 }
237}
238
239#[derive(Debug)]
241pub struct ExternalPacketIo {
242 pub outbound_tx: crate::upper::tun::TunOutboundTx,
244 pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
246}
247
248#[derive(Debug)]
250pub(crate) struct EndpointDataIo {
251 pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
260 pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
270 pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
276}
277
/// Chooses the capacity for the endpoint-data command queue.
///
/// If the environment variable `FIPS_ENDPOINT_DATA_QUEUE_CAP` is set to a
/// positive integer (surrounding whitespace is ignored), that value is
/// returned unchanged. Otherwise the result is `requested`, raised to a floor
/// of 32 768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    const FLOOR: usize = 32_768;

    let env_override = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&value| value > 0);

    match env_override {
        Some(value) => value,
        // The floor is above 1, so it also covers a requested capacity of 0.
        None => requested.max(FLOOR),
    }
}
288
289#[derive(Debug)]
291pub(crate) enum NodeEndpointCommand {
292 Send {
296 remote: PeerIdentity,
297 payload: Vec<u8>,
298 queued_at: Option<std::time::Instant>,
299 response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
300 },
301 SendOneway {
307 remote: PeerIdentity,
308 payload: Vec<u8>,
309 queued_at: Option<std::time::Instant>,
310 },
311 PeerSnapshot {
312 response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
313 },
314 RelaySnapshot {
315 response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
316 },
317 UpdateRelays {
318 advert_relays: Vec<String>,
319 dm_relays: Vec<String>,
320 response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
321 },
322 UpdatePeers {
328 peers: Vec<crate::config::PeerConfig>,
329 response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
330 },
331}
332
/// Counters summarising the result of an `UpdatePeers` command.
///
/// `Default` yields all-zero counters.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Number of peers added.
    pub(crate) added: usize,
    /// Number of peers removed.
    pub(crate) removed: usize,
    /// Number of peers updated.
    pub(crate) updated: usize,
    /// Number of peers left unchanged.
    pub(crate) unchanged: usize,
}
341
342#[derive(Debug)]
344pub(crate) enum NodeEndpointEvent {
345 Data {
346 source_node_addr: NodeAddr,
347 source_npub: Option<String>,
348 payload: Vec<u8>,
349 queued_at: Option<std::time::Instant>,
350 },
351}
352
/// One peer entry in the reply to a `PeerSnapshot` command.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// npub of the peer.
    pub(crate) npub: String,
    /// Transport address as text, when known.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, when known.
    pub(crate) transport_type: Option<String>,
    /// Numeric id of the peer's link.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, when measured.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent counter.
    pub(crate) packets_sent: u64,
    /// Packets received counter.
    pub(crate) packets_recv: u64,
    /// Bytes sent counter.
    pub(crate) bytes_sent: u64,
    /// Bytes received counter.
    pub(crate) bytes_recv: u64,
    /// Whether a direct probe is pending.
    pub(crate) direct_probe_pending: bool,
    /// Millisecond value associated with the pending direct probe.
    /// NOTE(review): absolute vs. relative time is not visible here; confirm.
    pub(crate) direct_probe_after_ms: Option<u64>,
}
368
/// One relay entry in the reply to a `RelaySnapshot` command.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Status text for the relay.
    pub(crate) status: String,
}
375
/// Lifecycle state of a node.
///
/// The constructors of `Node` set the state to [`NodeState::Created`]. Each
/// variant's `Display` text is its lowercase name.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Displayed as `created`; the state a freshly constructed node is in.
    Created,
    /// Displayed as `starting`.
    Starting,
    /// Displayed as `running`; the only operational state.
    Running,
    /// Displayed as `stopping`.
    Stopping,
    /// Displayed as `stopped`; a node in this state may be started again.
    Stopped,
}

impl NodeState {
    /// Returns `true` only for [`NodeState::Running`].
    pub fn is_operational(&self) -> bool {
        matches!(self, NodeState::Running)
    }

    /// Returns `true` for [`NodeState::Created`] and [`NodeState::Stopped`].
    pub fn can_start(&self) -> bool {
        matches!(self, NodeState::Created | NodeState::Stopped)
    }

    /// Returns `true` only for [`NodeState::Running`].
    pub fn can_stop(&self) -> bool {
        matches!(self, NodeState::Running)
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase state name, honouring the caller's width, fill,
    /// alignment and precision flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        };
        // `write!(f, "{}", label)` would format through a fresh default spec
        // and silently drop flags such as `{:>10}`; `pad` applies them.
        f.pad(label)
    }
}
420
421#[derive(Clone, Debug)]
428pub(crate) struct RecentRequest {
429 pub(crate) from_peer: NodeAddr,
431 pub(crate) timestamp_ms: u64,
433 pub(crate) response_forwarded: bool,
437}
438
439impl RecentRequest {
440 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
441 Self {
442 from_peer,
443 timestamp_ms,
444 response_forwarded: false,
445 }
446 }
447
448 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
450 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
451 }
452}
453
454type AddrKey = (TransportId, TransportAddr);
456
/// Drop-tracking state kept per transport.
///
/// `Default` yields a zero counter and `dropping == false`.
/// NOTE(review): presumably used to detect transitions in a transport's drop
/// counter; the update logic is outside this view.
#[derive(Default, Debug)]
struct TransportDropState {
    /// Previously observed drop count.
    prev_drops: u64,
    /// Whether the transport is currently considered to be dropping.
    dropping: bool,
}
468
469struct PendingConnect {
475 link_id: LinkId,
477 transport_id: TransportId,
479 remote_addr: TransportAddr,
481 peer_identity: PeerIdentity,
483}
484
485pub struct Node {
499 identity: Identity,
502
503 startup_epoch: [u8; 8],
506
507 started_at: std::time::Instant,
509
510 config: Config,
513
514 state: NodeState,
517
518 is_leaf_only: bool,
520
521 tree_state: TreeState,
524
525 bloom_state: BloomState,
528
529 coord_cache: CoordCache,
532 learned_routes: LearnedRouteTable,
534 recent_requests: HashMap<u64, RecentRequest>,
537 path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,
543
544 transports: HashMap<TransportId, TransportHandle>,
547 transport_drops: HashMap<TransportId, TransportDropState>,
549 links: HashMap<LinkId, Link>,
551 addr_to_link: HashMap<AddrKey, LinkId>,
553
554 packet_tx: Option<PacketTx>,
557 packet_rx: Option<PacketRx>,
559
560 connections: HashMap<LinkId, PeerConnection>,
564
565 peers: HashMap<NodeAddr, ActivePeer>,
569
570 sessions: HashMap<NodeAddr, SessionEntry>,
574
575 identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,
579
580 pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
584 pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
586 pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,
590
591 max_connections: usize,
594 max_peers: usize,
596 max_links: usize,
598
599 next_link_id: u64,
602 next_transport_id: u32,
604
605 stats: stats::NodeStats,
608
609 stats_history: stats_history::StatsHistory,
611
612 tun_state: TunState,
615 tun_name: Option<String>,
617 tun_tx: Option<TunTx>,
619 tun_outbound_rx: Option<TunOutboundRx>,
621 external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
623 endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
625 endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
627 encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
633 decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
636 decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
645 decrypt_fallback_rx:
649 Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
650 decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
651 tun_reader_handle: Option<JoinHandle<()>>,
653 tun_writer_handle: Option<JoinHandle<()>>,
655 #[cfg(target_os = "macos")]
658 tun_shutdown_fd: Option<std::os::unix::io::RawFd>,
659
660 dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
663 dns_task: Option<tokio::task::JoinHandle<()>>,
665
666 index_allocator: IndexAllocator,
669 peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
672 pending_outbound: HashMap<(TransportId, u32), LinkId>,
675
676 msg1_rate_limiter: HandshakeRateLimiter,
679 icmp_rate_limiter: IcmpRateLimiter,
681 routing_error_rate_limiter: RoutingErrorRateLimiter,
683 coords_response_rate_limiter: RoutingErrorRateLimiter,
685 discovery_backoff: DiscoveryBackoff,
687 discovery_forward_limiter: DiscoveryForwardRateLimiter,
689
690 pending_connects: Vec<PendingConnect>,
696
697 retry_pending: HashMap<NodeAddr, retry::RetryState>,
703
704 nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
706 lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
711 local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
715 local_instance_started_at_ms: Option<u64>,
716 last_local_instance_publish_ms: Option<u64>,
717 last_local_instance_scan_ms: Option<u64>,
718 nostr_discovery_started_at_ms: Option<u64>,
723 startup_open_discovery_sweep_done: bool,
727 bootstrap_transports: HashSet<TransportId>,
729 bootstrap_transport_npubs: HashMap<TransportId, String>,
736 discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,
739
740 last_parent_reeval: Option<crate::time::Instant>,
743
744 last_congestion_log: Option<std::time::Instant>,
747
748 estimated_mesh_size: Option<u64>,
751 last_mesh_size_log: Option<std::time::Instant>,
753
754 last_self_warn: Option<std::time::Instant>,
760
761 last_local_send_failure_at: Option<std::time::Instant>,
769 last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,
774
775 peer_aliases: HashMap<NodeAddr, String>,
779 configured_peer_send_weights: HashMap<NodeAddr, u8>,
782
783 peer_acl: acl::PeerAclReloader,
785
786 host_map: Arc<HostMap>,
790}
791
792impl Node {
793 pub fn new(config: Config) -> Result<Self, NodeError> {
795 config.validate()?;
796 let identity = config.create_identity()?;
797 let node_addr = *identity.node_addr();
798 let is_leaf_only = config.is_leaf_only();
799
800 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
801 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
802
803 let mut startup_epoch = [0u8; 8];
804 rand::rng().fill_bytes(&mut startup_epoch);
805
806 let mut bloom_state = if is_leaf_only {
807 BloomState::leaf_only(node_addr)
808 } else {
809 BloomState::new(node_addr)
810 };
811 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
812
813 let tun_state = if config.tun.enabled {
814 TunState::Configured
815 } else {
816 TunState::Disabled
817 };
818
819 let mut tree_state = TreeState::new(node_addr);
821 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
822 tree_state.set_hold_down(config.node.tree.hold_down_secs);
823 tree_state.set_flap_dampening(
824 config.node.tree.flap_threshold,
825 config.node.tree.flap_window_secs,
826 config.node.tree.flap_dampening_secs,
827 );
828 tree_state
829 .sign_declaration(&identity)
830 .expect("signing own declaration should never fail");
831
832 let coord_cache = CoordCache::new(
833 config.node.cache.coord_size,
834 config.node.cache.coord_ttl_secs * 1000,
835 );
836 let rl = &config.node.rate_limit;
837 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
838 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
839 config.node.limits.max_pending_inbound,
840 );
841
842 let max_connections = config.node.limits.max_connections;
843 let max_peers = config.node.limits.max_peers;
844 let max_links = config.node.limits.max_links;
845 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
846 let backoff_base_secs = config.node.discovery.backoff_base_secs;
847 let backoff_max_secs = config.node.discovery.backoff_max_secs;
848 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
849
850 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
851 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
852
853 Ok(Self {
854 identity,
855 startup_epoch,
856 started_at: std::time::Instant::now(),
857 config,
858 state: NodeState::Created,
859 is_leaf_only,
860 tree_state,
861 bloom_state,
862 coord_cache,
863 learned_routes: LearnedRouteTable::default(),
864 recent_requests: HashMap::new(),
865 transports: HashMap::new(),
866 transport_drops: HashMap::new(),
867 links: HashMap::new(),
868 addr_to_link: HashMap::new(),
869 packet_tx: None,
870 packet_rx: None,
871 connections: HashMap::new(),
872 peers: HashMap::new(),
873 sessions: HashMap::new(),
874 identity_cache: HashMap::new(),
875 pending_tun_packets: HashMap::new(),
876 pending_endpoint_data: HashMap::new(),
877 pending_lookups: HashMap::new(),
878 max_connections,
879 max_peers,
880 max_links,
881 next_link_id: 1,
882 next_transport_id: 1,
883 stats: stats::NodeStats::new(),
884 stats_history: stats_history::StatsHistory::new(),
885 tun_state,
886 tun_name: None,
887 tun_tx: None,
888 tun_outbound_rx: None,
889 external_packet_tx: None,
890 endpoint_command_rx: None,
891 endpoint_event_tx: None,
892 encrypt_workers: None,
893 decrypt_workers: None,
894 decrypt_registered_sessions: std::collections::HashSet::new(),
895 decrypt_fallback_tx,
896 decrypt_fallback_rx,
897 tun_reader_handle: None,
898 tun_writer_handle: None,
899 #[cfg(target_os = "macos")]
900 tun_shutdown_fd: None,
901 dns_identity_rx: None,
902 dns_task: None,
903 index_allocator: IndexAllocator::new(),
904 peers_by_index: HashMap::new(),
905 pending_outbound: HashMap::new(),
906 msg1_rate_limiter,
907 icmp_rate_limiter: IcmpRateLimiter::new(),
908 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
909 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
910 std::time::Duration::from_millis(coords_response_interval_ms),
911 ),
912 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
913 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
914 std::time::Duration::from_secs(forward_min_interval_secs),
915 ),
916 pending_connects: Vec::new(),
917 retry_pending: HashMap::new(),
918 nostr_discovery: None,
919 nostr_discovery_started_at_ms: None,
920 lan_discovery: None,
921 local_instance_registry: None,
922 local_instance_started_at_ms: None,
923 last_local_instance_publish_ms: None,
924 last_local_instance_scan_ms: None,
925 startup_open_discovery_sweep_done: false,
926 bootstrap_transports: HashSet::new(),
927 bootstrap_transport_npubs: HashMap::new(),
928 discovery_fallback_transit_blocked_peers: HashSet::new(),
929 last_parent_reeval: None,
930 last_congestion_log: None,
931 estimated_mesh_size: None,
932 last_mesh_size_log: None,
933 last_self_warn: None,
934 last_local_send_failure_at: None,
935 last_rx_loop_maintenance_timeout_at: None,
936 peer_aliases: HashMap::new(),
937 configured_peer_send_weights,
938 peer_acl,
939 host_map,
940 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
941 })
942 }
943
944 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
949 config.validate()?;
950 let node_addr = *identity.node_addr();
951
952 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
953 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
954
955 let mut startup_epoch = [0u8; 8];
956 rand::rng().fill_bytes(&mut startup_epoch);
957
958 let tun_state = if config.tun.enabled {
959 TunState::Configured
960 } else {
961 TunState::Disabled
962 };
963
964 let mut tree_state = TreeState::new(node_addr);
966 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
967 tree_state.set_hold_down(config.node.tree.hold_down_secs);
968 tree_state.set_flap_dampening(
969 config.node.tree.flap_threshold,
970 config.node.tree.flap_window_secs,
971 config.node.tree.flap_dampening_secs,
972 );
973 tree_state
974 .sign_declaration(&identity)
975 .expect("signing own declaration should never fail");
976
977 let mut bloom_state = BloomState::new(node_addr);
978 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
979
980 let coord_cache = CoordCache::new(
981 config.node.cache.coord_size,
982 config.node.cache.coord_ttl_secs * 1000,
983 );
984 let rl = &config.node.rate_limit;
985 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
986 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
987 config.node.limits.max_pending_inbound,
988 );
989
990 let max_connections = config.node.limits.max_connections;
991 let max_peers = config.node.limits.max_peers;
992 let max_links = config.node.limits.max_links;
993 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
994
995 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
996 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
997
998 Ok(Self {
999 identity,
1000 startup_epoch,
1001 started_at: std::time::Instant::now(),
1002 config,
1003 state: NodeState::Created,
1004 is_leaf_only: false,
1005 tree_state,
1006 bloom_state,
1007 coord_cache,
1008 learned_routes: LearnedRouteTable::default(),
1009 recent_requests: HashMap::new(),
1010 transports: HashMap::new(),
1011 transport_drops: HashMap::new(),
1012 links: HashMap::new(),
1013 addr_to_link: HashMap::new(),
1014 packet_tx: None,
1015 packet_rx: None,
1016 connections: HashMap::new(),
1017 peers: HashMap::new(),
1018 sessions: HashMap::new(),
1019 identity_cache: HashMap::new(),
1020 pending_tun_packets: HashMap::new(),
1021 pending_endpoint_data: HashMap::new(),
1022 pending_lookups: HashMap::new(),
1023 max_connections,
1024 max_peers,
1025 max_links,
1026 next_link_id: 1,
1027 next_transport_id: 1,
1028 stats: stats::NodeStats::new(),
1029 stats_history: stats_history::StatsHistory::new(),
1030 tun_state,
1031 tun_name: None,
1032 tun_tx: None,
1033 tun_outbound_rx: None,
1034 external_packet_tx: None,
1035 endpoint_command_rx: None,
1036 endpoint_event_tx: None,
1037 encrypt_workers: None,
1038 decrypt_workers: None,
1039 decrypt_registered_sessions: std::collections::HashSet::new(),
1040 decrypt_fallback_tx,
1041 decrypt_fallback_rx,
1042 tun_reader_handle: None,
1043 tun_writer_handle: None,
1044 #[cfg(target_os = "macos")]
1045 tun_shutdown_fd: None,
1046 dns_identity_rx: None,
1047 dns_task: None,
1048 index_allocator: IndexAllocator::new(),
1049 peers_by_index: HashMap::new(),
1050 pending_outbound: HashMap::new(),
1051 msg1_rate_limiter,
1052 icmp_rate_limiter: IcmpRateLimiter::new(),
1053 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1054 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1055 std::time::Duration::from_millis(coords_response_interval_ms),
1056 ),
1057 discovery_backoff: DiscoveryBackoff::new(),
1058 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1059 pending_connects: Vec::new(),
1060 retry_pending: HashMap::new(),
1061 nostr_discovery: None,
1062 nostr_discovery_started_at_ms: None,
1063 lan_discovery: None,
1064 local_instance_registry: None,
1065 local_instance_started_at_ms: None,
1066 last_local_instance_publish_ms: None,
1067 last_local_instance_scan_ms: None,
1068 startup_open_discovery_sweep_done: false,
1069 bootstrap_transports: HashSet::new(),
1070 bootstrap_transport_npubs: HashMap::new(),
1071 discovery_fallback_transit_blocked_peers: HashSet::new(),
1072 last_parent_reeval: None,
1073 last_congestion_log: None,
1074 estimated_mesh_size: None,
1075 last_mesh_size_log: None,
1076 last_self_warn: None,
1077 last_local_send_failure_at: None,
1078 last_rx_loop_maintenance_timeout_at: None,
1079 peer_aliases: HashMap::new(),
1080 configured_peer_send_weights,
1081 peer_acl,
1082 host_map,
1083 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1084 })
1085 }
1086
1087 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1089 let mut node = Self::new(config)?;
1090 node.is_leaf_only = true;
1091 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1092 Ok(node)
1093 }
1094
1095 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1096 let base_host_map = HostMap::from_peer_configs(config.peers());
1097 if !config.node.system_files_enabled {
1098 return (
1099 Arc::new(base_host_map.clone()),
1100 acl::PeerAclReloader::memory_only(base_host_map),
1101 );
1102 }
1103
1104 let mut host_map = base_host_map.clone();
1105 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1106 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1107 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1108 ));
1109 host_map.merge(hosts_file);
1110 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1111 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1112 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1113 base_host_map,
1114 hosts_path,
1115 );
1116 (Arc::new(host_map), peer_acl)
1117 }
1118
1119 fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1120 config
1121 .peers()
1122 .iter()
1123 .filter_map(|peer| {
1124 PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1125 (
1126 *identity.node_addr(),
1127 encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1128 )
1129 })
1130 })
1131 .collect()
1132 }
1133
1134 fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1135 self.configured_peer_send_weights
1136 .get(peer_addr)
1137 .copied()
1138 .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1139 }
1140
1141 async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
1145 let mut transports = Vec::new();
1146
1147 let udp_instances: Vec<_> = self
1149 .config
1150 .transports
1151 .udp
1152 .iter()
1153 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1154 .collect();
1155
1156 for (name, udp_config) in udp_instances {
1158 let transport_id = self.allocate_transport_id();
1159 let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
1160 transports.push(TransportHandle::Udp(udp));
1161 }
1162
1163 #[cfg(feature = "sim-transport")]
1164 {
1165 let sim_instances: Vec<_> = self
1166 .config
1167 .transports
1168 .sim
1169 .iter()
1170 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1171 .collect();
1172
1173 for (name, sim_config) in sim_instances {
1174 let transport_id = self.allocate_transport_id();
1175 let sim = crate::transport::sim::SimTransport::new(
1176 transport_id,
1177 name,
1178 sim_config,
1179 packet_tx.clone(),
1180 );
1181 transports.push(TransportHandle::Sim(sim));
1182 }
1183 }
1184
1185 #[cfg(any(target_os = "linux", target_os = "macos"))]
1187 {
1188 let eth_instances: Vec<_> = self
1189 .config
1190 .transports
1191 .ethernet
1192 .iter()
1193 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1194 .collect();
1195 let xonly = self.identity.pubkey();
1196 for (name, eth_config) in eth_instances {
1197 let mut eth_config = eth_config;
1198 if eth_config.discovery_scope.is_none() {
1199 eth_config.discovery_scope = self.lan_discovery_scope();
1200 }
1201 let transport_id = self.allocate_transport_id();
1202 let mut eth =
1203 EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
1204 eth.set_local_pubkey(xonly);
1205 transports.push(TransportHandle::Ethernet(eth));
1206 }
1207 }
1208
1209 let tcp_instances: Vec<_> = self
1211 .config
1212 .transports
1213 .tcp
1214 .iter()
1215 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1216 .collect();
1217
1218 for (name, tcp_config) in tcp_instances {
1219 let transport_id = self.allocate_transport_id();
1220 let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
1221 transports.push(TransportHandle::Tcp(tcp));
1222 }
1223
1224 let tor_instances: Vec<_> = self
1226 .config
1227 .transports
1228 .tor
1229 .iter()
1230 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1231 .collect();
1232
1233 for (name, tor_config) in tor_instances {
1234 let transport_id = self.allocate_transport_id();
1235 let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
1236 transports.push(TransportHandle::Tor(tor));
1237 }
1238
1239 let webrtc_instances: Vec<_> = self
1240 .config
1241 .transports
1242 .webrtc
1243 .iter()
1244 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1245 .collect();
1246
1247 #[cfg(feature = "webrtc-transport")]
1248 {
1249 for (name, webrtc_config) in webrtc_instances {
1250 let transport_id = self.allocate_transport_id();
1251 match WebRtcTransport::new(
1252 transport_id,
1253 name,
1254 webrtc_config,
1255 packet_tx.clone(),
1256 &self.identity,
1257 &self.config.node.discovery.nostr,
1258 ) {
1259 Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
1260 Err(err) => {
1261 warn!(
1262 transport_id = %transport_id,
1263 error = %err,
1264 "failed to initialize WebRTC transport"
1265 );
1266 }
1267 }
1268 }
1269 }
1270 #[cfg(not(feature = "webrtc-transport"))]
1271 if !webrtc_instances.is_empty() {
1272 warn!("WebRTC transport configured but this build lacks WebRTC transport support");
1273 }
1274
1275 #[cfg(bluer_available)]
1277 {
1278 let ble_instances: Vec<_> = self
1279 .config
1280 .transports
1281 .ble
1282 .iter()
1283 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1284 .collect();
1285
1286 #[cfg(all(bluer_available, not(test)))]
1287 for (name, ble_config) in ble_instances {
1288 let transport_id = self.allocate_transport_id();
1289 let adapter = ble_config.adapter().to_string();
1290 let mtu = ble_config.mtu();
1291 match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
1292 Ok(io) => {
1293 let mut ble = crate::transport::ble::BleTransport::new(
1294 transport_id,
1295 name,
1296 ble_config,
1297 io,
1298 packet_tx.clone(),
1299 );
1300 ble.set_local_pubkey(self.identity.pubkey().serialize());
1301 transports.push(TransportHandle::Ble(ble));
1302 }
1303 Err(e) => {
1304 tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
1305 }
1306 }
1307 }
1308
1309 #[cfg(any(not(bluer_available), test))]
1310 if !ble_instances.is_empty() {
1311 #[cfg(not(test))]
1312 tracing::warn!("BLE transport configured but this build lacks BlueZ support");
1313 }
1314 }
1315
1316 transports
1317 }
1318
1319 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1329 self.transports
1330 .iter()
1331 .filter(|(id, handle)| {
1332 handle.transport_type().name == transport_type
1333 && handle.is_operational()
1334 && !self.bootstrap_transports.contains(id)
1335 })
1336 .min_by_key(|(id, _)| id.as_u32())
1337 .map(|(id, _)| *id)
1338 }
1339
1340 #[allow(unused_variables)]
1346 fn resolve_ethernet_addr(
1347 &self,
1348 addr_str: &str,
1349 ) -> Result<(TransportId, TransportAddr), NodeError> {
1350 #[cfg(any(target_os = "linux", target_os = "macos"))]
1351 {
1352 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1353 NodeError::NoTransportForType(format!(
1354 "invalid Ethernet address format '{}': expected 'interface/mac'",
1355 addr_str
1356 ))
1357 })?;
1358
1359 let transport_id = self
1361 .transports
1362 .iter()
1363 .find(|(_, handle)| {
1364 handle.transport_type().name == "ethernet"
1365 && handle.is_operational()
1366 && handle.interface_name() == Some(iface)
1367 })
1368 .map(|(id, _)| *id)
1369 .ok_or_else(|| {
1370 NodeError::NoTransportForType(format!(
1371 "no operational Ethernet transport for interface '{}'",
1372 iface
1373 ))
1374 })?;
1375
1376 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1377 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1378 })?;
1379
1380 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1381 }
1382 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1383 {
1384 Err(NodeError::NoTransportForType(
1385 "Ethernet transport is not supported on this platform".to_string(),
1386 ))
1387 }
1388 }
1389
#[cfg(bluer_available)]
/// Resolves an `adapter/mac` BLE address string to an operational BLE
/// transport and the corresponding `TransportAddr`.
///
/// # Errors
/// Returns `NodeError::NoTransportForType` when the adapter part cannot be
/// extracted, when no operational BLE transport exists, or when the full
/// address fails `BleAddr::parse`.
fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
    let ta = TransportAddr::from_string(addr_str);
    let Some(adapter) = crate::transport::ble::addr::adapter_from_addr(&ta) else {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address format '{}': expected 'adapter/mac'",
            addr_str
        )));
    };

    // NOTE(review): the adapter name is only used in the error text; the
    // lookup accepts the first operational BLE transport regardless of adapter.
    let ble_transport = self.transports.iter().find_map(|(id, handle)| {
        let usable = handle.transport_type().name == "ble" && handle.is_operational();
        usable.then_some(*id)
    });
    let Some(transport_id) = ble_transport else {
        return Err(NodeError::NoTransportForType(format!(
            "no operational BLE transport for adapter '{}'",
            adapter
        )));
    };

    // Parsed purely for validation; the parsed value is discarded.
    if let Err(e) = crate::transport::ble::addr::BleAddr::parse(addr_str) {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address '{}': {}",
            addr_str, e
        )));
    }

    Ok((transport_id, TransportAddr::from_string(addr_str)))
}
1423
1424 pub fn identity(&self) -> &Identity {
1428 &self.identity
1429 }
1430
1431 pub fn node_addr(&self) -> &NodeAddr {
1433 self.identity.node_addr()
1434 }
1435
1436 pub fn npub(&self) -> String {
1438 self.identity.npub()
1439 }
1440
1441 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1450 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1451 return hostname.to_string();
1452 }
1453 if let Some(name) = self.peer_aliases.get(addr) {
1454 return name.clone();
1455 }
1456 if let Some(peer) = self.peers.get(addr) {
1457 return peer.identity().short_npub();
1458 }
1459 if let Some(entry) = self.sessions.get(addr) {
1460 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1461 return PeerIdentity::from_pubkey(xonly).short_npub();
1462 }
1463 addr.short_hex()
1464 }
1465
1466 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1478 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1482 self.peers_by_index.remove(&cache_key);
1483 if self.decrypt_registered_sessions.remove(&cache_key)
1484 && let Some(workers) = self.decrypt_workers.as_ref()
1485 {
1486 workers.unregister_session(cache_key);
1487 }
1488 if let Some(peer_addr) = owning_peer {
1499 let peer_has_other_index = self
1500 .peers_by_index
1501 .values()
1502 .any(|other| *other == peer_addr);
1503 if !peer_has_other_index {
1504 self.clear_connected_udp_for_peer(&peer_addr);
1505 }
1506 }
1507 }
1508
1509 pub(in crate::node) fn ensure_current_session_index_registered(
1518 &mut self,
1519 node_addr: &NodeAddr,
1520 context: &'static str,
1521 ) -> bool {
1522 let Some(peer) = self.peers.get(node_addr) else {
1523 return false;
1524 };
1525 let Some(transport_id) = peer.transport_id() else {
1526 warn!(
1527 peer = %self.peer_display_name(node_addr),
1528 context,
1529 "Cannot register current session index without transport id"
1530 );
1531 return false;
1532 };
1533 let Some(our_index) = peer.our_index() else {
1534 warn!(
1535 peer = %self.peer_display_name(node_addr),
1536 context,
1537 "Cannot register current session index without local index"
1538 );
1539 return false;
1540 };
1541
1542 let cache_key = (transport_id, our_index.as_u32());
1543 match self.peers_by_index.get(&cache_key).copied() {
1544 Some(existing) if existing == *node_addr => true,
1545 Some(existing) => {
1546 warn!(
1547 peer = %self.peer_display_name(node_addr),
1548 previous_owner = %self.peer_display_name(&existing),
1549 transport_id = %transport_id,
1550 our_index = %our_index,
1551 context,
1552 "Repairing current session index with stale owner"
1553 );
1554 self.peers_by_index.insert(cache_key, *node_addr);
1555 true
1556 }
1557 None => {
1558 warn!(
1559 peer = %self.peer_display_name(node_addr),
1560 transport_id = %transport_id,
1561 our_index = %our_index,
1562 context,
1563 "Repairing missing current session index"
1564 );
1565 self.peers_by_index.insert(cache_key, *node_addr);
1566 true
1567 }
1568 }
1569 }
1570
1571 pub fn config(&self) -> &Config {
1575 &self.config
1576 }
1577
1578 pub fn effective_ipv6_mtu(&self) -> u16 {
1584 crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1585 }
1586
1587 pub fn transport_mtu(&self) -> u16 {
1604 let min_operational = self
1605 .transports
1606 .values()
1607 .filter(|h| h.is_operational())
1608 .map(|h| h.mtu())
1609 .min();
1610 if let Some(mtu) = min_operational {
1611 return mtu;
1612 }
1613 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1615 return cfg.mtu();
1616 }
1617 1280
1618 }
1619
1620 pub fn state(&self) -> NodeState {
1624 self.state
1625 }
1626
1627 pub fn uptime(&self) -> std::time::Duration {
1629 self.started_at.elapsed()
1630 }
1631
1632 pub fn is_running(&self) -> bool {
1634 self.state.is_operational()
1635 }
1636
1637 pub fn is_leaf_only(&self) -> bool {
1639 self.is_leaf_only
1640 }
1641
1642 pub fn tree_state(&self) -> &TreeState {
1646 &self.tree_state
1647 }
1648
1649 pub fn tree_state_mut(&mut self) -> &mut TreeState {
1651 &mut self.tree_state
1652 }
1653
1654 pub fn bloom_state(&self) -> &BloomState {
1658 &self.bloom_state
1659 }
1660
1661 pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1663 &mut self.bloom_state
1664 }
1665
1666 pub fn estimated_mesh_size(&self) -> Option<u64> {
1670 self.estimated_mesh_size
1671 }
1672
1673 pub(crate) fn compute_mesh_size(&mut self) {
1679 let my_addr = *self.tree_state.my_node_addr();
1680 let parent_id = *self.tree_state.my_declaration().parent_id();
1681 let is_root = self.tree_state.is_root();
1682
1683 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1684 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1686 let mut has_data = false;
1687
1688 if !is_root
1694 && let Some(parent) = self.peers.get(&parent_id)
1695 && let Some(filter) = parent.inbound_filter()
1696 {
1697 match filter.estimated_count(max_fpr) {
1698 Some(n) => {
1699 total += n;
1700 has_data = true;
1701 }
1702 None => {
1703 self.estimated_mesh_size = None;
1704 return;
1705 }
1706 }
1707 }
1708
1709 for (peer_addr, peer) in &self.peers {
1711 if peer_addr == &parent_id {
1712 continue;
1713 }
1714 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1715 && *decl.parent_id() == my_addr
1716 {
1717 child_count += 1;
1718 if let Some(filter) = peer.inbound_filter() {
1719 match filter.estimated_count(max_fpr) {
1720 Some(n) => {
1721 total += n;
1722 has_data = true;
1723 }
1724 None => {
1725 self.estimated_mesh_size = None;
1726 return;
1727 }
1728 }
1729 }
1730 }
1731 }
1732
1733 if !has_data {
1734 self.estimated_mesh_size = None;
1735 return;
1736 }
1737
1738 let size = total.round() as u64;
1739 self.estimated_mesh_size = Some(size);
1740
1741 let now = std::time::Instant::now();
1743 let should_log = match self.last_mesh_size_log {
1744 None => true,
1745 Some(last) => {
1746 now.duration_since(last)
1747 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1748 }
1749 };
1750 if should_log {
1751 tracing::debug!(
1752 estimated_mesh_size = size,
1753 peers = self.peers.len(),
1754 children = child_count,
1755 "Mesh size estimate"
1756 );
1757 self.last_mesh_size_log = Some(now);
1758 }
1759 }
1760
1761 pub fn coord_cache(&self) -> &CoordCache {
1765 &self.coord_cache
1766 }
1767
1768 pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1770 &mut self.coord_cache
1771 }
1772
1773 pub fn stats(&self) -> &stats::NodeStats {
1777 &self.stats
1778 }
1779
1780 pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1782 &mut self.stats
1783 }
1784
1785 pub fn stats_history(&self) -> &stats_history::StatsHistory {
1787 &self.stats_history
1788 }
1789
1790 pub(crate) fn record_stats_history(&mut self) {
1793 let fwd = &self.stats.forwarding;
1794 let peers_with_mmp: Vec<f64> = self
1795 .peers
1796 .values()
1797 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1798 .collect();
1799 let loss_rate = if peers_with_mmp.is_empty() {
1800 0.0
1801 } else {
1802 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1803 };
1804
1805 let snap = stats_history::Snapshot {
1806 mesh_size: self.estimated_mesh_size,
1807 tree_depth: self.tree_state.my_coords().depth() as u32,
1808 peer_count: self.peers.len() as u64,
1809 parent_switches_total: self.stats.tree.parent_switches,
1810 bytes_in_total: fwd.received_bytes,
1811 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1812 packets_in_total: fwd.received_packets,
1813 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1814 loss_rate,
1815 active_sessions: self.sessions.len() as u64,
1816 };
1817
1818 let now = std::time::Instant::now();
1819 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1820 .peers
1821 .values()
1822 .map(|p| {
1823 let stats = p.link_stats();
1824 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1825 Some(m) => (
1826 m.metrics.srtt_ms(),
1827 Some(m.metrics.loss_rate()),
1828 m.receiver.ecn_ce_count() as u64,
1829 ),
1830 None => (None, None, 0),
1831 };
1832 stats_history::PeerSnapshot {
1833 node_addr: *p.node_addr(),
1834 last_seen: now,
1835 srtt_ms,
1836 loss_rate,
1837 bytes_in_total: stats.bytes_recv,
1838 bytes_out_total: stats.bytes_sent,
1839 packets_in_total: stats.packets_recv,
1840 packets_out_total: stats.packets_sent,
1841 ecn_ce_total: ecn_ce,
1842 }
1843 })
1844 .collect();
1845
1846 self.stats_history.tick(now, &snap, &peer_snaps);
1847 }
1848
1849 pub fn tun_state(&self) -> TunState {
1853 self.tun_state
1854 }
1855
1856 pub fn tun_name(&self) -> Option<&str> {
1858 self.tun_name.as_deref()
1859 }
1860
1861 pub fn set_max_connections(&mut self, max: usize) {
1865 self.max_connections = max;
1866 }
1867
1868 pub fn set_max_peers(&mut self, max: usize) {
1870 self.max_peers = max;
1871 }
1872
1873 pub(crate) fn outbound_admission_check(&self) -> bool {
1876 let connection_used = self
1877 .connections
1878 .len()
1879 .saturating_add(self.pending_connects.len());
1880 let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1881 let connection_allowed =
1882 self.max_connections == 0 || connection_used < self.max_connections;
1883 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1884 peer_allowed && connection_allowed && link_allowed
1885 }
1886
1887 pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1891 if !self.outbound_admission_check() {
1892 return false;
1893 }
1894
1895 let nostr = &self.config.node.discovery.nostr;
1896 if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1897 return true;
1898 }
1899
1900 let configured_npubs = self
1901 .config
1902 .peers()
1903 .iter()
1904 .map(|peer| peer.npub.clone())
1905 .collect::<HashSet<_>>();
1906 self.open_discovery_enqueue_budget(&configured_npubs) > 0
1907 }
1908
1909 pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1913 let connection_used = self
1914 .connections
1915 .len()
1916 .saturating_add(self.pending_connects.len());
1917 let connection_allowed =
1918 self.max_connections == 0 || connection_used < self.max_connections;
1919 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1920 connection_allowed && link_allowed
1921 }
1922
1923 pub fn set_max_links(&mut self, max: usize) {
1925 self.max_links = max;
1926 }
1927
1928 pub fn connection_count(&self) -> usize {
1932 self.connections.len()
1933 }
1934
1935 pub fn peer_count(&self) -> usize {
1937 self.peers.len()
1938 }
1939
1940 pub fn link_count(&self) -> usize {
1942 self.links.len()
1943 }
1944
1945 pub fn transport_count(&self) -> usize {
1947 self.transports.len()
1948 }
1949
1950 pub fn allocate_transport_id(&mut self) -> TransportId {
1954 let id = TransportId::new(self.next_transport_id);
1955 self.next_transport_id += 1;
1956 id
1957 }
1958
1959 pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
1961 self.transports.get(id)
1962 }
1963
1964 pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
1966 self.transports.get_mut(id)
1967 }
1968
1969 pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
1971 self.transports.keys()
1972 }
1973
1974 pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
1976 self.packet_rx.as_mut()
1977 }
1978
1979 pub fn allocate_link_id(&mut self) -> LinkId {
1983 let id = LinkId::new(self.next_link_id);
1984 self.next_link_id += 1;
1985 id
1986 }
1987
1988 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1990 if self.max_links > 0 && self.links.len() >= self.max_links {
1991 return Err(NodeError::MaxLinksExceeded {
1992 max: self.max_links,
1993 });
1994 }
1995 let link_id = link.link_id();
1996 let transport_id = link.transport_id();
1997 let remote_addr = link.remote_addr().clone();
1998
1999 self.links.insert(link_id, link);
2000 self.addr_to_link
2001 .insert((transport_id, remote_addr), link_id);
2002 Ok(())
2003 }
2004
2005 pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
2007 self.links.get(link_id)
2008 }
2009
2010 pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
2012 self.links.get_mut(link_id)
2013 }
2014
2015 pub fn find_link_by_addr(
2017 &self,
2018 transport_id: TransportId,
2019 addr: &TransportAddr,
2020 ) -> Option<LinkId> {
2021 self.addr_to_link
2022 .get(&(transport_id, addr.clone()))
2023 .copied()
2024 }
2025
2026 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2032 if let Some(link) = self.links.remove(link_id) {
2033 let key = (link.transport_id(), link.remote_addr().clone());
2035 if self.addr_to_link.get(&key) == Some(link_id) {
2036 self.addr_to_link.remove(&key);
2037 }
2038 Some(link)
2039 } else {
2040 None
2041 }
2042 }
2043
2044 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2045 if !self.bootstrap_transports.contains(&transport_id) {
2046 return;
2047 }
2048
2049 let transport_in_use = self
2050 .links
2051 .values()
2052 .any(|link| link.transport_id() == transport_id)
2053 || self
2054 .connections
2055 .values()
2056 .any(|conn| conn.transport_id() == Some(transport_id))
2057 || self
2058 .peers
2059 .values()
2060 .any(|peer| peer.transport_id() == Some(transport_id))
2061 || self
2062 .pending_connects
2063 .iter()
2064 .any(|pending| pending.transport_id == transport_id);
2065
2066 if transport_in_use {
2067 return;
2068 }
2069
2070 tracing::debug!(
2071 transport_id = %transport_id,
2072 "bootstrap transport has no remaining references; dropping"
2073 );
2074
2075 self.bootstrap_transports.remove(&transport_id);
2076 self.bootstrap_transport_npubs.remove(&transport_id);
2077 self.transport_drops.remove(&transport_id);
2078 self.transports.remove(&transport_id);
2079 }
2080
2081 pub fn links(&self) -> impl Iterator<Item = &Link> {
2083 self.links.values()
2084 }
2085
2086 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2090 let link_id = connection.link_id();
2091
2092 if self.connections.contains_key(&link_id) {
2093 return Err(NodeError::ConnectionAlreadyExists(link_id));
2094 }
2095
2096 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2097 return Err(NodeError::MaxConnectionsExceeded {
2098 max: self.max_connections,
2099 });
2100 }
2101
2102 self.connections.insert(link_id, connection);
2103 Ok(())
2104 }
2105
2106 pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
2108 self.connections.get(link_id)
2109 }
2110
2111 pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
2113 self.connections.get_mut(link_id)
2114 }
2115
2116 pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
2118 self.connections.remove(link_id)
2119 }
2120
2121 pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
2123 self.connections.values()
2124 }
2125
2126 pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
2130 self.peers.get(node_addr)
2131 }
2132
2133 pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
2135 self.peers.get_mut(node_addr)
2136 }
2137
2138 pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
2140 self.peers.remove(node_addr)
2141 }
2142
2143 pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2145 self.peers.values()
2146 }
2147
2148 pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2152 self.nostr_discovery.as_deref()
2153 }
2154
2155 pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2157 self.peers.keys()
2158 }
2159
2160 pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2162 self.peers.values().filter(|p| p.can_send())
2163 }
2164
2165 pub fn sendable_peer_count(&self) -> usize {
2167 self.peers.values().filter(|p| p.can_send()).count()
2168 }
2169
2170 pub(crate) fn set_discovery_fallback_transit_allowed(
2171 &mut self,
2172 peer_addr: NodeAddr,
2173 allowed: bool,
2174 ) {
2175 if allowed {
2176 self.discovery_fallback_transit_blocked_peers
2177 .remove(&peer_addr);
2178 } else {
2179 self.discovery_fallback_transit_blocked_peers
2180 .insert(peer_addr);
2181 }
2182 }
2183
2184 pub(crate) fn configured_discovery_fallback_transit(
2185 &self,
2186 peer_addr: &NodeAddr,
2187 ) -> Option<bool> {
2188 self.configured_peer(peer_addr)
2189 .map(|peer| peer.discovery_fallback_transit)
2190 }
2191
2192 pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2193 self.config.peers().iter().find(|peer| {
2194 PeerIdentity::from_npub(&peer.npub)
2195 .ok()
2196 .is_some_and(|identity| identity.node_addr() == peer_addr)
2197 })
2198 }
2199
2200 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2201 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2202 return retry_state.peer_config.discovery_fallback_transit;
2203 }
2204
2205 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2206 return allowed;
2207 }
2208
2209 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2210 }
2211
#[cfg(test)]
/// Test helper: sets the discovery-forward limiter's interval to zero.
pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
    let no_interval = std::time::Duration::ZERO;
    self.discovery_forward_limiter.set_interval(no_interval);
}
2221
#[cfg(test)]
/// Test helper: looks up the session entry for `remote`.
pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
    self.sessions.get(remote)
}
2226
#[cfg(test)]
/// Test helper: looks up the session entry for `remote` for mutation.
pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
    self.sessions.get_mut(remote)
}
2232
#[cfg(test)]
/// Test helper: removes and returns the session entry for `remote`.
pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
    self.sessions.remove(remote)
}
2238
#[cfg(test)]
/// Test helper: reads the path-MTU entry for `fips_addr`. Returns `None` if
/// the read lock cannot be acquired or no entry exists.
pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
    let map = self.path_mtu_lookup.read().ok()?;
    map.get(fips_addr).copied()
}
2247
#[cfg(test)]
/// Test helper: stores `mtu` for `fips_addr`. Silently does nothing if the
/// write lock cannot be acquired.
pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
    let Ok(mut map) = self.path_mtu_lookup.write() else {
        return;
    };
    map.insert(fips_addr, mtu);
}
2255
2256 pub fn session_count(&self) -> usize {
2258 self.sessions.len()
2259 }
2260
2261 pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2263 self.sessions.iter()
2264 }
2265
2266 pub(crate) fn register_identity(
2270 &mut self,
2271 node_addr: NodeAddr,
2272 pubkey: secp256k1::PublicKey,
2273 ) -> bool {
2274 let mut prefix = [0u8; 15];
2275 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2276 if let Some(entry) = self.identity_cache.get(&prefix)
2277 && entry.node_addr == node_addr
2278 && entry.pubkey == pubkey
2279 {
2280 return true;
2284 }
2285
2286 let (xonly, _) = pubkey.x_only_public_key();
2287 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2288 if derived_node_addr != node_addr {
2289 debug!(
2290 claimed_node_addr = %node_addr,
2291 derived_node_addr = %derived_node_addr,
2292 "Rejected identity cache entry with mismatched public key"
2293 );
2294 return false;
2295 }
2296
2297 let now_ms = Self::now_ms();
2298 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2299 && entry.node_addr == node_addr
2300 {
2301 entry.pubkey = pubkey;
2302 entry.last_seen_ms = now_ms;
2303 return true;
2304 }
2305
2306 let npub = encode_npub(&xonly);
2307 self.identity_cache.insert(
2308 prefix,
2309 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2310 );
2311 let max = self.config.node.cache.identity_size;
2313 if self.identity_cache.len() > max
2314 && let Some(oldest_key) = self
2315 .identity_cache
2316 .iter()
2317 .min_by_key(|(_, entry)| entry.last_seen_ms)
2318 .map(|(k, _)| *k)
2319 {
2320 self.identity_cache.remove(&oldest_key);
2321 }
2322 true
2323 }
2324
2325 pub(crate) fn lookup_by_fips_prefix(
2327 &mut self,
2328 prefix: &[u8; 15],
2329 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2330 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2331 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2333 } else {
2334 None
2335 }
2336 }
2337
2338 pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2340 let mut prefix = [0u8; 15];
2341 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2342 self.identity_cache.contains_key(&prefix)
2343 }
2344
2345 pub fn identity_cache_len(&self) -> usize {
2347 self.identity_cache.len()
2348 }
2349
2350 pub fn identity_cache_iter(
2355 &self,
2356 ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2357 self.identity_cache
2358 .values()
2359 .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2360 }
2361
2362 pub fn identity_cache_max(&self) -> usize {
2364 self.config.node.cache.identity_size
2365 }
2366
2367 pub fn pending_lookup_count(&self) -> usize {
2369 self.pending_lookups.len()
2370 }
2371
2372 pub fn pending_lookups_iter(
2374 &self,
2375 ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2376 self.pending_lookups.iter()
2377 }
2378
2379 pub fn recent_request_count(&self) -> usize {
2381 self.recent_requests.len()
2382 }
2383
2384 pub fn pending_tun_destinations(&self) -> usize {
2386 self.pending_tun_packets.len()
2387 }
2388
2389 pub fn pending_tun_total_packets(&self) -> usize {
2391 self.pending_tun_packets.values().map(|q| q.len()).sum()
2392 }
2393
2394 pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2396 self.retry_pending.iter()
2397 }
2398
2399 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2406 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2408 return true;
2409 }
2410 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2412 && decl.parent_id() == self.node_addr()
2413 {
2414 return true;
2415 }
2416 false
2417 }
2418
2419 pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
2442 if dest_node_addr == self.node_addr() {
2444 return None;
2445 }
2446
2447 let direct_peer_can_send = self
2451 .peers
2452 .get(dest_node_addr)
2453 .is_some_and(|peer| peer.can_send());
2454 if let Some(peer) = self.peers.get(dest_node_addr)
2455 && peer.is_healthy()
2456 {
2457 return Some(peer);
2458 }
2459
2460 let now_ms = Self::now_ms();
2461
2462 let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
2463 Some(
2464 self.peers
2465 .iter()
2466 .filter(|(_, peer)| peer.can_send())
2467 .map(|(addr, _)| *addr)
2468 .collect::<HashSet<_>>(),
2469 )
2470 } else {
2471 None
2472 };
2473
2474 let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
2481 self.learned_routes.should_explore_fallback(
2482 dest_node_addr,
2483 now_ms,
2484 self.config.node.routing.learned_fallback_explore_interval,
2485 |addr| sendable.contains(addr),
2486 )
2487 });
2488 if let Some(sendable) = &sendable_learned_peers
2489 && !explore_fallback
2490 && let Some(next_hop_addr) =
2491 self.learned_routes
2492 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2493 {
2494 return self.peers.get(&next_hop_addr);
2495 }
2496
2497 let Some(dest_coords) = self
2499 .coord_cache
2500 .get_and_touch(dest_node_addr, now_ms)
2501 .cloned()
2502 else {
2503 if let Some(sendable) = &sendable_learned_peers
2504 && let Some(next_hop_addr) =
2505 self.learned_routes
2506 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2507 {
2508 return self.peers.get(&next_hop_addr);
2509 }
2510 if direct_peer_can_send {
2511 return self.peers.get(dest_node_addr);
2512 }
2513 return None;
2514 };
2515
2516 let coordinate_route_addr = {
2519 let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
2520 if !candidates.is_empty() {
2521 self.select_best_candidate(&candidates, &dest_coords)
2522 .map(|peer| *peer.node_addr())
2523 } else {
2524 None
2525 }
2526 };
2527 if let Some(next_hop_addr) = coordinate_route_addr {
2528 return self.peers.get(&next_hop_addr);
2529 }
2530
2531 let tree_route_addr = self
2533 .tree_state
2534 .find_next_hop(&dest_coords)
2535 .filter(|next_hop_id| {
2536 self.peers
2537 .get(next_hop_id)
2538 .is_some_and(|peer| peer.can_send())
2539 });
2540 if let Some(next_hop_addr) = tree_route_addr {
2541 return self.peers.get(&next_hop_addr);
2542 }
2543 if explore_fallback {
2544 return sendable_learned_peers.as_ref().and_then(|sendable| {
2545 self.learned_routes
2546 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2547 .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
2548 });
2549 }
2550
2551 if let Some(sendable) = &sendable_learned_peers
2552 && let Some(next_hop_addr) =
2553 self.learned_routes
2554 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2555 {
2556 return self.peers.get(&next_hop_addr);
2557 }
2558
2559 if direct_peer_can_send {
2560 return self.peers.get(dest_node_addr);
2561 }
2562
2563 None
2564 }
2565
2566 pub(in crate::node) fn learn_reverse_route(
2567 &mut self,
2568 destination: NodeAddr,
2569 next_hop: NodeAddr,
2570 ) {
2571 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2572 || destination == *self.node_addr()
2573 {
2574 return;
2575 }
2576 let now_ms = Self::now_ms();
2577 self.learned_routes.learn(
2578 destination,
2579 next_hop,
2580 now_ms,
2581 self.config.node.routing.learned_ttl_secs,
2582 self.config.node.routing.max_learned_routes_per_dest,
2583 );
2584 }
2585
2586 pub(in crate::node) fn record_route_failure(
2587 &mut self,
2588 destination: NodeAddr,
2589 next_hop: NodeAddr,
2590 ) {
2591 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2592 return;
2593 }
2594 self.learned_routes.record_failure(&destination, &next_hop);
2595 }
2596
2597 pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2598 self.learned_routes.snapshot(now_ms)
2599 }
2600
2601 pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2602 self.learned_routes.purge_expired(now_ms);
2603 }
2604
2605 fn select_best_candidate<'a>(
2614 &'a self,
2615 candidates: &[&'a ActivePeer],
2616 dest_coords: &crate::tree::TreeCoordinate,
2617 ) -> Option<&'a ActivePeer> {
2618 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2619
2620 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2621
2622 for &candidate in candidates {
2623 if !candidate.can_send() {
2624 continue;
2625 }
2626
2627 let cost = candidate.link_cost();
2628
2629 let dist = self
2630 .tree_state
2631 .peer_coords(candidate.node_addr())
2632 .map(|pc| pc.distance_to(dest_coords))
2633 .unwrap_or(usize::MAX);
2634
2635 if dist >= my_distance {
2638 continue;
2639 }
2640
2641 let dominated = match &best {
2642 None => true,
2643 Some((_, best_cost, best_dist)) => {
2644 cost < *best_cost
2645 || (cost == *best_cost && dist < *best_dist)
2646 || (cost == *best_cost
2647 && dist == *best_dist
2648 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2649 }
2650 };
2651
2652 if dominated {
2653 best = Some((candidate, cost, dist));
2654 }
2655 }
2656
2657 best.map(|(peer, _, _)| peer)
2658 }
2659
2660 pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2662 self.peers.values().filter(|p| p.may_reach(dest)).collect()
2663 }
2664
2665 pub fn tun_tx(&self) -> Option<&TunTx> {
2669 self.tun_tx.as_ref()
2670 }
2671
2672 pub fn attach_external_packet_io(
2679 &mut self,
2680 capacity: usize,
2681 ) -> Result<ExternalPacketIo, NodeError> {
2682 if self.state != NodeState::Created {
2683 return Err(NodeError::Config(ConfigError::Validation(
2684 "external packet I/O must be attached before node start".to_string(),
2685 )));
2686 }
2687 if self.config.tun.enabled {
2688 return Err(NodeError::Config(ConfigError::Validation(
2689 "external packet I/O requires tun.enabled=false".to_string(),
2690 )));
2691 }
2692
2693 let capacity = capacity.max(1);
2694 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2695 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2696 self.tun_outbound_rx = Some(outbound_rx);
2697 self.external_packet_tx = Some(inbound_tx);
2698
2699 Ok(ExternalPacketIo {
2700 outbound_tx,
2701 inbound_rx,
2702 })
2703 }
2704
2705 pub(crate) fn attach_endpoint_data_io(
2710 &mut self,
2711 capacity: usize,
2712 ) -> Result<EndpointDataIo, NodeError> {
2713 if self.state != NodeState::Created {
2714 return Err(NodeError::Config(ConfigError::Validation(
2715 "endpoint data I/O must be attached before node start".to_string(),
2716 )));
2717 }
2718
2719 let command_capacity = endpoint_data_command_capacity(capacity);
2720 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2721 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2726 self.endpoint_command_rx = Some(command_rx);
2727 self.endpoint_event_tx = Some(event_tx.clone());
2728
2729 Ok(EndpointDataIo {
2730 command_tx,
2731 event_rx,
2732 event_tx,
2733 })
2734 }
2735
2736 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2737 let mut prefix = [0u8; 15];
2738 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2739 self.identity_cache
2740 .get(&prefix)
2741 .filter(|entry| &entry.node_addr == addr)
2742 .map(|entry| entry.pubkey)
2743 }
2744
2745 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2746 let mut prefix = [0u8; 15];
2747 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2748 self.identity_cache
2749 .get(&prefix)
2750 .filter(|entry| &entry.node_addr == addr)
2751 .map(|entry| entry.npub.clone())
2752 }
2753
2754 pub(in crate::node) fn deliver_external_ipv6_packet(
2755 &self,
2756 src_addr: &NodeAddr,
2757 packet: Vec<u8>,
2758 ) {
2759 let Some(external_packet_tx) = &self.external_packet_tx else {
2760 return;
2761 };
2762 if packet.len() < 40 {
2763 return;
2764 }
2765 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2766 return;
2767 };
2768 let delivered = NodeDeliveredPacket {
2769 source_node_addr: *src_addr,
2770 source_npub: self.npub_for_node_addr(src_addr),
2771 destination,
2772 packet,
2773 };
2774 if let Err(error) = external_packet_tx.try_send(delivered) {
2775 debug!(error = %error, "Failed to deliver packet to external app sink");
2776 }
2777 }
2778
2779 pub(super) async fn send_encrypted_link_message(
2793 &mut self,
2794 node_addr: &NodeAddr,
2795 plaintext: &[u8],
2796 ) -> Result<(), NodeError> {
2797 self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2798 .await
2799 }
2800
2801 pub(in crate::node) fn note_local_send_outcome(
2807 &mut self,
2808 result: &Result<usize, TransportError>,
2809 ) {
2810 match result {
2811 Ok(_) => {
2812 if self.last_local_send_failure_at.is_some() {
2813 self.last_local_send_failure_at = None;
2814 }
2815 }
2816 Err(error) if error.is_local_route_unavailable() => {
2817 self.last_local_send_failure_at = Some(std::time::Instant::now());
2818 }
2819 Err(_) => {}
2820 }
2821 }
2822
2823 pub(in crate::node) fn local_send_failure_dead_timeout(
2829 &mut self,
2830 now: std::time::Instant,
2831 dead_timeout: std::time::Duration,
2832 fast_dead_timeout: std::time::Duration,
2833 ) -> std::time::Duration {
2834 match self.last_local_send_failure_at {
2835 Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
2836 fast_dead_timeout.min(dead_timeout)
2837 }
2838 Some(_) => {
2839 self.last_local_send_failure_at = None;
2840 dead_timeout
2841 }
2842 None => dead_timeout,
2843 }
2844 }
2845
2846 pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
2847 self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
2848 }
2849
2850 pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
2851 let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
2852 return false;
2853 };
2854 let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
2855 std::time::Instant::now().duration_since(t) <= grace
2856 }
2857
2858 pub(super) async fn send_encrypted_link_message_with_ce(
2862 &mut self,
2863 node_addr: &NodeAddr,
2864 plaintext: &[u8],
2865 ce_flag: bool,
2866 ) -> Result<(), NodeError> {
2867 let peer = self
2868 .peers
2869 .get_mut(node_addr)
2870 .ok_or(NodeError::PeerNotFound(*node_addr))?;
2871
2872 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2873 node_addr: *node_addr,
2874 reason: "no their_index".into(),
2875 })?;
2876 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2877 node_addr: *node_addr,
2878 reason: "no transport_id".into(),
2879 })?;
2880 let remote_addr = peer
2881 .current_addr()
2882 .cloned()
2883 .ok_or_else(|| NodeError::SendFailed {
2884 node_addr: *node_addr,
2885 reason: "no current_addr".into(),
2886 })?;
2887 #[cfg(any(target_os = "linux", target_os = "macos"))]
2888 let connected_socket = peer.connected_udp();
2889
2890 let timestamp_ms = peer.session_elapsed_ms();
2892
2893 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2895 let mut flags = if sp_flag { FLAG_SP } else { 0 };
2896 if ce_flag {
2897 flags |= FLAG_CE;
2898 }
2899 if peer.current_k_bit() {
2900 flags |= FLAG_KEY_EPOCH;
2901 }
2902
2903 let session = peer
2904 .noise_session_mut()
2905 .ok_or_else(|| NodeError::SendFailed {
2906 node_addr: *node_addr,
2907 reason: "no noise session".into(),
2908 })?;
2909
2910 const INNER_TS_LEN: usize = 4;
2918 let counter = session.current_send_counter();
2919 let inner_len = INNER_TS_LEN + plaintext.len();
2920 let payload_len = inner_len as u16;
2921 let header = build_established_header(their_index, counter, flags, payload_len);
2922
2923 let transport_for_send = self
2942 .transports
2943 .get(&transport_id)
2944 .ok_or(NodeError::TransportNotFound(transport_id))?;
2945 match transport_for_send.connection_state(&remote_addr) {
2946 ConnectionState::Connected => {}
2947 other => {
2948 if matches!(other, ConnectionState::None) {
2949 let _ = transport_for_send.connect(&remote_addr).await;
2950 }
2951 return Err(NodeError::SendFailed {
2952 node_addr: *node_addr,
2953 reason: format!("transport connection not ready: {:?}", other),
2954 });
2955 }
2956 }
2957 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2958 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2959 && is_udp
2960 && let Some(cipher_clone) = session.send_cipher_clone()
2961 {
2962 {
2963 let reserved_counter =
2967 session
2968 .take_send_counter()
2969 .map_err(|e| NodeError::SendFailed {
2970 node_addr: *node_addr,
2971 reason: format!("counter reservation failed: {}", e),
2972 })?;
2973 debug_assert_eq!(reserved_counter, counter);
2974 let header =
2978 build_established_header(their_index, reserved_counter, flags, payload_len);
2979 let transport = transport_for_send;
2980 let send_target = {
2987 if let TransportHandle::Udp(udp) = transport {
2988 let socket_addr = {
2989 #[cfg(any(target_os = "linux", target_os = "macos"))]
2990 {
2991 match connected_socket.as_ref() {
2992 Some(socket) => Some(socket.peer_addr()),
2993 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2994 }
2995 }
2996 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2997 {
2998 udp.resolve_for_off_task(&remote_addr).await.ok()
2999 }
3000 };
3001 match (udp.async_socket(), socket_addr) {
3002 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3003 _ => None,
3004 }
3005 } else {
3006 None
3007 }
3008 };
3009 if let Some((socket, socket_addr)) = send_target {
3010 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3026 let mut wire_buf = Vec::with_capacity(wire_capacity);
3027 wire_buf.extend_from_slice(&header);
3028 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
3029 wire_buf.extend_from_slice(plaintext);
3030 let predicted_bytes = wire_capacity;
3031 if let Some(peer) = self.peers.get_mut(node_addr) {
3038 peer.link_stats_mut().record_sent(predicted_bytes);
3039 if let Some(mmp) = peer.mmp_mut() {
3040 mmp.sender
3041 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3042 }
3043 }
3044 let scheduling_weight = self.send_weight_for_peer(node_addr);
3045 workers.dispatch(self::encrypt_worker::FmpSendJob {
3046 cipher: cipher_clone,
3047 counter: reserved_counter,
3048 wire_buf,
3049 fsp_seal: None,
3050 socket,
3051 dest_addr: socket_addr,
3052 #[cfg(any(target_os = "linux", target_os = "macos"))]
3053 connected_socket,
3054 drop_on_backpressure: fmp_plaintext_is_bulk_session_datagram(plaintext),
3055 scheduling_weight,
3056 queued_at: crate::perf_profile::stamp(),
3057 });
3058 return Ok(());
3059 }
3060 }
3061 }
3062
3063 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3068 let ciphertext = {
3070 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3071 session
3072 .encrypt_with_aad(&inner_plaintext, &header)
3073 .map_err(|e| NodeError::SendFailed {
3074 node_addr: *node_addr,
3075 reason: format!("encryption failed: {}", e),
3076 })?
3077 };
3078
3079 let wire_packet = build_encrypted(&header, &ciphertext);
3080
3081 let send_result = {
3083 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3084 let transport = self
3085 .transports
3086 .get(&transport_id)
3087 .ok_or(NodeError::TransportNotFound(transport_id))?;
3088 transport.send(&remote_addr, &wire_packet).await
3089 };
3090 self.note_local_send_outcome(&send_result);
3091 let bytes_sent = send_result.map_err(|e| match e {
3092 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3093 node_addr: *node_addr,
3094 packet_size,
3095 mtu,
3096 },
3097 other => NodeError::SendFailed {
3098 node_addr: *node_addr,
3099 reason: format!("transport send: {}", other),
3100 },
3101 })?;
3102
3103 if let Some(peer) = self.peers.get_mut(node_addr) {
3105 peer.link_stats_mut().record_sent(bytes_sent);
3106 if let Some(mmp) = peer.mmp_mut() {
3108 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3109 }
3110 }
3111
3112 Ok(())
3113 }
3114}
3115
3116impl fmt::Debug for Node {
3117 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3118 f.debug_struct("Node")
3119 .field("node_addr", self.node_addr())
3120 .field("state", &self.state)
3121 .field("is_leaf_only", &self.is_leaf_only)
3122 .field("connections", &self.connection_count())
3123 .field("peers", &self.peer_count())
3124 .field("links", &self.link_count())
3125 .field("transports", &self.transport_count())
3126 .finish()
3127 }
3128}