1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
40use crate::peer::{ActivePeer, PeerConnection};
41#[cfg(any(target_os = "linux", target_os = "macos"))]
42use crate::transport::ethernet::EthernetTransport;
43use crate::transport::tcp::TcpTransport;
44use crate::transport::tor::TorTransport;
45use crate::transport::udp::UdpTransport;
46#[cfg(feature = "webrtc-transport")]
47use crate::transport::webrtc::WebRtcTransport;
48use crate::transport::{
49 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
50 TransportHandle, TransportId,
51};
52use crate::tree::TreeState;
53use crate::upper::hosts::HostMap;
54use crate::upper::icmp_rate_limit::IcmpRateLimiter;
55use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
56use crate::utils::index::IndexAllocator;
57use crate::{
58 Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
59 PeerIdentity, encode_npub,
60};
61use rand::Rng;
62use std::collections::{HashMap, HashSet, VecDeque};
63use std::fmt;
64use std::sync::Arc;
65use std::thread::JoinHandle;
66use thiserror::Error;
67use tracing::{debug, warn};
68
69const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
70
71fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
72 if !plaintext
73 .first()
74 .is_some_and(|ty| *ty == LinkMessageType::SessionDatagram.to_byte())
75 {
76 return false;
77 }
78 let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
79 return false;
80 };
81 FspCommonPrefix::parse(fsp_payload)
82 .is_some_and(|prefix| prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted())
83}
84
85pub(crate) const REKEY_JITTER_SECS: i64 = 15;
92
93#[derive(Debug, Error)]
95pub enum NodeError {
96 #[error("node not started")]
97 NotStarted,
98
99 #[error("node already started")]
100 AlreadyStarted,
101
102 #[error("node already stopped")]
103 AlreadyStopped,
104
105 #[error("transport not found: {0}")]
106 TransportNotFound(TransportId),
107
108 #[error("no transport available for type: {0}")]
109 NoTransportForType(String),
110
111 #[error("link not found: {0}")]
112 LinkNotFound(LinkId),
113
114 #[error("connection not found: {0}")]
115 ConnectionNotFound(LinkId),
116
117 #[error("peer not found: {0:?}")]
118 PeerNotFound(NodeAddr),
119
120 #[error("peer already exists: {0:?}")]
121 PeerAlreadyExists(NodeAddr),
122
123 #[error("connection already exists for link: {0}")]
124 ConnectionAlreadyExists(LinkId),
125
126 #[error("invalid peer npub '{npub}': {reason}")]
127 InvalidPeerNpub { npub: String, reason: String },
128
129 #[error("discovery error: {0}")]
130 Discovery(String),
131
132 #[error("access denied: {0}")]
133 AccessDenied(String),
134
135 #[error("max connections exceeded: {max}")]
136 MaxConnectionsExceeded { max: usize },
137
138 #[error("max peers exceeded: {max}")]
139 MaxPeersExceeded { max: usize },
140
141 #[error("max links exceeded: {max}")]
142 MaxLinksExceeded { max: usize },
143
144 #[error("handshake incomplete for link {0}")]
145 HandshakeIncomplete(LinkId),
146
147 #[error("no session available for link {0}")]
148 NoSession(LinkId),
149
150 #[error("promotion failed for link {link_id}: {reason}")]
151 PromotionFailed { link_id: LinkId, reason: String },
152
153 #[error("send failed to {node_addr}: {reason}")]
154 SendFailed { node_addr: NodeAddr, reason: String },
155
156 #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
157 MtuExceeded {
158 node_addr: NodeAddr,
159 packet_size: usize,
160 mtu: u16,
161 },
162
163 #[error("config error: {0}")]
164 Config(#[from] ConfigError),
165
166 #[error("identity error: {0}")]
167 Identity(#[from] IdentityError),
168
169 #[error("TUN error: {0}")]
170 Tun(#[from] TunError),
171
172 #[error("index allocation failed: {0}")]
173 IndexAllocationFailed(String),
174
175 #[error("handshake failed: {0}")]
176 HandshakeFailed(String),
177
178 #[error("transport error: {0}")]
179 TransportError(String),
180
181 #[error("local route unavailable: {0}")]
182 LocalRouteUnavailable(String),
183
184 #[error("bootstrap handoff failed: {0}")]
185 BootstrapHandoff(String),
186}
187
188impl NodeError {
189 pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
190 if error.is_local_route_unavailable() {
191 Self::LocalRouteUnavailable(error.to_string())
192 } else {
193 Self::TransportError(error.to_string())
194 }
195 }
196
197 pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
198 matches!(self, Self::LocalRouteUnavailable(_))
199 }
200}
201
202#[derive(Debug, Clone, PartialEq, Eq)]
204pub struct NodeDeliveredPacket {
205 pub source_node_addr: NodeAddr,
207 pub source_npub: Option<String>,
209 pub destination: FipsAddress,
211 pub packet: Vec<u8>,
213}
214
215#[derive(Debug, Clone)]
216struct IdentityCacheEntry {
217 node_addr: NodeAddr,
218 pubkey: secp256k1::PublicKey,
219 npub: String,
220 last_seen_ms: u64,
221}
222
223impl IdentityCacheEntry {
224 fn new(
225 node_addr: NodeAddr,
226 pubkey: secp256k1::PublicKey,
227 npub: String,
228 last_seen_ms: u64,
229 ) -> Self {
230 Self {
231 node_addr,
232 pubkey,
233 npub,
234 last_seen_ms,
235 }
236 }
237}
238
239#[derive(Debug)]
241pub struct ExternalPacketIo {
242 pub outbound_tx: crate::upper::tun::TunOutboundTx,
244 pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
246}
247
248#[derive(Debug)]
250pub(crate) struct EndpointDataIo {
251 pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
260 pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
270 pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
276}
277
278fn endpoint_data_command_capacity(requested: usize) -> usize {
279 if let Ok(raw) = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
280 && let Ok(value) = raw.trim().parse::<usize>()
281 && value > 0
282 {
283 return value;
284 }
285
286 requested.max(1).max(32_768)
287}
288
289#[derive(Debug)]
291pub(crate) enum NodeEndpointCommand {
292 Send {
296 remote: PeerIdentity,
297 payload: Vec<u8>,
298 queued_at: Option<std::time::Instant>,
299 response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
300 },
301 SendOneway {
307 remote: PeerIdentity,
308 payload: Vec<u8>,
309 queued_at: Option<std::time::Instant>,
310 },
311 PeerSnapshot {
312 response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
313 },
314 RelaySnapshot {
315 response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
316 },
317 UpdateRelays {
318 advert_relays: Vec<String>,
319 dm_relays: Vec<String>,
320 response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
321 },
322 UpdatePeers {
328 peers: Vec<crate::config::PeerConfig>,
329 response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
330 },
331}
332
333#[derive(Debug, Clone, Default, PartialEq, Eq)]
335pub(crate) struct UpdatePeersOutcome {
336 pub(crate) added: usize,
337 pub(crate) removed: usize,
338 pub(crate) updated: usize,
339 pub(crate) unchanged: usize,
340}
341
342#[derive(Debug)]
344pub(crate) enum NodeEndpointEvent {
345 Data {
346 source_node_addr: NodeAddr,
347 source_npub: Option<String>,
348 payload: Vec<u8>,
349 queued_at: Option<std::time::Instant>,
350 },
351}
352
353#[derive(Debug, Clone, PartialEq, Eq)]
355pub(crate) struct NodeEndpointPeer {
356 pub(crate) npub: String,
357 pub(crate) connected: bool,
358 pub(crate) transport_addr: Option<String>,
359 pub(crate) transport_type: Option<String>,
360 pub(crate) link_id: u64,
361 pub(crate) srtt_ms: Option<u64>,
362 pub(crate) packets_sent: u64,
363 pub(crate) packets_recv: u64,
364 pub(crate) bytes_sent: u64,
365 pub(crate) bytes_recv: u64,
366 pub(crate) direct_probe_pending: bool,
367 pub(crate) direct_probe_after_ms: Option<u64>,
368}
369
370#[derive(Debug, Clone, PartialEq, Eq)]
372pub(crate) struct NodeEndpointRelayStatus {
373 pub(crate) url: String,
374 pub(crate) status: String,
375}
376
377#[derive(Clone, Copy, Debug, PartialEq, Eq)]
379pub enum NodeState {
380 Created,
382 Starting,
384 Running,
386 Stopping,
388 Stopped,
390}
391
392impl NodeState {
393 pub fn is_operational(&self) -> bool {
395 matches!(self, NodeState::Running)
396 }
397
398 pub fn can_start(&self) -> bool {
400 matches!(self, NodeState::Created | NodeState::Stopped)
401 }
402
403 pub fn can_stop(&self) -> bool {
405 matches!(self, NodeState::Running)
406 }
407}
408
409impl fmt::Display for NodeState {
410 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
411 let s = match self {
412 NodeState::Created => "created",
413 NodeState::Starting => "starting",
414 NodeState::Running => "running",
415 NodeState::Stopping => "stopping",
416 NodeState::Stopped => "stopped",
417 };
418 write!(f, "{}", s)
419 }
420}
421
422#[derive(Clone, Debug)]
429pub(crate) struct RecentRequest {
430 pub(crate) from_peer: NodeAddr,
432 pub(crate) timestamp_ms: u64,
434 pub(crate) response_forwarded: bool,
438}
439
440impl RecentRequest {
441 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
442 Self {
443 from_peer,
444 timestamp_ms,
445 response_forwarded: false,
446 }
447 }
448
449 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
451 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
452 }
453}
454
455type AddrKey = (TransportId, TransportAddr);
457
458#[derive(Debug, Default)]
463struct TransportDropState {
464 prev_drops: u64,
466 dropping: bool,
468}
469
470struct PendingConnect {
476 link_id: LinkId,
478 transport_id: TransportId,
480 remote_addr: TransportAddr,
482 peer_identity: PeerIdentity,
484}
485
486pub struct Node {
500 identity: Identity,
503
504 startup_epoch: [u8; 8],
507
508 started_at: std::time::Instant,
510
511 config: Config,
514
515 state: NodeState,
518
519 is_leaf_only: bool,
521
522 tree_state: TreeState,
525
526 bloom_state: BloomState,
529
530 coord_cache: CoordCache,
533 learned_routes: LearnedRouteTable,
535 recent_requests: HashMap<u64, RecentRequest>,
538 path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,
544
545 transports: HashMap<TransportId, TransportHandle>,
548 transport_drops: HashMap<TransportId, TransportDropState>,
550 links: HashMap<LinkId, Link>,
552 addr_to_link: HashMap<AddrKey, LinkId>,
554
555 packet_tx: Option<PacketTx>,
558 packet_rx: Option<PacketRx>,
560
561 connections: HashMap<LinkId, PeerConnection>,
565
566 peers: HashMap<NodeAddr, ActivePeer>,
570
571 sessions: HashMap<NodeAddr, SessionEntry>,
575
576 identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,
580
581 pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
585 pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
587 pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,
591
592 max_connections: usize,
595 max_peers: usize,
597 max_links: usize,
599
600 next_link_id: u64,
603 next_transport_id: u32,
605
606 stats: stats::NodeStats,
609
610 stats_history: stats_history::StatsHistory,
612
613 tun_state: TunState,
616 tun_name: Option<String>,
618 tun_tx: Option<TunTx>,
620 tun_outbound_rx: Option<TunOutboundRx>,
622 external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
624 endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
626 endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
628 encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
634 decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
637 decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
646 decrypt_fallback_rx:
650 Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
651 decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
652 tun_reader_handle: Option<JoinHandle<()>>,
654 tun_writer_handle: Option<JoinHandle<()>>,
656 #[cfg(target_os = "macos")]
659 tun_shutdown_fd: Option<std::os::unix::io::RawFd>,
660
661 dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
664 dns_task: Option<tokio::task::JoinHandle<()>>,
666
667 index_allocator: IndexAllocator,
670 peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
673 pending_outbound: HashMap<(TransportId, u32), LinkId>,
676
677 msg1_rate_limiter: HandshakeRateLimiter,
680 icmp_rate_limiter: IcmpRateLimiter,
682 routing_error_rate_limiter: RoutingErrorRateLimiter,
684 coords_response_rate_limiter: RoutingErrorRateLimiter,
686 discovery_backoff: DiscoveryBackoff,
688 discovery_forward_limiter: DiscoveryForwardRateLimiter,
690
691 pending_connects: Vec<PendingConnect>,
697
698 retry_pending: HashMap<NodeAddr, retry::RetryState>,
704
705 nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
707 lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
712 local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
716 local_instance_started_at_ms: Option<u64>,
717 last_local_instance_publish_ms: Option<u64>,
718 last_local_instance_scan_ms: Option<u64>,
719 nostr_discovery_started_at_ms: Option<u64>,
724 startup_open_discovery_sweep_done: bool,
728 bootstrap_transports: HashSet<TransportId>,
730 bootstrap_transport_npubs: HashMap<TransportId, String>,
737 discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,
740
741 last_parent_reeval: Option<crate::time::Instant>,
744
745 last_congestion_log: Option<std::time::Instant>,
748
749 estimated_mesh_size: Option<u64>,
752 last_mesh_size_log: Option<std::time::Instant>,
754
755 last_self_warn: Option<std::time::Instant>,
761
762 last_local_send_failure_at: Option<std::time::Instant>,
770 last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,
775
776 peer_aliases: HashMap<NodeAddr, String>,
780 configured_peer_send_weights: HashMap<NodeAddr, u8>,
783
784 peer_acl: acl::PeerAclReloader,
786
787 host_map: Arc<HostMap>,
791}
792
793impl Node {
794 pub fn new(config: Config) -> Result<Self, NodeError> {
796 config.validate()?;
797 let identity = config.create_identity()?;
798 let node_addr = *identity.node_addr();
799 let is_leaf_only = config.is_leaf_only();
800
801 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
802 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
803
804 let mut startup_epoch = [0u8; 8];
805 rand::rng().fill_bytes(&mut startup_epoch);
806
807 let mut bloom_state = if is_leaf_only {
808 BloomState::leaf_only(node_addr)
809 } else {
810 BloomState::new(node_addr)
811 };
812 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
813
814 let tun_state = if config.tun.enabled {
815 TunState::Configured
816 } else {
817 TunState::Disabled
818 };
819
820 let mut tree_state = TreeState::new(node_addr);
822 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
823 tree_state.set_hold_down(config.node.tree.hold_down_secs);
824 tree_state.set_flap_dampening(
825 config.node.tree.flap_threshold,
826 config.node.tree.flap_window_secs,
827 config.node.tree.flap_dampening_secs,
828 );
829 tree_state
830 .sign_declaration(&identity)
831 .expect("signing own declaration should never fail");
832
833 let coord_cache = CoordCache::new(
834 config.node.cache.coord_size,
835 config.node.cache.coord_ttl_secs * 1000,
836 );
837 let rl = &config.node.rate_limit;
838 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
839 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
840 config.node.limits.max_pending_inbound,
841 );
842
843 let max_connections = config.node.limits.max_connections;
844 let max_peers = config.node.limits.max_peers;
845 let max_links = config.node.limits.max_links;
846 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
847 let backoff_base_secs = config.node.discovery.backoff_base_secs;
848 let backoff_max_secs = config.node.discovery.backoff_max_secs;
849 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
850
851 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
852 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
853
854 Ok(Self {
855 identity,
856 startup_epoch,
857 started_at: std::time::Instant::now(),
858 config,
859 state: NodeState::Created,
860 is_leaf_only,
861 tree_state,
862 bloom_state,
863 coord_cache,
864 learned_routes: LearnedRouteTable::default(),
865 recent_requests: HashMap::new(),
866 transports: HashMap::new(),
867 transport_drops: HashMap::new(),
868 links: HashMap::new(),
869 addr_to_link: HashMap::new(),
870 packet_tx: None,
871 packet_rx: None,
872 connections: HashMap::new(),
873 peers: HashMap::new(),
874 sessions: HashMap::new(),
875 identity_cache: HashMap::new(),
876 pending_tun_packets: HashMap::new(),
877 pending_endpoint_data: HashMap::new(),
878 pending_lookups: HashMap::new(),
879 max_connections,
880 max_peers,
881 max_links,
882 next_link_id: 1,
883 next_transport_id: 1,
884 stats: stats::NodeStats::new(),
885 stats_history: stats_history::StatsHistory::new(),
886 tun_state,
887 tun_name: None,
888 tun_tx: None,
889 tun_outbound_rx: None,
890 external_packet_tx: None,
891 endpoint_command_rx: None,
892 endpoint_event_tx: None,
893 encrypt_workers: None,
894 decrypt_workers: None,
895 decrypt_registered_sessions: std::collections::HashSet::new(),
896 decrypt_fallback_tx,
897 decrypt_fallback_rx,
898 tun_reader_handle: None,
899 tun_writer_handle: None,
900 #[cfg(target_os = "macos")]
901 tun_shutdown_fd: None,
902 dns_identity_rx: None,
903 dns_task: None,
904 index_allocator: IndexAllocator::new(),
905 peers_by_index: HashMap::new(),
906 pending_outbound: HashMap::new(),
907 msg1_rate_limiter,
908 icmp_rate_limiter: IcmpRateLimiter::new(),
909 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
910 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
911 std::time::Duration::from_millis(coords_response_interval_ms),
912 ),
913 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
914 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
915 std::time::Duration::from_secs(forward_min_interval_secs),
916 ),
917 pending_connects: Vec::new(),
918 retry_pending: HashMap::new(),
919 nostr_discovery: None,
920 nostr_discovery_started_at_ms: None,
921 lan_discovery: None,
922 local_instance_registry: None,
923 local_instance_started_at_ms: None,
924 last_local_instance_publish_ms: None,
925 last_local_instance_scan_ms: None,
926 startup_open_discovery_sweep_done: false,
927 bootstrap_transports: HashSet::new(),
928 bootstrap_transport_npubs: HashMap::new(),
929 discovery_fallback_transit_blocked_peers: HashSet::new(),
930 last_parent_reeval: None,
931 last_congestion_log: None,
932 estimated_mesh_size: None,
933 last_mesh_size_log: None,
934 last_self_warn: None,
935 last_local_send_failure_at: None,
936 last_rx_loop_maintenance_timeout_at: None,
937 peer_aliases: HashMap::new(),
938 configured_peer_send_weights,
939 peer_acl,
940 host_map,
941 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
942 })
943 }
944
945 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
950 config.validate()?;
951 let node_addr = *identity.node_addr();
952
953 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
954 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
955
956 let mut startup_epoch = [0u8; 8];
957 rand::rng().fill_bytes(&mut startup_epoch);
958
959 let tun_state = if config.tun.enabled {
960 TunState::Configured
961 } else {
962 TunState::Disabled
963 };
964
965 let mut tree_state = TreeState::new(node_addr);
967 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
968 tree_state.set_hold_down(config.node.tree.hold_down_secs);
969 tree_state.set_flap_dampening(
970 config.node.tree.flap_threshold,
971 config.node.tree.flap_window_secs,
972 config.node.tree.flap_dampening_secs,
973 );
974 tree_state
975 .sign_declaration(&identity)
976 .expect("signing own declaration should never fail");
977
978 let mut bloom_state = BloomState::new(node_addr);
979 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
980
981 let coord_cache = CoordCache::new(
982 config.node.cache.coord_size,
983 config.node.cache.coord_ttl_secs * 1000,
984 );
985 let rl = &config.node.rate_limit;
986 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
987 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
988 config.node.limits.max_pending_inbound,
989 );
990
991 let max_connections = config.node.limits.max_connections;
992 let max_peers = config.node.limits.max_peers;
993 let max_links = config.node.limits.max_links;
994 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
995
996 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
997 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
998
999 Ok(Self {
1000 identity,
1001 startup_epoch,
1002 started_at: std::time::Instant::now(),
1003 config,
1004 state: NodeState::Created,
1005 is_leaf_only: false,
1006 tree_state,
1007 bloom_state,
1008 coord_cache,
1009 learned_routes: LearnedRouteTable::default(),
1010 recent_requests: HashMap::new(),
1011 transports: HashMap::new(),
1012 transport_drops: HashMap::new(),
1013 links: HashMap::new(),
1014 addr_to_link: HashMap::new(),
1015 packet_tx: None,
1016 packet_rx: None,
1017 connections: HashMap::new(),
1018 peers: HashMap::new(),
1019 sessions: HashMap::new(),
1020 identity_cache: HashMap::new(),
1021 pending_tun_packets: HashMap::new(),
1022 pending_endpoint_data: HashMap::new(),
1023 pending_lookups: HashMap::new(),
1024 max_connections,
1025 max_peers,
1026 max_links,
1027 next_link_id: 1,
1028 next_transport_id: 1,
1029 stats: stats::NodeStats::new(),
1030 stats_history: stats_history::StatsHistory::new(),
1031 tun_state,
1032 tun_name: None,
1033 tun_tx: None,
1034 tun_outbound_rx: None,
1035 external_packet_tx: None,
1036 endpoint_command_rx: None,
1037 endpoint_event_tx: None,
1038 encrypt_workers: None,
1039 decrypt_workers: None,
1040 decrypt_registered_sessions: std::collections::HashSet::new(),
1041 decrypt_fallback_tx,
1042 decrypt_fallback_rx,
1043 tun_reader_handle: None,
1044 tun_writer_handle: None,
1045 #[cfg(target_os = "macos")]
1046 tun_shutdown_fd: None,
1047 dns_identity_rx: None,
1048 dns_task: None,
1049 index_allocator: IndexAllocator::new(),
1050 peers_by_index: HashMap::new(),
1051 pending_outbound: HashMap::new(),
1052 msg1_rate_limiter,
1053 icmp_rate_limiter: IcmpRateLimiter::new(),
1054 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1055 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1056 std::time::Duration::from_millis(coords_response_interval_ms),
1057 ),
1058 discovery_backoff: DiscoveryBackoff::new(),
1059 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1060 pending_connects: Vec::new(),
1061 retry_pending: HashMap::new(),
1062 nostr_discovery: None,
1063 nostr_discovery_started_at_ms: None,
1064 lan_discovery: None,
1065 local_instance_registry: None,
1066 local_instance_started_at_ms: None,
1067 last_local_instance_publish_ms: None,
1068 last_local_instance_scan_ms: None,
1069 startup_open_discovery_sweep_done: false,
1070 bootstrap_transports: HashSet::new(),
1071 bootstrap_transport_npubs: HashMap::new(),
1072 discovery_fallback_transit_blocked_peers: HashSet::new(),
1073 last_parent_reeval: None,
1074 last_congestion_log: None,
1075 estimated_mesh_size: None,
1076 last_mesh_size_log: None,
1077 last_self_warn: None,
1078 last_local_send_failure_at: None,
1079 last_rx_loop_maintenance_timeout_at: None,
1080 peer_aliases: HashMap::new(),
1081 configured_peer_send_weights,
1082 peer_acl,
1083 host_map,
1084 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1085 })
1086 }
1087
1088 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1090 let mut node = Self::new(config)?;
1091 node.is_leaf_only = true;
1092 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1093 Ok(node)
1094 }
1095
1096 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1097 let base_host_map = HostMap::from_peer_configs(config.peers());
1098 if !config.node.system_files_enabled {
1099 return (
1100 Arc::new(base_host_map.clone()),
1101 acl::PeerAclReloader::memory_only(base_host_map),
1102 );
1103 }
1104
1105 let mut host_map = base_host_map.clone();
1106 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1107 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1108 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1109 ));
1110 host_map.merge(hosts_file);
1111 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1112 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1113 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1114 base_host_map,
1115 hosts_path,
1116 );
1117 (Arc::new(host_map), peer_acl)
1118 }
1119
1120 fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1121 config
1122 .peers()
1123 .iter()
1124 .filter_map(|peer| {
1125 PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1126 (
1127 *identity.node_addr(),
1128 encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1129 )
1130 })
1131 })
1132 .collect()
1133 }
1134
1135 fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1136 self.configured_peer_send_weights
1137 .get(peer_addr)
1138 .copied()
1139 .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1140 }
1141
1142 async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
1146 let mut transports = Vec::new();
1147
1148 let udp_instances: Vec<_> = self
1150 .config
1151 .transports
1152 .udp
1153 .iter()
1154 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1155 .collect();
1156
1157 for (name, udp_config) in udp_instances {
1159 let transport_id = self.allocate_transport_id();
1160 let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
1161 transports.push(TransportHandle::Udp(udp));
1162 }
1163
1164 #[cfg(feature = "sim-transport")]
1165 {
1166 let sim_instances: Vec<_> = self
1167 .config
1168 .transports
1169 .sim
1170 .iter()
1171 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1172 .collect();
1173
1174 for (name, sim_config) in sim_instances {
1175 let transport_id = self.allocate_transport_id();
1176 let sim = crate::transport::sim::SimTransport::new(
1177 transport_id,
1178 name,
1179 sim_config,
1180 packet_tx.clone(),
1181 );
1182 transports.push(TransportHandle::Sim(sim));
1183 }
1184 }
1185
1186 #[cfg(any(target_os = "linux", target_os = "macos"))]
1188 {
1189 let eth_instances: Vec<_> = self
1190 .config
1191 .transports
1192 .ethernet
1193 .iter()
1194 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1195 .collect();
1196 let xonly = self.identity.pubkey();
1197 for (name, eth_config) in eth_instances {
1198 let mut eth_config = eth_config;
1199 if eth_config.discovery_scope.is_none() {
1200 eth_config.discovery_scope = self.lan_discovery_scope();
1201 }
1202 let transport_id = self.allocate_transport_id();
1203 let mut eth =
1204 EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
1205 eth.set_local_pubkey(xonly);
1206 transports.push(TransportHandle::Ethernet(eth));
1207 }
1208 }
1209
1210 let tcp_instances: Vec<_> = self
1212 .config
1213 .transports
1214 .tcp
1215 .iter()
1216 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1217 .collect();
1218
1219 for (name, tcp_config) in tcp_instances {
1220 let transport_id = self.allocate_transport_id();
1221 let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
1222 transports.push(TransportHandle::Tcp(tcp));
1223 }
1224
1225 let tor_instances: Vec<_> = self
1227 .config
1228 .transports
1229 .tor
1230 .iter()
1231 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1232 .collect();
1233
1234 for (name, tor_config) in tor_instances {
1235 let transport_id = self.allocate_transport_id();
1236 let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
1237 transports.push(TransportHandle::Tor(tor));
1238 }
1239
1240 let webrtc_instances: Vec<_> = self
1241 .config
1242 .transports
1243 .webrtc
1244 .iter()
1245 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1246 .collect();
1247
1248 #[cfg(feature = "webrtc-transport")]
1249 {
1250 for (name, webrtc_config) in webrtc_instances {
1251 let transport_id = self.allocate_transport_id();
1252 match WebRtcTransport::new(
1253 transport_id,
1254 name,
1255 webrtc_config,
1256 packet_tx.clone(),
1257 &self.identity,
1258 &self.config.node.discovery.nostr,
1259 ) {
1260 Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
1261 Err(err) => {
1262 warn!(
1263 transport_id = %transport_id,
1264 error = %err,
1265 "failed to initialize WebRTC transport"
1266 );
1267 }
1268 }
1269 }
1270 }
1271 #[cfg(not(feature = "webrtc-transport"))]
1272 if !webrtc_instances.is_empty() {
1273 warn!("WebRTC transport configured but this build lacks WebRTC transport support");
1274 }
1275
1276 #[cfg(bluer_available)]
1278 {
1279 let ble_instances: Vec<_> = self
1280 .config
1281 .transports
1282 .ble
1283 .iter()
1284 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1285 .collect();
1286
1287 #[cfg(all(bluer_available, not(test)))]
1288 for (name, ble_config) in ble_instances {
1289 let transport_id = self.allocate_transport_id();
1290 let adapter = ble_config.adapter().to_string();
1291 let mtu = ble_config.mtu();
1292 match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
1293 Ok(io) => {
1294 let mut ble = crate::transport::ble::BleTransport::new(
1295 transport_id,
1296 name,
1297 ble_config,
1298 io,
1299 packet_tx.clone(),
1300 );
1301 ble.set_local_pubkey(self.identity.pubkey().serialize());
1302 transports.push(TransportHandle::Ble(ble));
1303 }
1304 Err(e) => {
1305 tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
1306 }
1307 }
1308 }
1309
1310 #[cfg(any(not(bluer_available), test))]
1311 if !ble_instances.is_empty() {
1312 #[cfg(not(test))]
1313 tracing::warn!("BLE transport configured but this build lacks BlueZ support");
1314 }
1315 }
1316
1317 transports
1318 }
1319
1320 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1330 self.transports
1331 .iter()
1332 .filter(|(id, handle)| {
1333 handle.transport_type().name == transport_type
1334 && handle.is_operational()
1335 && !self.bootstrap_transports.contains(id)
1336 })
1337 .min_by_key(|(id, _)| id.as_u32())
1338 .map(|(id, _)| *id)
1339 }
1340
1341 #[allow(unused_variables)]
1347 fn resolve_ethernet_addr(
1348 &self,
1349 addr_str: &str,
1350 ) -> Result<(TransportId, TransportAddr), NodeError> {
1351 #[cfg(any(target_os = "linux", target_os = "macos"))]
1352 {
1353 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1354 NodeError::NoTransportForType(format!(
1355 "invalid Ethernet address format '{}': expected 'interface/mac'",
1356 addr_str
1357 ))
1358 })?;
1359
1360 let transport_id = self
1362 .transports
1363 .iter()
1364 .find(|(_, handle)| {
1365 handle.transport_type().name == "ethernet"
1366 && handle.is_operational()
1367 && handle.interface_name() == Some(iface)
1368 })
1369 .map(|(id, _)| *id)
1370 .ok_or_else(|| {
1371 NodeError::NoTransportForType(format!(
1372 "no operational Ethernet transport for interface '{}'",
1373 iface
1374 ))
1375 })?;
1376
1377 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1378 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1379 })?;
1380
1381 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1382 }
1383 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1384 {
1385 Err(NodeError::NoTransportForType(
1386 "Ethernet transport is not supported on this platform".to_string(),
1387 ))
1388 }
1389 }
1390
1391 #[cfg(bluer_available)]
1395 fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1396 let ta = TransportAddr::from_string(addr_str);
1397 let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1398 NodeError::NoTransportForType(format!(
1399 "invalid BLE address format '{}': expected 'adapter/mac'",
1400 addr_str
1401 ))
1402 })?;
1403
1404 let transport_id = self
1406 .transports
1407 .iter()
1408 .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1409 .map(|(id, _)| *id)
1410 .ok_or_else(|| {
1411 NodeError::NoTransportForType(format!(
1412 "no operational BLE transport for adapter '{}'",
1413 adapter
1414 ))
1415 })?;
1416
1417 crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1419 NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1420 })?;
1421
1422 Ok((transport_id, TransportAddr::from_string(addr_str)))
1423 }
1424
1425 pub fn identity(&self) -> &Identity {
1429 &self.identity
1430 }
1431
1432 pub fn node_addr(&self) -> &NodeAddr {
1434 self.identity.node_addr()
1435 }
1436
1437 pub fn npub(&self) -> String {
1439 self.identity.npub()
1440 }
1441
1442 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1451 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1452 return hostname.to_string();
1453 }
1454 if let Some(name) = self.peer_aliases.get(addr) {
1455 return name.clone();
1456 }
1457 if let Some(peer) = self.peers.get(addr) {
1458 return peer.identity().short_npub();
1459 }
1460 if let Some(entry) = self.sessions.get(addr) {
1461 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1462 return PeerIdentity::from_pubkey(xonly).short_npub();
1463 }
1464 addr.short_hex()
1465 }
1466
1467 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1479 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1483 self.peers_by_index.remove(&cache_key);
1484 if self.decrypt_registered_sessions.remove(&cache_key)
1485 && let Some(workers) = self.decrypt_workers.as_ref()
1486 {
1487 workers.unregister_session(cache_key);
1488 }
1489 if let Some(peer_addr) = owning_peer {
1500 let peer_has_other_index = self
1501 .peers_by_index
1502 .values()
1503 .any(|other| *other == peer_addr);
1504 if !peer_has_other_index {
1505 self.clear_connected_udp_for_peer(&peer_addr);
1506 }
1507 }
1508 }
1509
1510 pub(in crate::node) fn ensure_current_session_index_registered(
1519 &mut self,
1520 node_addr: &NodeAddr,
1521 context: &'static str,
1522 ) -> bool {
1523 let Some(peer) = self.peers.get(node_addr) else {
1524 return false;
1525 };
1526 let Some(transport_id) = peer.transport_id() else {
1527 warn!(
1528 peer = %self.peer_display_name(node_addr),
1529 context,
1530 "Cannot register current session index without transport id"
1531 );
1532 return false;
1533 };
1534 let Some(our_index) = peer.our_index() else {
1535 warn!(
1536 peer = %self.peer_display_name(node_addr),
1537 context,
1538 "Cannot register current session index without local index"
1539 );
1540 return false;
1541 };
1542
1543 let cache_key = (transport_id, our_index.as_u32());
1544 match self.peers_by_index.get(&cache_key).copied() {
1545 Some(existing) if existing == *node_addr => true,
1546 Some(existing) => {
1547 warn!(
1548 peer = %self.peer_display_name(node_addr),
1549 previous_owner = %self.peer_display_name(&existing),
1550 transport_id = %transport_id,
1551 our_index = %our_index,
1552 context,
1553 "Repairing current session index with stale owner"
1554 );
1555 self.peers_by_index.insert(cache_key, *node_addr);
1556 true
1557 }
1558 None => {
1559 warn!(
1560 peer = %self.peer_display_name(node_addr),
1561 transport_id = %transport_id,
1562 our_index = %our_index,
1563 context,
1564 "Repairing missing current session index"
1565 );
1566 self.peers_by_index.insert(cache_key, *node_addr);
1567 true
1568 }
1569 }
1570 }
1571
1572 pub fn config(&self) -> &Config {
1576 &self.config
1577 }
1578
1579 pub fn effective_ipv6_mtu(&self) -> u16 {
1585 crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1586 }
1587
1588 pub fn transport_mtu(&self) -> u16 {
1605 let min_operational = self
1606 .transports
1607 .values()
1608 .filter(|h| h.is_operational())
1609 .map(|h| h.mtu())
1610 .min();
1611 if let Some(mtu) = min_operational {
1612 return mtu;
1613 }
1614 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1616 return cfg.mtu();
1617 }
1618 1280
1619 }
1620
1621 pub fn state(&self) -> NodeState {
1625 self.state
1626 }
1627
1628 pub fn uptime(&self) -> std::time::Duration {
1630 self.started_at.elapsed()
1631 }
1632
1633 pub fn is_running(&self) -> bool {
1635 self.state.is_operational()
1636 }
1637
1638 pub fn is_leaf_only(&self) -> bool {
1640 self.is_leaf_only
1641 }
1642
1643 pub fn tree_state(&self) -> &TreeState {
1647 &self.tree_state
1648 }
1649
1650 pub fn tree_state_mut(&mut self) -> &mut TreeState {
1652 &mut self.tree_state
1653 }
1654
1655 pub fn bloom_state(&self) -> &BloomState {
1659 &self.bloom_state
1660 }
1661
1662 pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1664 &mut self.bloom_state
1665 }
1666
1667 pub fn estimated_mesh_size(&self) -> Option<u64> {
1671 self.estimated_mesh_size
1672 }
1673
1674 pub(crate) fn compute_mesh_size(&mut self) {
1680 let my_addr = *self.tree_state.my_node_addr();
1681 let parent_id = *self.tree_state.my_declaration().parent_id();
1682 let is_root = self.tree_state.is_root();
1683
1684 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1685 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1687 let mut has_data = false;
1688
1689 if !is_root
1695 && let Some(parent) = self.peers.get(&parent_id)
1696 && let Some(filter) = parent.inbound_filter()
1697 {
1698 match filter.estimated_count(max_fpr) {
1699 Some(n) => {
1700 total += n;
1701 has_data = true;
1702 }
1703 None => {
1704 self.estimated_mesh_size = None;
1705 return;
1706 }
1707 }
1708 }
1709
1710 for (peer_addr, peer) in &self.peers {
1712 if peer_addr == &parent_id {
1713 continue;
1714 }
1715 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1716 && *decl.parent_id() == my_addr
1717 {
1718 child_count += 1;
1719 if let Some(filter) = peer.inbound_filter() {
1720 match filter.estimated_count(max_fpr) {
1721 Some(n) => {
1722 total += n;
1723 has_data = true;
1724 }
1725 None => {
1726 self.estimated_mesh_size = None;
1727 return;
1728 }
1729 }
1730 }
1731 }
1732 }
1733
1734 if !has_data {
1735 self.estimated_mesh_size = None;
1736 return;
1737 }
1738
1739 let size = total.round() as u64;
1740 self.estimated_mesh_size = Some(size);
1741
1742 let now = std::time::Instant::now();
1744 let should_log = match self.last_mesh_size_log {
1745 None => true,
1746 Some(last) => {
1747 now.duration_since(last)
1748 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1749 }
1750 };
1751 if should_log {
1752 tracing::debug!(
1753 estimated_mesh_size = size,
1754 peers = self.peers.len(),
1755 children = child_count,
1756 "Mesh size estimate"
1757 );
1758 self.last_mesh_size_log = Some(now);
1759 }
1760 }
1761
1762 pub fn coord_cache(&self) -> &CoordCache {
1766 &self.coord_cache
1767 }
1768
1769 pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1771 &mut self.coord_cache
1772 }
1773
1774 pub fn stats(&self) -> &stats::NodeStats {
1778 &self.stats
1779 }
1780
1781 pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1783 &mut self.stats
1784 }
1785
1786 pub fn stats_history(&self) -> &stats_history::StatsHistory {
1788 &self.stats_history
1789 }
1790
1791 pub(crate) fn record_stats_history(&mut self) {
1794 let fwd = &self.stats.forwarding;
1795 let peers_with_mmp: Vec<f64> = self
1796 .peers
1797 .values()
1798 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1799 .collect();
1800 let loss_rate = if peers_with_mmp.is_empty() {
1801 0.0
1802 } else {
1803 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1804 };
1805
1806 let snap = stats_history::Snapshot {
1807 mesh_size: self.estimated_mesh_size,
1808 tree_depth: self.tree_state.my_coords().depth() as u32,
1809 peer_count: self.peers.len() as u64,
1810 parent_switches_total: self.stats.tree.parent_switches,
1811 bytes_in_total: fwd.received_bytes,
1812 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1813 packets_in_total: fwd.received_packets,
1814 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1815 loss_rate,
1816 active_sessions: self.sessions.len() as u64,
1817 };
1818
1819 let now = std::time::Instant::now();
1820 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1821 .peers
1822 .values()
1823 .map(|p| {
1824 let stats = p.link_stats();
1825 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1826 Some(m) => (
1827 m.metrics.srtt_ms(),
1828 Some(m.metrics.loss_rate()),
1829 m.receiver.ecn_ce_count() as u64,
1830 ),
1831 None => (None, None, 0),
1832 };
1833 stats_history::PeerSnapshot {
1834 node_addr: *p.node_addr(),
1835 last_seen: now,
1836 srtt_ms,
1837 loss_rate,
1838 bytes_in_total: stats.bytes_recv,
1839 bytes_out_total: stats.bytes_sent,
1840 packets_in_total: stats.packets_recv,
1841 packets_out_total: stats.packets_sent,
1842 ecn_ce_total: ecn_ce,
1843 }
1844 })
1845 .collect();
1846
1847 self.stats_history.tick(now, &snap, &peer_snaps);
1848 }
1849
1850 pub fn tun_state(&self) -> TunState {
1854 self.tun_state
1855 }
1856
1857 pub fn tun_name(&self) -> Option<&str> {
1859 self.tun_name.as_deref()
1860 }
1861
1862 pub fn set_max_connections(&mut self, max: usize) {
1866 self.max_connections = max;
1867 }
1868
1869 pub fn set_max_peers(&mut self, max: usize) {
1871 self.max_peers = max;
1872 }
1873
1874 pub(crate) fn outbound_admission_check(&self) -> bool {
1877 let connection_used = self
1878 .connections
1879 .len()
1880 .saturating_add(self.pending_connects.len());
1881 let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1882 let connection_allowed =
1883 self.max_connections == 0 || connection_used < self.max_connections;
1884 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1885 peer_allowed && connection_allowed && link_allowed
1886 }
1887
1888 pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1892 if !self.outbound_admission_check() {
1893 return false;
1894 }
1895
1896 let nostr = &self.config.node.discovery.nostr;
1897 if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1898 return true;
1899 }
1900
1901 let configured_npubs = self
1902 .config
1903 .peers()
1904 .iter()
1905 .map(|peer| peer.npub.clone())
1906 .collect::<HashSet<_>>();
1907 self.open_discovery_enqueue_budget(&configured_npubs) > 0
1908 }
1909
1910 pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1914 let connection_used = self
1915 .connections
1916 .len()
1917 .saturating_add(self.pending_connects.len());
1918 let connection_allowed =
1919 self.max_connections == 0 || connection_used < self.max_connections;
1920 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1921 connection_allowed && link_allowed
1922 }
1923
1924 pub fn set_max_links(&mut self, max: usize) {
1926 self.max_links = max;
1927 }
1928
1929 pub fn connection_count(&self) -> usize {
1933 self.connections.len()
1934 }
1935
1936 pub fn peer_count(&self) -> usize {
1938 self.peers.len()
1939 }
1940
1941 pub fn link_count(&self) -> usize {
1943 self.links.len()
1944 }
1945
1946 pub fn transport_count(&self) -> usize {
1948 self.transports.len()
1949 }
1950
1951 pub fn allocate_transport_id(&mut self) -> TransportId {
1955 let id = TransportId::new(self.next_transport_id);
1956 self.next_transport_id += 1;
1957 id
1958 }
1959
1960 pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
1962 self.transports.get(id)
1963 }
1964
1965 pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
1967 self.transports.get_mut(id)
1968 }
1969
1970 pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
1972 self.transports.keys()
1973 }
1974
1975 pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
1977 self.packet_rx.as_mut()
1978 }
1979
1980 pub fn allocate_link_id(&mut self) -> LinkId {
1984 let id = LinkId::new(self.next_link_id);
1985 self.next_link_id += 1;
1986 id
1987 }
1988
1989 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1991 if self.max_links > 0 && self.links.len() >= self.max_links {
1992 return Err(NodeError::MaxLinksExceeded {
1993 max: self.max_links,
1994 });
1995 }
1996 let link_id = link.link_id();
1997 let transport_id = link.transport_id();
1998 let remote_addr = link.remote_addr().clone();
1999
2000 self.links.insert(link_id, link);
2001 self.addr_to_link
2002 .insert((transport_id, remote_addr), link_id);
2003 Ok(())
2004 }
2005
2006 pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
2008 self.links.get(link_id)
2009 }
2010
2011 pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
2013 self.links.get_mut(link_id)
2014 }
2015
2016 pub fn find_link_by_addr(
2018 &self,
2019 transport_id: TransportId,
2020 addr: &TransportAddr,
2021 ) -> Option<LinkId> {
2022 self.addr_to_link
2023 .get(&(transport_id, addr.clone()))
2024 .copied()
2025 }
2026
2027 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2033 if let Some(link) = self.links.remove(link_id) {
2034 let key = (link.transport_id(), link.remote_addr().clone());
2036 if self.addr_to_link.get(&key) == Some(link_id) {
2037 self.addr_to_link.remove(&key);
2038 }
2039 Some(link)
2040 } else {
2041 None
2042 }
2043 }
2044
2045 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2046 if !self.bootstrap_transports.contains(&transport_id) {
2047 return;
2048 }
2049
2050 let transport_in_use = self
2051 .links
2052 .values()
2053 .any(|link| link.transport_id() == transport_id)
2054 || self
2055 .connections
2056 .values()
2057 .any(|conn| conn.transport_id() == Some(transport_id))
2058 || self
2059 .peers
2060 .values()
2061 .any(|peer| peer.transport_id() == Some(transport_id))
2062 || self
2063 .pending_connects
2064 .iter()
2065 .any(|pending| pending.transport_id == transport_id);
2066
2067 if transport_in_use {
2068 return;
2069 }
2070
2071 tracing::debug!(
2072 transport_id = %transport_id,
2073 "bootstrap transport has no remaining references; dropping"
2074 );
2075
2076 self.bootstrap_transports.remove(&transport_id);
2077 self.bootstrap_transport_npubs.remove(&transport_id);
2078 self.transport_drops.remove(&transport_id);
2079 self.transports.remove(&transport_id);
2080 }
2081
2082 pub fn links(&self) -> impl Iterator<Item = &Link> {
2084 self.links.values()
2085 }
2086
2087 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2091 let link_id = connection.link_id();
2092
2093 if self.connections.contains_key(&link_id) {
2094 return Err(NodeError::ConnectionAlreadyExists(link_id));
2095 }
2096
2097 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2098 return Err(NodeError::MaxConnectionsExceeded {
2099 max: self.max_connections,
2100 });
2101 }
2102
2103 self.connections.insert(link_id, connection);
2104 Ok(())
2105 }
2106
2107 pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
2109 self.connections.get(link_id)
2110 }
2111
2112 pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
2114 self.connections.get_mut(link_id)
2115 }
2116
2117 pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
2119 self.connections.remove(link_id)
2120 }
2121
2122 pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
2124 self.connections.values()
2125 }
2126
2127 pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
2131 self.peers.get(node_addr)
2132 }
2133
2134 pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
2136 self.peers.get_mut(node_addr)
2137 }
2138
2139 pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
2141 self.peers.remove(node_addr)
2142 }
2143
2144 pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2146 self.peers.values()
2147 }
2148
2149 pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2153 self.nostr_discovery.as_deref()
2154 }
2155
2156 pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2158 self.peers.keys()
2159 }
2160
2161 pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2163 self.peers.values().filter(|p| p.can_send())
2164 }
2165
2166 pub fn sendable_peer_count(&self) -> usize {
2168 self.peers.values().filter(|p| p.can_send()).count()
2169 }
2170
2171 pub(crate) fn set_discovery_fallback_transit_allowed(
2172 &mut self,
2173 peer_addr: NodeAddr,
2174 allowed: bool,
2175 ) {
2176 if allowed {
2177 self.discovery_fallback_transit_blocked_peers
2178 .remove(&peer_addr);
2179 } else {
2180 self.discovery_fallback_transit_blocked_peers
2181 .insert(peer_addr);
2182 }
2183 }
2184
2185 pub(crate) fn configured_discovery_fallback_transit(
2186 &self,
2187 peer_addr: &NodeAddr,
2188 ) -> Option<bool> {
2189 self.configured_peer(peer_addr)
2190 .map(|peer| peer.discovery_fallback_transit)
2191 }
2192
2193 pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2194 self.config.peers().iter().find(|peer| {
2195 PeerIdentity::from_npub(&peer.npub)
2196 .ok()
2197 .is_some_and(|identity| identity.node_addr() == peer_addr)
2198 })
2199 }
2200
2201 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2202 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2203 return retry_state.peer_config.discovery_fallback_transit;
2204 }
2205
2206 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2207 return allowed;
2208 }
2209
2210 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2211 }
2212
2213 #[cfg(test)]
2218 pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2219 self.discovery_forward_limiter
2220 .set_interval(std::time::Duration::ZERO);
2221 }
2222
2223 #[cfg(test)]
2224 pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2225 self.sessions.get(remote)
2226 }
2227
2228 #[cfg(test)]
2230 pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2231 self.sessions.get_mut(remote)
2232 }
2233
2234 #[cfg(test)]
2236 pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2237 self.sessions.remove(remote)
2238 }
2239
2240 #[cfg(test)]
2242 pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2243 self.path_mtu_lookup
2244 .read()
2245 .ok()
2246 .and_then(|map| map.get(fips_addr).copied())
2247 }
2248
2249 #[cfg(test)]
2251 pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2252 if let Ok(mut map) = self.path_mtu_lookup.write() {
2253 map.insert(fips_addr, mtu);
2254 }
2255 }
2256
2257 pub fn session_count(&self) -> usize {
2259 self.sessions.len()
2260 }
2261
2262 pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2264 self.sessions.iter()
2265 }
2266
2267 pub(crate) fn register_identity(
2271 &mut self,
2272 node_addr: NodeAddr,
2273 pubkey: secp256k1::PublicKey,
2274 ) -> bool {
2275 let mut prefix = [0u8; 15];
2276 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2277 if let Some(entry) = self.identity_cache.get(&prefix)
2278 && entry.node_addr == node_addr
2279 && entry.pubkey == pubkey
2280 {
2281 return true;
2285 }
2286
2287 let (xonly, _) = pubkey.x_only_public_key();
2288 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2289 if derived_node_addr != node_addr {
2290 debug!(
2291 claimed_node_addr = %node_addr,
2292 derived_node_addr = %derived_node_addr,
2293 "Rejected identity cache entry with mismatched public key"
2294 );
2295 return false;
2296 }
2297
2298 let now_ms = Self::now_ms();
2299 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2300 && entry.node_addr == node_addr
2301 {
2302 entry.pubkey = pubkey;
2303 entry.last_seen_ms = now_ms;
2304 return true;
2305 }
2306
2307 let npub = encode_npub(&xonly);
2308 self.identity_cache.insert(
2309 prefix,
2310 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2311 );
2312 let max = self.config.node.cache.identity_size;
2314 if self.identity_cache.len() > max
2315 && let Some(oldest_key) = self
2316 .identity_cache
2317 .iter()
2318 .min_by_key(|(_, entry)| entry.last_seen_ms)
2319 .map(|(k, _)| *k)
2320 {
2321 self.identity_cache.remove(&oldest_key);
2322 }
2323 true
2324 }
2325
2326 pub(crate) fn lookup_by_fips_prefix(
2328 &mut self,
2329 prefix: &[u8; 15],
2330 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2331 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2332 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2334 } else {
2335 None
2336 }
2337 }
2338
2339 pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2341 let mut prefix = [0u8; 15];
2342 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2343 self.identity_cache.contains_key(&prefix)
2344 }
2345
2346 pub fn identity_cache_len(&self) -> usize {
2348 self.identity_cache.len()
2349 }
2350
2351 pub fn identity_cache_iter(
2356 &self,
2357 ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2358 self.identity_cache
2359 .values()
2360 .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2361 }
2362
2363 pub fn identity_cache_max(&self) -> usize {
2365 self.config.node.cache.identity_size
2366 }
2367
2368 pub fn pending_lookup_count(&self) -> usize {
2370 self.pending_lookups.len()
2371 }
2372
2373 pub fn pending_lookups_iter(
2375 &self,
2376 ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2377 self.pending_lookups.iter()
2378 }
2379
2380 pub fn recent_request_count(&self) -> usize {
2382 self.recent_requests.len()
2383 }
2384
2385 pub fn pending_tun_destinations(&self) -> usize {
2387 self.pending_tun_packets.len()
2388 }
2389
2390 pub fn pending_tun_total_packets(&self) -> usize {
2392 self.pending_tun_packets.values().map(|q| q.len()).sum()
2393 }
2394
2395 pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2397 self.retry_pending.iter()
2398 }
2399
2400 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2407 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2409 return true;
2410 }
2411 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2413 && decl.parent_id() == self.node_addr()
2414 {
2415 return true;
2416 }
2417 false
2418 }
2419
2420 pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
2443 if dest_node_addr == self.node_addr() {
2445 return None;
2446 }
2447
2448 let direct_peer_can_send = self
2452 .peers
2453 .get(dest_node_addr)
2454 .is_some_and(|peer| peer.can_send());
2455 if let Some(peer) = self.peers.get(dest_node_addr)
2456 && peer.is_healthy()
2457 {
2458 return Some(peer);
2459 }
2460
2461 let now_ms = Self::now_ms();
2462
2463 let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
2464 Some(
2465 self.peers
2466 .iter()
2467 .filter(|(_, peer)| peer.can_send())
2468 .map(|(addr, _)| *addr)
2469 .collect::<HashSet<_>>(),
2470 )
2471 } else {
2472 None
2473 };
2474
2475 let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
2482 self.learned_routes.should_explore_fallback(
2483 dest_node_addr,
2484 now_ms,
2485 self.config.node.routing.learned_fallback_explore_interval,
2486 |addr| sendable.contains(addr),
2487 )
2488 });
2489 if let Some(sendable) = &sendable_learned_peers
2490 && !explore_fallback
2491 && let Some(next_hop_addr) =
2492 self.learned_routes
2493 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2494 {
2495 return self.peers.get(&next_hop_addr);
2496 }
2497
2498 let Some(dest_coords) = self
2500 .coord_cache
2501 .get_and_touch(dest_node_addr, now_ms)
2502 .cloned()
2503 else {
2504 if let Some(sendable) = &sendable_learned_peers
2505 && let Some(next_hop_addr) =
2506 self.learned_routes
2507 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2508 {
2509 return self.peers.get(&next_hop_addr);
2510 }
2511 if direct_peer_can_send {
2512 return self.peers.get(dest_node_addr);
2513 }
2514 return None;
2515 };
2516
2517 let coordinate_route_addr = {
2520 let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
2521 if !candidates.is_empty() {
2522 self.select_best_candidate(&candidates, &dest_coords)
2523 .map(|peer| *peer.node_addr())
2524 } else {
2525 None
2526 }
2527 };
2528 if let Some(next_hop_addr) = coordinate_route_addr {
2529 return self.peers.get(&next_hop_addr);
2530 }
2531
2532 let tree_route_addr = self
2534 .tree_state
2535 .find_next_hop(&dest_coords)
2536 .filter(|next_hop_id| {
2537 self.peers
2538 .get(next_hop_id)
2539 .is_some_and(|peer| peer.can_send())
2540 });
2541 if let Some(next_hop_addr) = tree_route_addr {
2542 return self.peers.get(&next_hop_addr);
2543 }
2544 if explore_fallback {
2545 return sendable_learned_peers.as_ref().and_then(|sendable| {
2546 self.learned_routes
2547 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2548 .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
2549 });
2550 }
2551
2552 if let Some(sendable) = &sendable_learned_peers
2553 && let Some(next_hop_addr) =
2554 self.learned_routes
2555 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2556 {
2557 return self.peers.get(&next_hop_addr);
2558 }
2559
2560 if direct_peer_can_send {
2561 return self.peers.get(dest_node_addr);
2562 }
2563
2564 None
2565 }
2566
2567 pub(in crate::node) fn learn_reverse_route(
2568 &mut self,
2569 destination: NodeAddr,
2570 next_hop: NodeAddr,
2571 ) {
2572 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2573 || destination == *self.node_addr()
2574 {
2575 return;
2576 }
2577 let now_ms = Self::now_ms();
2578 self.learned_routes.learn(
2579 destination,
2580 next_hop,
2581 now_ms,
2582 self.config.node.routing.learned_ttl_secs,
2583 self.config.node.routing.max_learned_routes_per_dest,
2584 );
2585 }
2586
2587 pub(in crate::node) fn record_route_failure(
2588 &mut self,
2589 destination: NodeAddr,
2590 next_hop: NodeAddr,
2591 ) {
2592 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2593 return;
2594 }
2595 self.learned_routes.record_failure(&destination, &next_hop);
2596 }
2597
2598 pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2599 self.learned_routes.snapshot(now_ms)
2600 }
2601
2602 pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2603 self.learned_routes.purge_expired(now_ms);
2604 }
2605
2606 fn select_best_candidate<'a>(
2615 &'a self,
2616 candidates: &[&'a ActivePeer],
2617 dest_coords: &crate::tree::TreeCoordinate,
2618 ) -> Option<&'a ActivePeer> {
2619 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2620
2621 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2622
2623 for &candidate in candidates {
2624 if !candidate.can_send() {
2625 continue;
2626 }
2627
2628 let cost = candidate.link_cost();
2629
2630 let dist = self
2631 .tree_state
2632 .peer_coords(candidate.node_addr())
2633 .map(|pc| pc.distance_to(dest_coords))
2634 .unwrap_or(usize::MAX);
2635
2636 if dist >= my_distance {
2639 continue;
2640 }
2641
2642 let dominated = match &best {
2643 None => true,
2644 Some((_, best_cost, best_dist)) => {
2645 cost < *best_cost
2646 || (cost == *best_cost && dist < *best_dist)
2647 || (cost == *best_cost
2648 && dist == *best_dist
2649 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2650 }
2651 };
2652
2653 if dominated {
2654 best = Some((candidate, cost, dist));
2655 }
2656 }
2657
2658 best.map(|(peer, _, _)| peer)
2659 }
2660
2661 pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2663 self.peers.values().filter(|p| p.may_reach(dest)).collect()
2664 }
2665
2666 pub fn tun_tx(&self) -> Option<&TunTx> {
2670 self.tun_tx.as_ref()
2671 }
2672
2673 pub fn attach_external_packet_io(
2680 &mut self,
2681 capacity: usize,
2682 ) -> Result<ExternalPacketIo, NodeError> {
2683 if self.state != NodeState::Created {
2684 return Err(NodeError::Config(ConfigError::Validation(
2685 "external packet I/O must be attached before node start".to_string(),
2686 )));
2687 }
2688 if self.config.tun.enabled {
2689 return Err(NodeError::Config(ConfigError::Validation(
2690 "external packet I/O requires tun.enabled=false".to_string(),
2691 )));
2692 }
2693
2694 let capacity = capacity.max(1);
2695 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2696 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2697 self.tun_outbound_rx = Some(outbound_rx);
2698 self.external_packet_tx = Some(inbound_tx);
2699
2700 Ok(ExternalPacketIo {
2701 outbound_tx,
2702 inbound_rx,
2703 })
2704 }
2705
2706 pub(crate) fn attach_endpoint_data_io(
2711 &mut self,
2712 capacity: usize,
2713 ) -> Result<EndpointDataIo, NodeError> {
2714 if self.state != NodeState::Created {
2715 return Err(NodeError::Config(ConfigError::Validation(
2716 "endpoint data I/O must be attached before node start".to_string(),
2717 )));
2718 }
2719
2720 let command_capacity = endpoint_data_command_capacity(capacity);
2721 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2722 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2727 self.endpoint_command_rx = Some(command_rx);
2728 self.endpoint_event_tx = Some(event_tx.clone());
2729
2730 Ok(EndpointDataIo {
2731 command_tx,
2732 event_rx,
2733 event_tx,
2734 })
2735 }
2736
2737 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2738 let mut prefix = [0u8; 15];
2739 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2740 self.identity_cache
2741 .get(&prefix)
2742 .filter(|entry| &entry.node_addr == addr)
2743 .map(|entry| entry.pubkey)
2744 }
2745
2746 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2747 let mut prefix = [0u8; 15];
2748 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2749 self.identity_cache
2750 .get(&prefix)
2751 .filter(|entry| &entry.node_addr == addr)
2752 .map(|entry| entry.npub.clone())
2753 }
2754
2755 pub(in crate::node) fn deliver_external_ipv6_packet(
2756 &self,
2757 src_addr: &NodeAddr,
2758 packet: Vec<u8>,
2759 ) {
2760 let Some(external_packet_tx) = &self.external_packet_tx else {
2761 return;
2762 };
2763 if packet.len() < 40 {
2764 return;
2765 }
2766 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2767 return;
2768 };
2769 let delivered = NodeDeliveredPacket {
2770 source_node_addr: *src_addr,
2771 source_npub: self.npub_for_node_addr(src_addr),
2772 destination,
2773 packet,
2774 };
2775 if let Err(error) = external_packet_tx.try_send(delivered) {
2776 debug!(error = %error, "Failed to deliver packet to external app sink");
2777 }
2778 }
2779
2780 pub(super) async fn send_encrypted_link_message(
2794 &mut self,
2795 node_addr: &NodeAddr,
2796 plaintext: &[u8],
2797 ) -> Result<(), NodeError> {
2798 self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2799 .await
2800 }
2801
2802 pub(in crate::node) fn note_local_send_outcome(
2808 &mut self,
2809 result: &Result<usize, TransportError>,
2810 ) {
2811 match result {
2812 Ok(_) => {
2813 if self.last_local_send_failure_at.is_some() {
2814 self.last_local_send_failure_at = None;
2815 }
2816 }
2817 Err(error) if error.is_local_route_unavailable() => {
2818 self.last_local_send_failure_at = Some(std::time::Instant::now());
2819 }
2820 Err(_) => {}
2821 }
2822 }
2823
2824 pub(in crate::node) fn local_send_failure_dead_timeout(
2830 &mut self,
2831 now: std::time::Instant,
2832 dead_timeout: std::time::Duration,
2833 fast_dead_timeout: std::time::Duration,
2834 ) -> std::time::Duration {
2835 match self.last_local_send_failure_at {
2836 Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
2837 fast_dead_timeout.min(dead_timeout)
2838 }
2839 Some(_) => {
2840 self.last_local_send_failure_at = None;
2841 dead_timeout
2842 }
2843 None => dead_timeout,
2844 }
2845 }
2846
2847 pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
2848 self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
2849 }
2850
2851 pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
2852 let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
2853 return false;
2854 };
2855 let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
2856 std::time::Instant::now().duration_since(t) <= grace
2857 }
2858
2859 pub(super) async fn send_encrypted_link_message_with_ce(
2863 &mut self,
2864 node_addr: &NodeAddr,
2865 plaintext: &[u8],
2866 ce_flag: bool,
2867 ) -> Result<(), NodeError> {
2868 let peer = self
2869 .peers
2870 .get_mut(node_addr)
2871 .ok_or(NodeError::PeerNotFound(*node_addr))?;
2872
2873 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2874 node_addr: *node_addr,
2875 reason: "no their_index".into(),
2876 })?;
2877 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2878 node_addr: *node_addr,
2879 reason: "no transport_id".into(),
2880 })?;
2881 let remote_addr = peer
2882 .current_addr()
2883 .cloned()
2884 .ok_or_else(|| NodeError::SendFailed {
2885 node_addr: *node_addr,
2886 reason: "no current_addr".into(),
2887 })?;
2888 #[cfg(any(target_os = "linux", target_os = "macos"))]
2889 let connected_socket = peer.connected_udp();
2890
2891 let timestamp_ms = peer.session_elapsed_ms();
2893
2894 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2896 let mut flags = if sp_flag { FLAG_SP } else { 0 };
2897 if ce_flag {
2898 flags |= FLAG_CE;
2899 }
2900 if peer.current_k_bit() {
2901 flags |= FLAG_KEY_EPOCH;
2902 }
2903
2904 let session = peer
2905 .noise_session_mut()
2906 .ok_or_else(|| NodeError::SendFailed {
2907 node_addr: *node_addr,
2908 reason: "no noise session".into(),
2909 })?;
2910
2911 const INNER_TS_LEN: usize = 4;
2919 let counter = session.current_send_counter();
2920 let inner_len = INNER_TS_LEN + plaintext.len();
2921 let payload_len = inner_len as u16;
2922 let header = build_established_header(their_index, counter, flags, payload_len);
2923
2924 let transport_for_send = self
2943 .transports
2944 .get(&transport_id)
2945 .ok_or(NodeError::TransportNotFound(transport_id))?;
2946 match transport_for_send.connection_state(&remote_addr) {
2947 ConnectionState::Connected => {}
2948 other => {
2949 if matches!(other, ConnectionState::None) {
2950 let _ = transport_for_send.connect(&remote_addr).await;
2951 }
2952 return Err(NodeError::SendFailed {
2953 node_addr: *node_addr,
2954 reason: format!("transport connection not ready: {:?}", other),
2955 });
2956 }
2957 }
2958 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2959 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2960 && is_udp
2961 && let Some(cipher_clone) = session.send_cipher_clone()
2962 {
2963 {
2964 let reserved_counter =
2968 session
2969 .take_send_counter()
2970 .map_err(|e| NodeError::SendFailed {
2971 node_addr: *node_addr,
2972 reason: format!("counter reservation failed: {}", e),
2973 })?;
2974 debug_assert_eq!(reserved_counter, counter);
2975 let header =
2979 build_established_header(their_index, reserved_counter, flags, payload_len);
2980 let transport = transport_for_send;
2981 let send_target = {
2988 if let TransportHandle::Udp(udp) = transport {
2989 let socket_addr = {
2990 #[cfg(any(target_os = "linux", target_os = "macos"))]
2991 {
2992 match connected_socket.as_ref() {
2993 Some(socket) => Some(socket.peer_addr()),
2994 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2995 }
2996 }
2997 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2998 {
2999 udp.resolve_for_off_task(&remote_addr).await.ok()
3000 }
3001 };
3002 match (udp.async_socket(), socket_addr) {
3003 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3004 _ => None,
3005 }
3006 } else {
3007 None
3008 }
3009 };
3010 if let Some((socket, socket_addr)) = send_target {
3011 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3027 let mut wire_buf = Vec::with_capacity(wire_capacity);
3028 wire_buf.extend_from_slice(&header);
3029 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
3030 wire_buf.extend_from_slice(plaintext);
3031 let predicted_bytes = wire_capacity;
3032 if let Some(peer) = self.peers.get_mut(node_addr) {
3039 peer.link_stats_mut().record_sent(predicted_bytes);
3040 if let Some(mmp) = peer.mmp_mut() {
3041 mmp.sender
3042 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3043 }
3044 }
3045 let scheduling_weight = self.send_weight_for_peer(node_addr);
3046 workers.dispatch(self::encrypt_worker::FmpSendJob {
3047 cipher: cipher_clone,
3048 counter: reserved_counter,
3049 wire_buf,
3050 fsp_seal: None,
3051 socket,
3052 dest_addr: socket_addr,
3053 #[cfg(any(target_os = "linux", target_os = "macos"))]
3054 connected_socket,
3055 drop_on_backpressure: fmp_plaintext_is_bulk_session_datagram(plaintext),
3056 scheduling_weight,
3057 queued_at: crate::perf_profile::stamp(),
3058 });
3059 return Ok(());
3060 }
3061 }
3062 }
3063
3064 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3069 let ciphertext = {
3071 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3072 session
3073 .encrypt_with_aad(&inner_plaintext, &header)
3074 .map_err(|e| NodeError::SendFailed {
3075 node_addr: *node_addr,
3076 reason: format!("encryption failed: {}", e),
3077 })?
3078 };
3079
3080 let wire_packet = build_encrypted(&header, &ciphertext);
3081
3082 let send_result = {
3084 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3085 let transport = self
3086 .transports
3087 .get(&transport_id)
3088 .ok_or(NodeError::TransportNotFound(transport_id))?;
3089 transport.send(&remote_addr, &wire_packet).await
3090 };
3091 self.note_local_send_outcome(&send_result);
3092 let bytes_sent = send_result.map_err(|e| match e {
3093 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3094 node_addr: *node_addr,
3095 packet_size,
3096 mtu,
3097 },
3098 other => NodeError::SendFailed {
3099 node_addr: *node_addr,
3100 reason: format!("transport send: {}", other),
3101 },
3102 })?;
3103
3104 if let Some(peer) = self.peers.get_mut(node_addr) {
3106 peer.link_stats_mut().record_sent(bytes_sent);
3107 if let Some(mmp) = peer.mmp_mut() {
3109 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3110 }
3111 }
3112
3113 Ok(())
3114 }
3115}
3116
3117impl fmt::Debug for Node {
3118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3119 f.debug_struct("Node")
3120 .field("node_addr", self.node_addr())
3121 .field("state", &self.state)
3122 .field("is_leaf_only", &self.is_leaf_only)
3123 .field("connections", &self.connection_count())
3124 .field("peers", &self.peer_count())
3125 .field("links", &self.link_count())
3126 .field("transports", &self.transport_count())
3127 .finish()
3128 }
3129}