1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45#[cfg(feature = "webrtc-transport")]
46use crate::transport::webrtc::WebRtcTransport;
47use crate::transport::{
48 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
49 TransportHandle, TransportId,
50};
51use crate::tree::TreeState;
52use crate::upper::hosts::HostMap;
53use crate::upper::icmp_rate_limit::IcmpRateLimiter;
54use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
55use crate::utils::index::IndexAllocator;
56use crate::{
57 Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
58 SessionMessageType, encode_npub,
59};
60use rand::Rng;
61use std::collections::{HashMap, HashSet, VecDeque};
62use std::fmt;
63use std::sync::Arc;
64use std::thread::JoinHandle;
65use thiserror::Error;
66use tracing::{debug, warn};
67
/// Rekey jitter bound, expressed in seconds (per the `_SECS` suffix).
// NOTE(review): how the jitter is applied (symmetric vs. one-sided) is not
// visible in this part of the file — confirm at the use site.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
75
/// Errors reported by node-level operations.
///
/// `Display` text comes from the `#[error(...)]` attribute on each variant.
/// Variants marked `#[from]` also get a generated `From` impl, so the wrapped
/// error converts automatically with `?`.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The node has not been started.
    #[error("node not started")]
    NotStarted,

    /// The node has already been started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node has already been stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given id.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No usable transport exists for the named transport type. The payload is
    /// free-form text; the address resolvers in this file also use it for
    /// malformed-address messages.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link is registered under the given id.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection is registered for the given link id.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// No peer is known under the given node address.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with the given node address is already present.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection is already registered for the given link id.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A peer npub string was rejected; `reason` carries the detail.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// Discovery failure described by a free-form message.
    #[error("discovery error: {0}")]
    Discovery(String),

    /// The operation was denied; the payload explains why.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` would be exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` would be exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` would be exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available for the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion failed for `link_id`; `reason` carries the detail.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed; `reason` carries the detail.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet of `packet_size` bytes could not be forwarded to `node_addr`
    /// because it is larger than `mtu`.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Wrapped configuration error.
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Wrapped identity error.
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// Wrapped TUN error.
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Index allocation failed; the payload carries the detail.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// A handshake failed; the payload carries the detail.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// Transport-level failure carried as a message string (note: not the
    /// imported `TransportError` type, only its textual form if callers choose
    /// to stringify it).
    #[error("transport error: {0}")]
    TransportError(String),

    /// Bootstrap handoff failed; the payload carries the detail.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
167
/// A packet handed out of the node, together with its source and destination.
///
/// This is the item type of [`ExternalPacketIo::inbound_rx`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// Node address of the sender.
    pub source_node_addr: NodeAddr,
    /// npub of the sender, when available.
    pub source_npub: Option<String>,
    /// Destination address of the packet.
    pub destination: FipsAddress,
    /// Raw packet bytes.
    // NOTE(review): exact framing (e.g. full IP packet vs. payload only) is not
    // visible here — confirm with the producer.
    pub packet: Vec<u8>,
}
180
/// One record of `Node::identity_cache`: the identity material known for a
/// node plus the time it was last observed.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address of the cached identity.
    node_addr: NodeAddr,
    /// secp256k1 public key of the node.
    pubkey: secp256k1::PublicKey,
    /// npub string form of the identity.
    npub: String,
    /// Last-seen timestamp in milliseconds (per the `_ms` suffix).
    // NOTE(review): the clock/epoch this is measured against is not visible here.
    last_seen_ms: u64,
}

impl IdentityCacheEntry {
    /// Builds an entry from its four fields; no validation or derivation is
    /// performed.
    fn new(
        node_addr: NodeAddr,
        pubkey: secp256k1::PublicKey,
        npub: String,
        last_seen_ms: u64,
    ) -> Self {
        Self {
            node_addr,
            pubkey,
            npub,
            last_seen_ms,
        }
    }
}
204
/// Channel pair for exchanging packets with a node from outside.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Sender for packets going into the node (same sender type the TUN
    /// outbound path uses).
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receiver for packets the node delivers outward.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
213
/// Channel endpoints for the endpoint-data path: a bounded command sender plus
/// both halves of an unbounded event channel.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Bounded sender for [`NodeEndpointCommand`]s.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Receiver for [`NodeEndpointEvent`]s.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Sender for [`NodeEndpointEvent`]s.
    // NOTE(review): presumably kept so events can also be injected by the
    // holder of this struct — confirm against callers.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
243
/// Resolves the capacity to use for the endpoint data command queue.
///
/// If the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable holds a positive
/// integer (surrounding whitespace is ignored), that value is returned
/// verbatim. An unset, non-Unicode, unparsable or zero value is ignored, and
/// the result is `requested` raised to a floor of 32 768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    // Smallest capacity ever returned when no valid override is present.
    const MIN_CAPACITY: usize = 32_768;

    std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&value| value > 0)
        // The floor already guarantees a non-zero result, so no separate
        // `.max(1)` clamp is needed.
        .unwrap_or_else(|| requested.max(MIN_CAPACITY))
}
254
/// Commands sent to the node over the endpoint command channel
/// (see [`EndpointDataIo::command_tx`]).
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send `payload` to `remote` and report the outcome on `response_tx`.
    Send {
        /// Identity of the destination peer.
        remote: PeerIdentity,
        /// Bytes to send.
        payload: Vec<u8>,
        /// Optional instant at which the command was queued.
        queued_at: Option<std::time::Instant>,
        /// One-shot channel receiving the send result.
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Send `payload` to `remote` without a reply channel.
    SendOneway {
        /// Identity of the destination peer.
        remote: PeerIdentity,
        /// Bytes to send.
        payload: Vec<u8>,
        /// Optional instant at which the command was queued.
        queued_at: Option<std::time::Instant>,
    },
    /// Request a list of [`NodeEndpointPeer`] records.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request a list of [`NodeEndpointRelayStatus`] records.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Replace the relay lists; the result is reported on `response_tx`.
    UpdateRelays {
        /// Relay URLs for adverts (per the field name).
        advert_relays: Vec<String>,
        /// Relay URLs for DMs (per the field name).
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Apply a new peer configuration list; on success the reply carries an
    /// [`UpdatePeersOutcome`] with per-category counts.
    UpdatePeers {
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
298
/// Counters returned in reply to [`NodeEndpointCommand::UpdatePeers`].
/// `Default` yields all zeros.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Number of peers added.
    pub(crate) added: usize,
    /// Number of peers removed.
    pub(crate) removed: usize,
    /// Number of peers updated.
    pub(crate) updated: usize,
    /// Number of peers left unchanged.
    pub(crate) unchanged: usize,
}
307
/// Events carried on the endpoint event channel
/// (see [`EndpointDataIo::event_rx`]).
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// Data received from a remote node.
    Data {
        /// Node address of the sender.
        source_node_addr: NodeAddr,
        /// npub of the sender, when available.
        source_npub: Option<String>,
        /// Received bytes.
        payload: Vec<u8>,
        /// Optional instant at which the event was queued.
        queued_at: Option<std::time::Instant>,
    },
}
318
/// Per-peer record returned in reply to [`NodeEndpointCommand::PeerSnapshot`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// npub of the peer.
    pub(crate) npub: String,
    /// Transport address in string form, when available.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, when available.
    pub(crate) transport_type: Option<String>,
    /// Numeric id of the peer's link.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, when available
    /// (per the field name).
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent counter.
    pub(crate) packets_sent: u64,
    /// Packets received counter.
    pub(crate) packets_recv: u64,
    /// Bytes sent counter.
    pub(crate) bytes_sent: u64,
    /// Bytes received counter.
    pub(crate) bytes_recv: u64,
}
332
/// Per-relay record returned in reply to
/// [`NodeEndpointCommand::RelaySnapshot`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Status text for the relay.
    // NOTE(review): the set of possible status strings is not visible here.
    pub(crate) status: String,
}
339
/// Lifecycle phase of a node.
///
/// The `Display` form is the lowercase variant name.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed but not yet started.
    Created,
    /// Start in progress.
    Starting,
    /// Up and running.
    Running,
    /// Stop in progress.
    Stopping,
    /// Stopped.
    Stopped,
}

impl NodeState {
    /// `true` only in [`NodeState::Running`].
    pub fn is_operational(&self) -> bool {
        *self == Self::Running
    }

    /// `true` when a start may be attempted: from [`NodeState::Created`] or
    /// [`NodeState::Stopped`].
    pub fn can_start(&self) -> bool {
        match self {
            Self::Created | Self::Stopped => true,
            Self::Starting | Self::Running | Self::Stopping => false,
        }
    }

    /// `true` when a stop may be attempted: only from [`NodeState::Running`].
    pub fn can_stop(&self) -> bool {
        *self == Self::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase state name; width/alignment flags are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Self::Created => "created",
            Self::Starting => "starting",
            Self::Running => "running",
            Self::Stopping => "stopping",
            Self::Stopped => "stopped",
        };
        f.write_str(label)
    }
}
384
385#[derive(Clone, Debug)]
392pub(crate) struct RecentRequest {
393 pub(crate) from_peer: NodeAddr,
395 pub(crate) timestamp_ms: u64,
397 pub(crate) response_forwarded: bool,
401}
402
403impl RecentRequest {
404 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
405 Self {
406 from_peer,
407 timestamp_ms,
408 response_forwarded: false,
409 }
410 }
411
412 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
414 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
415 }
416}
417
/// Key of `Node::addr_to_link`: a transport id paired with a remote address on
/// that transport.
type AddrKey = (TransportId, TransportAddr);
420
/// Per-transport drop bookkeeping, stored in `Node::transport_drops`.
/// `Default` yields zero drops and `dropping == false`.
// NOTE(review): the update logic lives outside this part of the file; the
// field descriptions below are inferred from names — confirm at the use site.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Presumably the drop counter observed at the previous sample.
    prev_drops: u64,
    /// Presumably whether the transport was seen dropping at the last sample.
    dropping: bool,
}
432
/// An entry of `Node::pending_connects`: a connection attempt identified by
/// its link, transport, remote address and expected peer identity.
// NOTE(review): when entries are added/removed is not visible in this part of
// the file.
struct PendingConnect {
    /// Link allocated for the attempt.
    link_id: LinkId,
    /// Transport the attempt runs over.
    transport_id: TransportId,
    /// Remote address being connected to.
    remote_addr: TransportAddr,
    /// Identity of the peer being connected to.
    peer_identity: PeerIdentity,
}
448
/// Central state of a node: identity and configuration, tree/bloom/routing
/// state, transports and links, peers and sessions, TUN and endpoint plumbing,
/// worker pools, rate limiters and discovery bookkeeping.
///
/// Instances are built by `Node::new`, `Node::with_identity` or
/// `Node::leaf_only`. Field comments below state only what the constructors
/// and accessors in this file show; anything inferred from a name is marked.
pub struct Node {
    // --- Identity and lifecycle ---
    /// This node's identity (returned by `Node::identity`).
    identity: Identity,

    /// Eight random bytes generated once at construction.
    startup_epoch: [u8; 8],

    /// Instant captured at construction.
    started_at: std::time::Instant,

    /// Configuration the node was built from.
    config: Config,

    /// Lifecycle state; starts as `NodeState::Created`.
    state: NodeState,

    /// Whether the node runs in leaf-only mode.
    is_leaf_only: bool,

    // --- Tree, bloom and routing state ---
    /// Spanning-tree state, configured from `config.node.tree` and carrying a
    /// declaration signed with this node's identity.
    tree_state: TreeState,

    /// Bloom filter state (leaf-only or regular variant).
    bloom_state: BloomState,

    /// Coordinate cache sized and TTL'd from `config.node.cache`.
    coord_cache: CoordCache,
    /// Learned route table; starts from `Default`.
    learned_routes: LearnedRouteTable,
    /// Recently seen requests keyed by a `u64` id.
    // NOTE(review): what the key identifies is not visible here.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Shared, lock-protected map from address to a `u16` MTU value; starts
    /// empty.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // --- Transports and links ---
    /// Registered transports by id.
    transports: HashMap<TransportId, TransportHandle>,
    /// Per-transport drop bookkeeping.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Links by id.
    links: HashMap<LinkId, Link>,
    /// Lookup from `(transport, remote address)` to link id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    /// Packet channel sender; `None` until set up.
    packet_tx: Option<PacketTx>,
    /// Packet channel receiver; `None` until set up.
    packet_rx: Option<PacketRx>,

    // --- Connections, peers and sessions ---
    /// Peer connections keyed by link id.
    connections: HashMap<LinkId, PeerConnection>,

    /// Active peers keyed by node address.
    peers: HashMap<NodeAddr, ActivePeer>,

    /// Session entries keyed by node address.
    sessions: HashMap<NodeAddr, SessionEntry>,

    /// Cached identities keyed by a 15-byte value.
    // NOTE(review): how the 15-byte key is derived is not visible here.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    /// Queued TUN packets per destination node.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Queued endpoint data per destination node.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// In-flight discovery lookups per target node.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // --- Limits (copied from `config.node.limits`) ---
    max_connections: usize,
    max_peers: usize,
    max_links: usize,

    // --- Id counters (both start at 1) ---
    next_link_id: u64,
    next_transport_id: u32,

    // --- Statistics ---
    stats: stats::NodeStats,

    stats_history: stats_history::StatsHistory,

    // --- TUN, external packet and endpoint plumbing ---
    /// `TunState::Configured` when `config.tun.enabled`, else
    /// `TunState::Disabled`, at construction.
    tun_state: TunState,
    tun_name: Option<String>,
    tun_tx: Option<TunTx>,
    tun_outbound_rx: Option<TunOutboundRx>,
    /// Sender for packets delivered to an external consumer, when attached.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Receiver for endpoint commands, when attached.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Sender for endpoint events, when attached.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Encrypt worker pool, when running.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Decrypt worker pool, when running.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// Session indices currently registered with the decrypt workers;
    /// `deregister_session_index` unregisters from the pool only when the key
    /// was present here.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Receiving half of the decrypt fallback channel created at construction.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sending half of the decrypt fallback channel created at construction.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// Join handle of the TUN reader thread, when running.
    tun_reader_handle: Option<JoinHandle<()>>,
    /// Join handle of the TUN writer thread, when running.
    tun_writer_handle: Option<JoinHandle<()>>,
    /// macOS only: raw fd associated with TUN shutdown.
    // NOTE(review): how this fd is used is not visible here.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // --- DNS ---
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // --- Session index bookkeeping ---
    index_allocator: IndexAllocator,
    /// Lookup from `(transport, index)` to the owning peer.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Lookup from `(transport, index)` to a link id for outbound attempts.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // --- Rate limiters ---
    /// Handshake limiter built from `config.node.rate_limit` and
    /// `config.node.limits.max_pending_inbound`.
    msg1_rate_limiter: HandshakeRateLimiter,
    icmp_rate_limiter: IcmpRateLimiter,
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Limiter with interval `config.node.session.coords_response_interval_ms`.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    discovery_backoff: DiscoveryBackoff,
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // --- Connection attempts and retries ---
    pending_connects: Vec<PendingConnect>,

    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    // --- Discovery ---
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
    local_instance_started_at_ms: Option<u64>,
    last_local_instance_publish_ms: Option<u64>,
    last_local_instance_scan_ms: Option<u64>,
    nostr_discovery_started_at_ms: Option<u64>,
    /// Starts `false`.
    startup_open_discovery_sweep_done: bool,
    /// Transports marked as bootstrap; `find_transport_for_type` never selects
    /// these.
    bootstrap_transports: HashSet<TransportId>,
    /// npub string associated with each bootstrap transport.
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    // --- Timestamps for periodic work and throttled logging ---
    // Note: this one uses `crate::time::Instant`, the others `std::time::Instant`.
    last_parent_reeval: Option<crate::time::Instant>,

    last_congestion_log: Option<std::time::Instant>,

    estimated_mesh_size: Option<u64>,
    last_mesh_size_log: Option<std::time::Instant>,

    last_self_warn: Option<std::time::Instant>,

    last_local_send_failure_at: Option<std::time::Instant>,

    // --- Naming, weights and access control ---
    /// Display aliases; `peer_display_name` consults these after `host_map`.
    peer_aliases: HashMap<NodeAddr, String>,
    /// Send weights for peers listed in the configuration; other peers fall
    /// back to `encrypt_worker::DEFAULT_SEND_WEIGHT` in `send_weight_for_peer`.
    configured_peer_send_weights: HashMap<NodeAddr, u8>,

    /// Peer ACL reloader built together with `host_map`.
    peer_acl: acl::PeerAclReloader,

    /// Hostname map; first source consulted by `peer_display_name`.
    host_map: Arc<HostMap>,
}
750
751impl Node {
    /// Creates a node from `config`, deriving the identity from the
    /// configuration.
    ///
    /// The node starts in `NodeState::Created` with empty tables, no
    /// transports and no running workers. Leaf-only mode follows
    /// `config.is_leaf_only()`.
    ///
    /// # Errors
    ///
    /// Returns `NodeError::Config` when `config.validate()` fails and
    /// propagates the error of `config.create_identity()`.
    ///
    /// # Panics
    ///
    /// Panics if signing the node's own tree declaration fails.
    pub fn new(config: Config) -> Result<Self, NodeError> {
        config.validate()?;
        let identity = config.create_identity()?;
        let node_addr = *identity.node_addr();
        let is_leaf_only = config.is_leaf_only();

        // Both halves of the decrypt fallback channel are owned by the node.
        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
        let decrypt_fallback_rx = Some(decrypt_fallback_rx);

        // Fresh random epoch for every constructed node.
        let mut startup_epoch = [0u8; 8];
        rand::rng().fill_bytes(&mut startup_epoch);

        let mut bloom_state = if is_leaf_only {
            BloomState::leaf_only(node_addr)
        } else {
            BloomState::new(node_addr)
        };
        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);

        let tun_state = if config.tun.enabled {
            TunState::Configured
        } else {
            TunState::Disabled
        };

        let mut tree_state = TreeState::new(node_addr);
        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
        tree_state.set_hold_down(config.node.tree.hold_down_secs);
        tree_state.set_flap_dampening(
            config.node.tree.flap_threshold,
            config.node.tree.flap_window_secs,
            config.node.tree.flap_dampening_secs,
        );
        tree_state
            .sign_declaration(&identity)
            .expect("signing own declaration should never fail");

        // Cache TTL is configured in seconds and passed on in milliseconds.
        // NOTE(review): the multiplication is unchecked; an extreme
        // `coord_ttl_secs` would overflow.
        let coord_cache = CoordCache::new(
            config.node.cache.coord_size,
            config.node.cache.coord_ttl_secs * 1000,
        );
        let rl = &config.node.rate_limit;
        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
            config.node.limits.max_pending_inbound,
        );

        // Read everything still needed from `config` before it is moved into
        // the struct below.
        let max_connections = config.node.limits.max_connections;
        let max_peers = config.node.limits.max_peers;
        let max_links = config.node.limits.max_links;
        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
        let backoff_base_secs = config.node.discovery.backoff_base_secs;
        let backoff_max_secs = config.node.discovery.backoff_max_secs;
        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;

        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);

        Ok(Self {
            identity,
            startup_epoch,
            started_at: std::time::Instant::now(),
            config,
            state: NodeState::Created,
            is_leaf_only,
            tree_state,
            bloom_state,
            coord_cache,
            learned_routes: LearnedRouteTable::default(),
            recent_requests: HashMap::new(),
            transports: HashMap::new(),
            transport_drops: HashMap::new(),
            links: HashMap::new(),
            addr_to_link: HashMap::new(),
            packet_tx: None,
            packet_rx: None,
            connections: HashMap::new(),
            peers: HashMap::new(),
            sessions: HashMap::new(),
            identity_cache: HashMap::new(),
            pending_tun_packets: HashMap::new(),
            pending_endpoint_data: HashMap::new(),
            pending_lookups: HashMap::new(),
            max_connections,
            max_peers,
            max_links,
            next_link_id: 1,
            next_transport_id: 1,
            stats: stats::NodeStats::new(),
            stats_history: stats_history::StatsHistory::new(),
            tun_state,
            tun_name: None,
            tun_tx: None,
            tun_outbound_rx: None,
            external_packet_tx: None,
            endpoint_command_rx: None,
            endpoint_event_tx: None,
            encrypt_workers: None,
            decrypt_workers: None,
            decrypt_registered_sessions: std::collections::HashSet::new(),
            decrypt_fallback_tx,
            decrypt_fallback_rx,
            tun_reader_handle: None,
            tun_writer_handle: None,
            #[cfg(target_os = "macos")]
            tun_shutdown_fd: None,
            dns_identity_rx: None,
            dns_task: None,
            index_allocator: IndexAllocator::new(),
            peers_by_index: HashMap::new(),
            pending_outbound: HashMap::new(),
            msg1_rate_limiter,
            icmp_rate_limiter: IcmpRateLimiter::new(),
            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
                std::time::Duration::from_millis(coords_response_interval_ms),
            ),
            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
                std::time::Duration::from_secs(forward_min_interval_secs),
            ),
            pending_connects: Vec::new(),
            retry_pending: HashMap::new(),
            nostr_discovery: None,
            nostr_discovery_started_at_ms: None,
            lan_discovery: None,
            local_instance_registry: None,
            local_instance_started_at_ms: None,
            last_local_instance_publish_ms: None,
            last_local_instance_scan_ms: None,
            startup_open_discovery_sweep_done: false,
            bootstrap_transports: HashSet::new(),
            bootstrap_transport_npubs: HashMap::new(),
            discovery_fallback_transit_blocked_peers: HashSet::new(),
            last_parent_reeval: None,
            last_congestion_log: None,
            estimated_mesh_size: None,
            last_mesh_size_log: None,
            last_self_warn: None,
            last_local_send_failure_at: None,
            peer_aliases: HashMap::new(),
            configured_peer_send_weights,
            peer_acl,
            host_map,
            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
        })
    }
901
902 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
907 config.validate()?;
908 let node_addr = *identity.node_addr();
909
910 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
911 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
912
913 let mut startup_epoch = [0u8; 8];
914 rand::rng().fill_bytes(&mut startup_epoch);
915
916 let tun_state = if config.tun.enabled {
917 TunState::Configured
918 } else {
919 TunState::Disabled
920 };
921
922 let mut tree_state = TreeState::new(node_addr);
924 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
925 tree_state.set_hold_down(config.node.tree.hold_down_secs);
926 tree_state.set_flap_dampening(
927 config.node.tree.flap_threshold,
928 config.node.tree.flap_window_secs,
929 config.node.tree.flap_dampening_secs,
930 );
931 tree_state
932 .sign_declaration(&identity)
933 .expect("signing own declaration should never fail");
934
935 let mut bloom_state = BloomState::new(node_addr);
936 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
937
938 let coord_cache = CoordCache::new(
939 config.node.cache.coord_size,
940 config.node.cache.coord_ttl_secs * 1000,
941 );
942 let rl = &config.node.rate_limit;
943 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
944 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
945 config.node.limits.max_pending_inbound,
946 );
947
948 let max_connections = config.node.limits.max_connections;
949 let max_peers = config.node.limits.max_peers;
950 let max_links = config.node.limits.max_links;
951 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
952
953 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
954 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
955
956 Ok(Self {
957 identity,
958 startup_epoch,
959 started_at: std::time::Instant::now(),
960 config,
961 state: NodeState::Created,
962 is_leaf_only: false,
963 tree_state,
964 bloom_state,
965 coord_cache,
966 learned_routes: LearnedRouteTable::default(),
967 recent_requests: HashMap::new(),
968 transports: HashMap::new(),
969 transport_drops: HashMap::new(),
970 links: HashMap::new(),
971 addr_to_link: HashMap::new(),
972 packet_tx: None,
973 packet_rx: None,
974 connections: HashMap::new(),
975 peers: HashMap::new(),
976 sessions: HashMap::new(),
977 identity_cache: HashMap::new(),
978 pending_tun_packets: HashMap::new(),
979 pending_endpoint_data: HashMap::new(),
980 pending_lookups: HashMap::new(),
981 max_connections,
982 max_peers,
983 max_links,
984 next_link_id: 1,
985 next_transport_id: 1,
986 stats: stats::NodeStats::new(),
987 stats_history: stats_history::StatsHistory::new(),
988 tun_state,
989 tun_name: None,
990 tun_tx: None,
991 tun_outbound_rx: None,
992 external_packet_tx: None,
993 endpoint_command_rx: None,
994 endpoint_event_tx: None,
995 encrypt_workers: None,
996 decrypt_workers: None,
997 decrypt_registered_sessions: std::collections::HashSet::new(),
998 decrypt_fallback_tx,
999 decrypt_fallback_rx,
1000 tun_reader_handle: None,
1001 tun_writer_handle: None,
1002 #[cfg(target_os = "macos")]
1003 tun_shutdown_fd: None,
1004 dns_identity_rx: None,
1005 dns_task: None,
1006 index_allocator: IndexAllocator::new(),
1007 peers_by_index: HashMap::new(),
1008 pending_outbound: HashMap::new(),
1009 msg1_rate_limiter,
1010 icmp_rate_limiter: IcmpRateLimiter::new(),
1011 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1012 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1013 std::time::Duration::from_millis(coords_response_interval_ms),
1014 ),
1015 discovery_backoff: DiscoveryBackoff::new(),
1016 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1017 pending_connects: Vec::new(),
1018 retry_pending: HashMap::new(),
1019 nostr_discovery: None,
1020 nostr_discovery_started_at_ms: None,
1021 lan_discovery: None,
1022 local_instance_registry: None,
1023 local_instance_started_at_ms: None,
1024 last_local_instance_publish_ms: None,
1025 last_local_instance_scan_ms: None,
1026 startup_open_discovery_sweep_done: false,
1027 bootstrap_transports: HashSet::new(),
1028 bootstrap_transport_npubs: HashMap::new(),
1029 discovery_fallback_transit_blocked_peers: HashSet::new(),
1030 last_parent_reeval: None,
1031 last_congestion_log: None,
1032 estimated_mesh_size: None,
1033 last_mesh_size_log: None,
1034 last_self_warn: None,
1035 last_local_send_failure_at: None,
1036 peer_aliases: HashMap::new(),
1037 configured_peer_send_weights,
1038 peer_acl,
1039 host_map,
1040 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1041 })
1042 }
1043
1044 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1046 let mut node = Self::new(config)?;
1047 node.is_leaf_only = true;
1048 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1049 Ok(node)
1050 }
1051
1052 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1053 let base_host_map = HostMap::from_peer_configs(config.peers());
1054 if !config.node.system_files_enabled {
1055 return (
1056 Arc::new(base_host_map.clone()),
1057 acl::PeerAclReloader::memory_only(base_host_map),
1058 );
1059 }
1060
1061 let mut host_map = base_host_map.clone();
1062 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1063 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1064 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1065 ));
1066 host_map.merge(hosts_file);
1067 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1068 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1069 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1070 base_host_map,
1071 hosts_path,
1072 );
1073 (Arc::new(host_map), peer_acl)
1074 }
1075
1076 fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1077 config
1078 .peers()
1079 .iter()
1080 .filter_map(|peer| {
1081 PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1082 (
1083 *identity.node_addr(),
1084 encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1085 )
1086 })
1087 })
1088 .collect()
1089 }
1090
1091 fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1092 self.configured_peer_send_weights
1093 .get(peer_addr)
1094 .copied()
1095 .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1096 }
1097
    /// Instantiates one transport per configured transport instance and
    /// returns the handles; nothing is registered on `self` here apart from
    /// consuming transport ids.
    ///
    /// Transports are created in this order: UDP, simulated (feature
    /// `sim-transport`), Ethernet (Linux/macOS), TCP, Tor, WebRTC (feature
    /// `webrtc-transport`) and BLE (cfg `bluer_available`). Every transport
    /// receives a clone of `packet_tx`.
    ///
    /// WebRTC and BLE instances that fail to initialise are logged and
    /// skipped; the transport id allocated for them is not reused. The only
    /// `.await` is the BLE adapter initialisation.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Each section first copies the (name, config) pairs into an owned Vec
        // so the borrow of `self.config` has ended before
        // `self.allocate_transport_id()` is called in the loop.
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let mut eth_config = eth_config;
                // An instance without its own discovery scope inherits the
                // node's LAN discovery scope.
                if eth_config.discovery_scope.is_none() {
                    eth_config.discovery_scope = self.lan_discovery_scope();
                }
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // Collected unconditionally so a build without WebRTC support can
        // still warn about configured instances below.
        let webrtc_instances: Vec<_> = self
            .config
            .transports
            .webrtc
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        #[cfg(feature = "webrtc-transport")]
        {
            for (name, webrtc_config) in webrtc_instances {
                // The id is allocated before construction, so a failed
                // construction still consumes it.
                let transport_id = self.allocate_transport_id();
                match WebRtcTransport::new(
                    transport_id,
                    name,
                    webrtc_config,
                    packet_tx.clone(),
                    &self.identity,
                    &self.config.node.discovery.nostr,
                ) {
                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
                    Err(err) => {
                        warn!(
                            transport_id = %transport_id,
                            error = %err,
                            "failed to initialize WebRTC transport"
                        );
                    }
                }
            }
        }
        #[cfg(not(feature = "webrtc-transport"))]
        if !webrtc_instances.is_empty() {
            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
        }

        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Real BLE transports are only created outside of test builds.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // NOTE(review): this branch sits inside `#[cfg(bluer_available)]`,
            // so its condition reduces to `test`, and the warning inside it is
            // gated on `not(test)`. The "lacks BlueZ support" warning is
            // therefore never compiled; in test builds the `if` only consumes
            // `ble_instances`. Builds without `bluer_available` skip the whole
            // BLE section silently.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1275
1276 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1286 self.transports
1287 .iter()
1288 .filter(|(id, handle)| {
1289 handle.transport_type().name == transport_type
1290 && handle.is_operational()
1291 && !self.bootstrap_transports.contains(id)
1292 })
1293 .min_by_key(|(id, _)| id.as_u32())
1294 .map(|(id, _)| *id)
1295 }
1296
1297 #[allow(unused_variables)]
1303 fn resolve_ethernet_addr(
1304 &self,
1305 addr_str: &str,
1306 ) -> Result<(TransportId, TransportAddr), NodeError> {
1307 #[cfg(any(target_os = "linux", target_os = "macos"))]
1308 {
1309 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1310 NodeError::NoTransportForType(format!(
1311 "invalid Ethernet address format '{}': expected 'interface/mac'",
1312 addr_str
1313 ))
1314 })?;
1315
1316 let transport_id = self
1318 .transports
1319 .iter()
1320 .find(|(_, handle)| {
1321 handle.transport_type().name == "ethernet"
1322 && handle.is_operational()
1323 && handle.interface_name() == Some(iface)
1324 })
1325 .map(|(id, _)| *id)
1326 .ok_or_else(|| {
1327 NodeError::NoTransportForType(format!(
1328 "no operational Ethernet transport for interface '{}'",
1329 iface
1330 ))
1331 })?;
1332
1333 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1334 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1335 })?;
1336
1337 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1338 }
1339 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1340 {
1341 Err(NodeError::NoTransportForType(
1342 "Ethernet transport is not supported on this platform".to_string(),
1343 ))
1344 }
1345 }
1346
/// Resolves a BLE address string to an operational BLE transport and the
/// address as a [`TransportAddr`].
///
/// The adapter extracted from the address is only used in the error message;
/// the transport lookup takes the first operational transport of type `"ble"`.
///
/// # Errors
///
/// Returns [`NodeError::NoTransportForType`] when no adapter can be extracted
/// from the address, when no operational BLE transport exists, or when
/// `BleAddr::parse` rejects the string.
#[cfg(bluer_available)]
fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
    let ta = TransportAddr::from_string(addr_str);
    let Some(adapter) = crate::transport::ble::addr::adapter_from_addr(&ta) else {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address format '{}': expected 'adapter/mac'",
            addr_str
        )));
    };

    let owner = self
        .transports
        .iter()
        .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational());
    let Some((transport_id, _)) = owner else {
        return Err(NodeError::NoTransportForType(format!(
            "no operational BLE transport for adapter '{}'",
            adapter
        )));
    };

    // Parsed purely for validation; the parsed value itself is discarded.
    if let Err(e) = crate::transport::ble::addr::BleAddr::parse(addr_str) {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address '{}': {}",
            addr_str, e
        )));
    }

    Ok((*transport_id, TransportAddr::from_string(addr_str)))
}
1380
/// Returns a reference to this node's [`Identity`].
pub fn identity(&self) -> &Identity {
    &self.identity
}
1387
/// Returns the node address reported by this node's identity.
pub fn node_addr(&self) -> &NodeAddr {
    self.identity.node_addr()
}
1392
/// Returns the npub string reported by this node's identity.
pub fn npub(&self) -> String {
    self.identity.npub()
}
1397
1398 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1407 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1408 return hostname.to_string();
1409 }
1410 if let Some(name) = self.peer_aliases.get(addr) {
1411 return name.clone();
1412 }
1413 if let Some(peer) = self.peers.get(addr) {
1414 return peer.identity().short_npub();
1415 }
1416 if let Some(entry) = self.sessions.get(addr) {
1417 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1418 return PeerIdentity::from_pubkey(xonly).short_npub();
1419 }
1420 addr.short_hex()
1421 }
1422
1423 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1435 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1439 self.peers_by_index.remove(&cache_key);
1440 if self.decrypt_registered_sessions.remove(&cache_key)
1441 && let Some(workers) = self.decrypt_workers.as_ref()
1442 {
1443 workers.unregister_session(cache_key);
1444 }
1445 if let Some(peer_addr) = owning_peer {
1456 let peer_has_other_index = self
1457 .peers_by_index
1458 .values()
1459 .any(|other| *other == peer_addr);
1460 if !peer_has_other_index {
1461 self.clear_connected_udp_for_peer(&peer_addr);
1462 }
1463 }
1464 }
1465
1466 pub(in crate::node) fn ensure_current_session_index_registered(
1475 &mut self,
1476 node_addr: &NodeAddr,
1477 context: &'static str,
1478 ) -> bool {
1479 let Some(peer) = self.peers.get(node_addr) else {
1480 return false;
1481 };
1482 let Some(transport_id) = peer.transport_id() else {
1483 warn!(
1484 peer = %self.peer_display_name(node_addr),
1485 context,
1486 "Cannot register current session index without transport id"
1487 );
1488 return false;
1489 };
1490 let Some(our_index) = peer.our_index() else {
1491 warn!(
1492 peer = %self.peer_display_name(node_addr),
1493 context,
1494 "Cannot register current session index without local index"
1495 );
1496 return false;
1497 };
1498
1499 let cache_key = (transport_id, our_index.as_u32());
1500 match self.peers_by_index.get(&cache_key).copied() {
1501 Some(existing) if existing == *node_addr => true,
1502 Some(existing) => {
1503 warn!(
1504 peer = %self.peer_display_name(node_addr),
1505 previous_owner = %self.peer_display_name(&existing),
1506 transport_id = %transport_id,
1507 our_index = %our_index,
1508 context,
1509 "Repairing current session index with stale owner"
1510 );
1511 self.peers_by_index.insert(cache_key, *node_addr);
1512 true
1513 }
1514 None => {
1515 warn!(
1516 peer = %self.peer_display_name(node_addr),
1517 transport_id = %transport_id,
1518 our_index = %our_index,
1519 context,
1520 "Repairing missing current session index"
1521 );
1522 self.peers_by_index.insert(cache_key, *node_addr);
1523 true
1524 }
1525 }
1526 }
1527
/// Returns a reference to the configuration held by this node.
pub fn config(&self) -> &Config {
    &self.config
}
1534
/// Returns the IPv6 MTU obtained by passing [`Self::transport_mtu`] through
/// `crate::upper::icmp::effective_ipv6_mtu`.
pub fn effective_ipv6_mtu(&self) -> u16 {
    crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
}
1543
1544 pub fn transport_mtu(&self) -> u16 {
1561 let min_operational = self
1562 .transports
1563 .values()
1564 .filter(|h| h.is_operational())
1565 .map(|h| h.mtu())
1566 .min();
1567 if let Some(mtu) = min_operational {
1568 return mtu;
1569 }
1570 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1572 return cfg.mtu();
1573 }
1574 1280
1575 }
1576
/// Returns the node's current [`NodeState`].
pub fn state(&self) -> NodeState {
    self.state
}
1583
/// Returns the time elapsed since the instant stored in `started_at`.
pub fn uptime(&self) -> std::time::Duration {
    self.started_at.elapsed()
}
1588
/// Returns `true` when the current state reports itself as operational.
pub fn is_running(&self) -> bool {
    self.state.is_operational()
}
1593
/// Returns the node's `is_leaf_only` flag.
pub fn is_leaf_only(&self) -> bool {
    self.is_leaf_only
}
1598
/// Returns a shared reference to the node's [`TreeState`].
pub fn tree_state(&self) -> &TreeState {
    &self.tree_state
}
1605
/// Returns a mutable reference to the node's [`TreeState`].
pub fn tree_state_mut(&mut self) -> &mut TreeState {
    &mut self.tree_state
}
1610
/// Returns a shared reference to the node's [`BloomState`].
pub fn bloom_state(&self) -> &BloomState {
    &self.bloom_state
}
1617
/// Returns a mutable reference to the node's [`BloomState`].
pub fn bloom_state_mut(&mut self) -> &mut BloomState {
    &mut self.bloom_state
}
1622
/// Returns the stored mesh-size estimate, or `None` when no estimate is
/// currently held.
pub fn estimated_mesh_size(&self) -> Option<u64> {
    self.estimated_mesh_size
}
1629
1630 pub(crate) fn compute_mesh_size(&mut self) {
1636 let my_addr = *self.tree_state.my_node_addr();
1637 let parent_id = *self.tree_state.my_declaration().parent_id();
1638 let is_root = self.tree_state.is_root();
1639
1640 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1641 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1643 let mut has_data = false;
1644
1645 if !is_root
1651 && let Some(parent) = self.peers.get(&parent_id)
1652 && let Some(filter) = parent.inbound_filter()
1653 {
1654 match filter.estimated_count(max_fpr) {
1655 Some(n) => {
1656 total += n;
1657 has_data = true;
1658 }
1659 None => {
1660 self.estimated_mesh_size = None;
1661 return;
1662 }
1663 }
1664 }
1665
1666 for (peer_addr, peer) in &self.peers {
1668 if peer_addr == &parent_id {
1669 continue;
1670 }
1671 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1672 && *decl.parent_id() == my_addr
1673 {
1674 child_count += 1;
1675 if let Some(filter) = peer.inbound_filter() {
1676 match filter.estimated_count(max_fpr) {
1677 Some(n) => {
1678 total += n;
1679 has_data = true;
1680 }
1681 None => {
1682 self.estimated_mesh_size = None;
1683 return;
1684 }
1685 }
1686 }
1687 }
1688 }
1689
1690 if !has_data {
1691 self.estimated_mesh_size = None;
1692 return;
1693 }
1694
1695 let size = total.round() as u64;
1696 self.estimated_mesh_size = Some(size);
1697
1698 let now = std::time::Instant::now();
1700 let should_log = match self.last_mesh_size_log {
1701 None => true,
1702 Some(last) => {
1703 now.duration_since(last)
1704 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1705 }
1706 };
1707 if should_log {
1708 tracing::debug!(
1709 estimated_mesh_size = size,
1710 peers = self.peers.len(),
1711 children = child_count,
1712 "Mesh size estimate"
1713 );
1714 self.last_mesh_size_log = Some(now);
1715 }
1716 }
1717
/// Returns a shared reference to the node's [`CoordCache`].
pub fn coord_cache(&self) -> &CoordCache {
    &self.coord_cache
}
1724
/// Returns a mutable reference to the node's [`CoordCache`].
pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
    &mut self.coord_cache
}
1729
/// Returns a shared reference to the node's statistics.
pub fn stats(&self) -> &stats::NodeStats {
    &self.stats
}
1736
/// Returns a mutable reference to the node's statistics.
pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
    &mut self.stats
}
1741
/// Returns a shared reference to the node's statistics history.
pub fn stats_history(&self) -> &stats_history::StatsHistory {
    &self.stats_history
}
1746
1747 pub(crate) fn record_stats_history(&mut self) {
1750 let fwd = &self.stats.forwarding;
1751 let peers_with_mmp: Vec<f64> = self
1752 .peers
1753 .values()
1754 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1755 .collect();
1756 let loss_rate = if peers_with_mmp.is_empty() {
1757 0.0
1758 } else {
1759 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1760 };
1761
1762 let snap = stats_history::Snapshot {
1763 mesh_size: self.estimated_mesh_size,
1764 tree_depth: self.tree_state.my_coords().depth() as u32,
1765 peer_count: self.peers.len() as u64,
1766 parent_switches_total: self.stats.tree.parent_switches,
1767 bytes_in_total: fwd.received_bytes,
1768 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1769 packets_in_total: fwd.received_packets,
1770 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1771 loss_rate,
1772 active_sessions: self.sessions.len() as u64,
1773 };
1774
1775 let now = std::time::Instant::now();
1776 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1777 .peers
1778 .values()
1779 .map(|p| {
1780 let stats = p.link_stats();
1781 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1782 Some(m) => (
1783 m.metrics.srtt_ms(),
1784 Some(m.metrics.loss_rate()),
1785 m.receiver.ecn_ce_count() as u64,
1786 ),
1787 None => (None, None, 0),
1788 };
1789 stats_history::PeerSnapshot {
1790 node_addr: *p.node_addr(),
1791 last_seen: now,
1792 srtt_ms,
1793 loss_rate,
1794 bytes_in_total: stats.bytes_recv,
1795 bytes_out_total: stats.bytes_sent,
1796 packets_in_total: stats.packets_recv,
1797 packets_out_total: stats.packets_sent,
1798 ecn_ce_total: ecn_ce,
1799 }
1800 })
1801 .collect();
1802
1803 self.stats_history.tick(now, &snap, &peer_snaps);
1804 }
1805
/// Returns the node's current [`TunState`].
pub fn tun_state(&self) -> TunState {
    self.tun_state
}
1812
/// Returns the stored TUN name as a string slice, or `None` when none is set.
pub fn tun_name(&self) -> Option<&str> {
    self.tun_name.as_deref()
}
1817
/// Sets the connection limit. The limit checks in this type treat `0` as
/// "no limit".
pub fn set_max_connections(&mut self, max: usize) {
    self.max_connections = max;
}
1824
/// Sets the peer limit. The outbound admission check treats `0` as
/// "no limit".
pub fn set_max_peers(&mut self, max: usize) {
    self.max_peers = max;
}
1829
1830 pub(crate) fn outbound_admission_check(&self) -> bool {
1833 let connection_used = self
1834 .connections
1835 .len()
1836 .saturating_add(self.pending_connects.len());
1837 let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1838 let connection_allowed =
1839 self.max_connections == 0 || connection_used < self.max_connections;
1840 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1841 peer_allowed && connection_allowed && link_allowed
1842 }
1843
1844 pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1848 let connection_used = self
1849 .connections
1850 .len()
1851 .saturating_add(self.pending_connects.len());
1852 let connection_allowed =
1853 self.max_connections == 0 || connection_used < self.max_connections;
1854 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1855 connection_allowed && link_allowed
1856 }
1857
/// Sets the link limit. The limit checks in this type treat `0` as
/// "no limit".
pub fn set_max_links(&mut self, max: usize) {
    self.max_links = max;
}
1862
/// Returns the number of entries in the connection table.
pub fn connection_count(&self) -> usize {
    self.connections.len()
}
1869
/// Returns the number of entries in the peer table.
pub fn peer_count(&self) -> usize {
    self.peers.len()
}
1874
/// Returns the number of entries in the link table.
pub fn link_count(&self) -> usize {
    self.links.len()
}
1879
/// Returns the number of entries in the transport table.
pub fn transport_count(&self) -> usize {
    self.transports.len()
}
1884
1885 pub fn allocate_transport_id(&mut self) -> TransportId {
1889 let id = TransportId::new(self.next_transport_id);
1890 self.next_transport_id += 1;
1891 id
1892 }
1893
/// Looks up a transport handle by id; `None` when the id is unknown.
pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
    self.transports.get(id)
}
1898
/// Looks up a transport handle by id for mutation; `None` when the id is
/// unknown.
pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
    self.transports.get_mut(id)
}
1903
/// Iterates over the ids of all transports in the transport table.
pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
    self.transports.keys()
}
1908
/// Returns a mutable reference to the packet receiver, if one is present.
pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
    self.packet_rx.as_mut()
}
1913
1914 pub fn allocate_link_id(&mut self) -> LinkId {
1918 let id = LinkId::new(self.next_link_id);
1919 self.next_link_id += 1;
1920 id
1921 }
1922
1923 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1925 if self.max_links > 0 && self.links.len() >= self.max_links {
1926 return Err(NodeError::MaxLinksExceeded {
1927 max: self.max_links,
1928 });
1929 }
1930 let link_id = link.link_id();
1931 let transport_id = link.transport_id();
1932 let remote_addr = link.remote_addr().clone();
1933
1934 self.links.insert(link_id, link);
1935 self.addr_to_link
1936 .insert((transport_id, remote_addr), link_id);
1937 Ok(())
1938 }
1939
/// Looks up a link by id; `None` when the id is unknown.
pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
    self.links.get(link_id)
}
1944
/// Looks up a link by id for mutation; `None` when the id is unknown.
pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
    self.links.get_mut(link_id)
}
1949
1950 pub fn find_link_by_addr(
1952 &self,
1953 transport_id: TransportId,
1954 addr: &TransportAddr,
1955 ) -> Option<LinkId> {
1956 self.addr_to_link
1957 .get(&(transport_id, addr.clone()))
1958 .copied()
1959 }
1960
1961 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
1967 if let Some(link) = self.links.remove(link_id) {
1968 let key = (link.transport_id(), link.remote_addr().clone());
1970 if self.addr_to_link.get(&key) == Some(link_id) {
1971 self.addr_to_link.remove(&key);
1972 }
1973 Some(link)
1974 } else {
1975 None
1976 }
1977 }
1978
1979 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
1980 if !self.bootstrap_transports.contains(&transport_id) {
1981 return;
1982 }
1983
1984 let transport_in_use = self
1985 .links
1986 .values()
1987 .any(|link| link.transport_id() == transport_id)
1988 || self
1989 .connections
1990 .values()
1991 .any(|conn| conn.transport_id() == Some(transport_id))
1992 || self
1993 .peers
1994 .values()
1995 .any(|peer| peer.transport_id() == Some(transport_id))
1996 || self
1997 .pending_connects
1998 .iter()
1999 .any(|pending| pending.transport_id == transport_id);
2000
2001 if transport_in_use {
2002 return;
2003 }
2004
2005 tracing::debug!(
2006 transport_id = %transport_id,
2007 "bootstrap transport has no remaining references; dropping"
2008 );
2009
2010 self.bootstrap_transports.remove(&transport_id);
2011 self.bootstrap_transport_npubs.remove(&transport_id);
2012 self.transport_drops.remove(&transport_id);
2013 self.transports.remove(&transport_id);
2014 }
2015
/// Iterates over all links in the link table.
pub fn links(&self) -> impl Iterator<Item = &Link> {
    self.links.values()
}
2020
2021 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2025 let link_id = connection.link_id();
2026
2027 if self.connections.contains_key(&link_id) {
2028 return Err(NodeError::ConnectionAlreadyExists(link_id));
2029 }
2030
2031 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2032 return Err(NodeError::MaxConnectionsExceeded {
2033 max: self.max_connections,
2034 });
2035 }
2036
2037 self.connections.insert(link_id, connection);
2038 Ok(())
2039 }
2040
/// Looks up a connection by link id; `None` when there is none.
pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
    self.connections.get(link_id)
}
2045
/// Looks up a connection by link id for mutation; `None` when there is none.
pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
    self.connections.get_mut(link_id)
}
2050
/// Removes and returns the connection stored for `link_id`, if any.
pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
    self.connections.remove(link_id)
}
2055
/// Iterates over all connections in the connection table.
pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
    self.connections.values()
}
2060
/// Looks up an active peer by node address; `None` when it is unknown.
pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
    self.peers.get(node_addr)
}
2067
/// Looks up an active peer by node address for mutation; `None` when it is
/// unknown.
pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
    self.peers.get_mut(node_addr)
}
2072
/// Removes and returns the peer stored for `node_addr`, if any.
///
/// Only the peer table is touched here; no other table is updated by this
/// method.
pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
    self.peers.remove(node_addr)
}
2077
/// Iterates over all peers in the peer table.
pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values()
}
2082
/// Returns a reference to the Nostr discovery instance, if one is set.
pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
    self.nostr_discovery.as_deref()
}
2089
/// Iterates over the node addresses of all peers in the peer table.
pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
    self.peers.keys()
}
2094
/// Iterates over the peers whose `can_send()` currently returns `true`.
pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values().filter(|p| p.can_send())
}
2099
2100 pub fn sendable_peer_count(&self) -> usize {
2102 self.peers.values().filter(|p| p.can_send()).count()
2103 }
2104
2105 pub(crate) fn set_discovery_fallback_transit_allowed(
2106 &mut self,
2107 peer_addr: NodeAddr,
2108 allowed: bool,
2109 ) {
2110 if allowed {
2111 self.discovery_fallback_transit_blocked_peers
2112 .remove(&peer_addr);
2113 } else {
2114 self.discovery_fallback_transit_blocked_peers
2115 .insert(peer_addr);
2116 }
2117 }
2118
2119 pub(crate) fn configured_discovery_fallback_transit(
2120 &self,
2121 peer_addr: &NodeAddr,
2122 ) -> Option<bool> {
2123 self.configured_peer(peer_addr)
2124 .map(|peer| peer.discovery_fallback_transit)
2125 }
2126
2127 pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2128 self.config.peers().iter().find(|peer| {
2129 PeerIdentity::from_npub(&peer.npub)
2130 .ok()
2131 .is_some_and(|identity| identity.node_addr() == peer_addr)
2132 })
2133 }
2134
2135 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2136 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2137 return retry_state.peer_config.discovery_fallback_transit;
2138 }
2139
2140 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2141 return allowed;
2142 }
2143
2144 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2145 }
2146
/// Test helper: sets the discovery forward limiter's interval to zero.
#[cfg(test)]
pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
    self.discovery_forward_limiter
        .set_interval(std::time::Duration::ZERO);
}
2156
/// Test helper: looks up the session entry for `remote`, if any.
#[cfg(test)]
pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
    self.sessions.get(remote)
}
2161
/// Test helper: looks up the session entry for `remote` for mutation, if any.
#[cfg(test)]
pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
    self.sessions.get_mut(remote)
}
2167
/// Test helper: removes and returns the session entry for `remote`, if any.
#[cfg(test)]
pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
    self.sessions.remove(remote)
}
2173
/// Test helper: reads the path MTU recorded for `fips_addr`.
///
/// Returns `None` when no value is recorded or when the read lock cannot be
/// acquired.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
    let table = self.path_mtu_lookup.read().ok()?;
    table.get(fips_addr).copied()
}
2182
/// Test helper: records `mtu` as the path MTU for `fips_addr`.
///
/// Silently does nothing when the write lock cannot be acquired.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
    let Ok(mut table) = self.path_mtu_lookup.write() else {
        return;
    };
    table.insert(fips_addr, mtu);
}
2190
/// Returns the number of entries in the session table.
pub fn session_count(&self) -> usize {
    self.sessions.len()
}
2195
/// Iterates over `(remote address, session entry)` pairs in the session table.
pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
    self.sessions.iter()
}
2200
2201 pub(crate) fn register_identity(
2205 &mut self,
2206 node_addr: NodeAddr,
2207 pubkey: secp256k1::PublicKey,
2208 ) -> bool {
2209 let mut prefix = [0u8; 15];
2210 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2211 if let Some(entry) = self.identity_cache.get(&prefix)
2212 && entry.node_addr == node_addr
2213 && entry.pubkey == pubkey
2214 {
2215 return true;
2219 }
2220
2221 let (xonly, _) = pubkey.x_only_public_key();
2222 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2223 if derived_node_addr != node_addr {
2224 debug!(
2225 claimed_node_addr = %node_addr,
2226 derived_node_addr = %derived_node_addr,
2227 "Rejected identity cache entry with mismatched public key"
2228 );
2229 return false;
2230 }
2231
2232 let now_ms = Self::now_ms();
2233 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2234 && entry.node_addr == node_addr
2235 {
2236 entry.pubkey = pubkey;
2237 entry.last_seen_ms = now_ms;
2238 return true;
2239 }
2240
2241 let npub = encode_npub(&xonly);
2242 self.identity_cache.insert(
2243 prefix,
2244 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2245 );
2246 let max = self.config.node.cache.identity_size;
2248 if self.identity_cache.len() > max
2249 && let Some(oldest_key) = self
2250 .identity_cache
2251 .iter()
2252 .min_by_key(|(_, entry)| entry.last_seen_ms)
2253 .map(|(k, _)| *k)
2254 {
2255 self.identity_cache.remove(&oldest_key);
2256 }
2257 true
2258 }
2259
2260 pub(crate) fn lookup_by_fips_prefix(
2262 &mut self,
2263 prefix: &[u8; 15],
2264 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2265 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2266 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2268 } else {
2269 None
2270 }
2271 }
2272
/// Reports whether the identity cache holds an entry under the first 15 bytes
/// of `addr`.
///
/// Only the 15-byte prefix is checked; the stored entry's full node address
/// is not compared against `addr`.
pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
    let mut prefix = [0u8; 15];
    prefix.copy_from_slice(&addr.as_bytes()[0..15]);
    self.identity_cache.contains_key(&prefix)
}
2279
/// Returns the number of entries in the identity cache.
pub fn identity_cache_len(&self) -> usize {
    self.identity_cache.len()
}
2284
/// Iterates over the identity cache, yielding each entry's node address,
/// public key and `last_seen_ms` value.
pub fn identity_cache_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
    self.identity_cache
        .values()
        .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
}
2296
/// Returns the configured identity cache size (`config.node.cache.identity_size`).
pub fn identity_cache_max(&self) -> usize {
    self.config.node.cache.identity_size
}
2301
/// Returns the number of entries in the pending-lookup table.
pub fn pending_lookup_count(&self) -> usize {
    self.pending_lookups.len()
}
2306
/// Iterates over `(node address, pending lookup)` pairs in the pending-lookup
/// table.
pub fn pending_lookups_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
    self.pending_lookups.iter()
}
2313
/// Returns the number of entries in the recent-requests collection.
pub fn recent_request_count(&self) -> usize {
    self.recent_requests.len()
}
2318
/// Returns the number of keys in the pending TUN packet table.
pub fn pending_tun_destinations(&self) -> usize {
    self.pending_tun_packets.len()
}
2323
/// Returns the total number of queued packets, summed over every queue in the
/// pending TUN packet table.
pub fn pending_tun_total_packets(&self) -> usize {
    self.pending_tun_packets.values().map(|q| q.len()).sum()
}
2328
/// Iterates over `(node address, retry state)` pairs in the pending-retry
/// table.
pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
    self.retry_pending.iter()
}
2333
2334 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2341 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2343 return true;
2344 }
2345 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2347 && decl.parent_id() == self.node_addr()
2348 {
2349 return true;
2350 }
2351 false
2352 }
2353
/// Chooses the peer to forward a packet for `dest_node_addr` to.
///
/// Strategies are tried in this order, first hit wins:
/// 1. `None` when the destination is this node.
/// 2. The destination itself when it is a directly connected, healthy peer.
/// 3. In `ReplyLearned` routing mode (and when not exploring the fallback),
///    a next hop chosen from the learned-route table.
/// 4. With cached destination coordinates: the best candidate among peers
///    whose filters may reach the destination, then the tree next hop if
///    that peer can send.
/// 5. Learned routes again, then the direct peer if it can at least send.
///
/// Without cached coordinates, step 4 is skipped and only learned routes and
/// the sendable direct peer are tried.
///
/// Takes `&mut self` because the coordinate cache entry is touched and the
/// learned-route table is consulted through `self`.
pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
    // Never route to ourselves.
    if dest_node_addr == self.node_addr() {
        return None;
    }

    // Remember whether the direct peer can send at all; used as a last
    // resort further down when it is not healthy.
    let direct_peer_can_send = self
        .peers
        .get(dest_node_addr)
        .is_some_and(|peer| peer.can_send());
    if let Some(peer) = self.peers.get(dest_node_addr)
        && peer.is_healthy()
    {
        return Some(peer);
    }

    let now_ms = Self::now_ms();

    // `Some(set of sendable peer addresses)` only in reply-learned mode;
    // `None` disables every learned-route step below.
    let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
        Some(
            self.peers
                .iter()
                .filter(|(_, peer)| peer.can_send())
                .map(|(addr, _)| *addr)
                .collect::<HashSet<_>>(),
        )
    } else {
        None
    };

    // When the learned-route table asks to explore the fallback, learned
    // routes are deferred until coordinate and tree routing have been tried.
    let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
        self.learned_routes.should_explore_fallback(
            dest_node_addr,
            now_ms,
            self.config.node.routing.learned_fallback_explore_interval,
            |addr| sendable.contains(addr),
        )
    });
    if let Some(sendable) = &sendable_learned_peers
        && !explore_fallback
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    // Coordinates are cloned so the cache borrow ends before `self` is used
    // again below.
    let Some(dest_coords) = self
        .coord_cache
        .get_and_touch(dest_node_addr, now_ms)
        .cloned()
    else {
        // No coordinates: only learned routes and the direct peer remain.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }
        if direct_peer_can_send {
            return self.peers.get(dest_node_addr);
        }
        return None;
    };

    // Coordinate routing: only the chosen address leaves this scope, so the
    // candidate borrows end here.
    let coordinate_route_addr = {
        let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
        if !candidates.is_empty() {
            self.select_best_candidate(&candidates, &dest_coords)
                .map(|peer| *peer.node_addr())
        } else {
            None
        }
    };
    if let Some(next_hop_addr) = coordinate_route_addr {
        return self.peers.get(&next_hop_addr);
    }

    // Tree routing, accepted only if the chosen peer can currently send.
    let tree_route_addr = self
        .tree_state
        .find_next_hop(&dest_coords)
        .filter(|next_hop_id| {
            self.peers
                .get(next_hop_id)
                .is_some_and(|peer| peer.can_send())
        });
    if let Some(next_hop_addr) = tree_route_addr {
        return self.peers.get(&next_hop_addr);
    }
    // NOTE(review): when `explore_fallback` is set this returns the
    // learned-route result even if it is `None`, so the direct-peer fallback
    // at the end of the function is not reached — confirm this is intended.
    if explore_fallback {
        return sendable_learned_peers.as_ref().and_then(|sendable| {
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
        });
    }

    // NOTE(review): reached only with `explore_fallback == false`, in which
    // case the same lookup was already attempted above with the same
    // arguments — presumably harmless; verify whether it can ever differ.
    if let Some(sendable) = &sendable_learned_peers
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    // Last resort: a direct peer that is not healthy but can still send.
    if direct_peer_can_send {
        return self.peers.get(dest_node_addr);
    }

    None
}
2500
2501 pub(in crate::node) fn learn_reverse_route(
2502 &mut self,
2503 destination: NodeAddr,
2504 next_hop: NodeAddr,
2505 ) {
2506 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2507 || destination == *self.node_addr()
2508 {
2509 return;
2510 }
2511 let now_ms = Self::now_ms();
2512 self.learned_routes.learn(
2513 destination,
2514 next_hop,
2515 now_ms,
2516 self.config.node.routing.learned_ttl_secs,
2517 self.config.node.routing.max_learned_routes_per_dest,
2518 );
2519 }
2520
2521 pub(in crate::node) fn record_route_failure(
2522 &mut self,
2523 destination: NodeAddr,
2524 next_hop: NodeAddr,
2525 ) {
2526 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2527 return;
2528 }
2529 self.learned_routes.record_failure(&destination, &next_hop);
2530 }
2531
/// Returns a snapshot of the learned-route table taken at `now_ms`.
pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
    self.learned_routes.snapshot(now_ms)
}
2535
/// Asks the learned-route table to purge entries that are expired as of
/// `now_ms`.
pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
    self.learned_routes.purge_expired(now_ms);
}
2539
2540 fn select_best_candidate<'a>(
2549 &'a self,
2550 candidates: &[&'a ActivePeer],
2551 dest_coords: &crate::tree::TreeCoordinate,
2552 ) -> Option<&'a ActivePeer> {
2553 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2554
2555 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2556
2557 for &candidate in candidates {
2558 if !candidate.can_send() {
2559 continue;
2560 }
2561
2562 let cost = candidate.link_cost();
2563
2564 let dist = self
2565 .tree_state
2566 .peer_coords(candidate.node_addr())
2567 .map(|pc| pc.distance_to(dest_coords))
2568 .unwrap_or(usize::MAX);
2569
2570 if dist >= my_distance {
2573 continue;
2574 }
2575
2576 let dominated = match &best {
2577 None => true,
2578 Some((_, best_cost, best_dist)) => {
2579 cost < *best_cost
2580 || (cost == *best_cost && dist < *best_dist)
2581 || (cost == *best_cost
2582 && dist == *best_dist
2583 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2584 }
2585 };
2586
2587 if dominated {
2588 best = Some((candidate, cost, dist));
2589 }
2590 }
2591
2592 best.map(|(peer, _, _)| peer)
2593 }
2594
/// Collects every peer whose `may_reach(dest)` returns `true`.
pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
    self.peers.values().filter(|p| p.may_reach(dest)).collect()
}
2599
/// Returns a reference to the TUN sender, if one is set.
pub fn tun_tx(&self) -> Option<&TunTx> {
    self.tun_tx.as_ref()
}
2606
2607 pub fn attach_external_packet_io(
2614 &mut self,
2615 capacity: usize,
2616 ) -> Result<ExternalPacketIo, NodeError> {
2617 if self.state != NodeState::Created {
2618 return Err(NodeError::Config(ConfigError::Validation(
2619 "external packet I/O must be attached before node start".to_string(),
2620 )));
2621 }
2622 if self.config.tun.enabled {
2623 return Err(NodeError::Config(ConfigError::Validation(
2624 "external packet I/O requires tun.enabled=false".to_string(),
2625 )));
2626 }
2627
2628 let capacity = capacity.max(1);
2629 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2630 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2631 self.tun_outbound_rx = Some(outbound_rx);
2632 self.external_packet_tx = Some(inbound_tx);
2633
2634 Ok(ExternalPacketIo {
2635 outbound_tx,
2636 inbound_rx,
2637 })
2638 }
2639
2640 pub(crate) fn attach_endpoint_data_io(
2645 &mut self,
2646 capacity: usize,
2647 ) -> Result<EndpointDataIo, NodeError> {
2648 if self.state != NodeState::Created {
2649 return Err(NodeError::Config(ConfigError::Validation(
2650 "endpoint data I/O must be attached before node start".to_string(),
2651 )));
2652 }
2653
2654 let command_capacity = endpoint_data_command_capacity(capacity);
2655 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2656 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2661 self.endpoint_command_rx = Some(command_rx);
2662 self.endpoint_event_tx = Some(event_tx.clone());
2663
2664 Ok(EndpointDataIo {
2665 command_tx,
2666 event_rx,
2667 event_tx,
2668 })
2669 }
2670
2671 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2672 let mut prefix = [0u8; 15];
2673 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2674 self.identity_cache
2675 .get(&prefix)
2676 .filter(|entry| &entry.node_addr == addr)
2677 .map(|entry| entry.pubkey)
2678 }
2679
2680 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2681 let mut prefix = [0u8; 15];
2682 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2683 self.identity_cache
2684 .get(&prefix)
2685 .filter(|entry| &entry.node_addr == addr)
2686 .map(|entry| entry.npub.clone())
2687 }
2688
2689 pub(in crate::node) fn deliver_external_ipv6_packet(
2690 &self,
2691 src_addr: &NodeAddr,
2692 packet: Vec<u8>,
2693 ) {
2694 let Some(external_packet_tx) = &self.external_packet_tx else {
2695 return;
2696 };
2697 if packet.len() < 40 {
2698 return;
2699 }
2700 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2701 return;
2702 };
2703 let delivered = NodeDeliveredPacket {
2704 source_node_addr: *src_addr,
2705 source_npub: self.npub_for_node_addr(src_addr),
2706 destination,
2707 packet,
2708 };
2709 if let Err(error) = external_packet_tx.try_send(delivered) {
2710 debug!(error = %error, "Failed to deliver packet to external app sink");
2711 }
2712 }
2713
/// Sends `plaintext` to `node_addr` by delegating to
/// `send_encrypted_link_message_with_ce` with the CE flag set to `false`.
///
/// # Errors
///
/// Propagates whatever error the delegated call returns.
pub(super) async fn send_encrypted_link_message(
    &mut self,
    node_addr: &NodeAddr,
    plaintext: &[u8],
) -> Result<(), NodeError> {
    self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
        .await
}
2735
2736 pub(in crate::node) fn note_local_send_outcome(
2742 &mut self,
2743 result: &Result<usize, TransportError>,
2744 ) {
2745 match result {
2746 Ok(_) => {
2747 if self.last_local_send_failure_at.is_some() {
2748 self.last_local_send_failure_at = None;
2749 }
2750 }
2751 Err(TransportError::Io(e))
2752 if matches!(
2753 e.kind(),
2754 std::io::ErrorKind::NetworkUnreachable
2755 | std::io::ErrorKind::HostUnreachable
2756 | std::io::ErrorKind::AddrNotAvailable
2757 ) =>
2758 {
2759 self.last_local_send_failure_at = Some(std::time::Instant::now());
2760 }
2761 Err(_) => {}
2762 }
2763 }
2764
/// Returns the instant of the most recently recorded local send failure, or
/// `None` when none is recorded.
pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
    self.last_local_send_failure_at
}
2770
2771 pub(super) async fn send_encrypted_link_message_with_ce(
2775 &mut self,
2776 node_addr: &NodeAddr,
2777 plaintext: &[u8],
2778 ce_flag: bool,
2779 ) -> Result<(), NodeError> {
2780 let peer = self
2781 .peers
2782 .get_mut(node_addr)
2783 .ok_or(NodeError::PeerNotFound(*node_addr))?;
2784
2785 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2786 node_addr: *node_addr,
2787 reason: "no their_index".into(),
2788 })?;
2789 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2790 node_addr: *node_addr,
2791 reason: "no transport_id".into(),
2792 })?;
2793 let remote_addr = peer
2794 .current_addr()
2795 .cloned()
2796 .ok_or_else(|| NodeError::SendFailed {
2797 node_addr: *node_addr,
2798 reason: "no current_addr".into(),
2799 })?;
2800 #[cfg(any(target_os = "linux", target_os = "macos"))]
2801 let connected_socket = peer.connected_udp();
2802
2803 let timestamp_ms = peer.session_elapsed_ms();
2805
2806 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2808 let mut flags = if sp_flag { FLAG_SP } else { 0 };
2809 if ce_flag {
2810 flags |= FLAG_CE;
2811 }
2812 if peer.current_k_bit() {
2813 flags |= FLAG_KEY_EPOCH;
2814 }
2815
2816 let session = peer
2817 .noise_session_mut()
2818 .ok_or_else(|| NodeError::SendFailed {
2819 node_addr: *node_addr,
2820 reason: "no noise session".into(),
2821 })?;
2822
2823 const INNER_TS_LEN: usize = 4;
2831 let counter = session.current_send_counter();
2832 let inner_len = INNER_TS_LEN + plaintext.len();
2833 let payload_len = inner_len as u16;
2834 let header = build_established_header(their_index, counter, flags, payload_len);
2835
2836 let transport_for_send = self
2855 .transports
2856 .get(&transport_id)
2857 .ok_or(NodeError::TransportNotFound(transport_id))?;
2858 match transport_for_send.connection_state(&remote_addr) {
2859 ConnectionState::Connected => {}
2860 other => {
2861 if matches!(other, ConnectionState::None) {
2862 let _ = transport_for_send.connect(&remote_addr).await;
2863 }
2864 return Err(NodeError::SendFailed {
2865 node_addr: *node_addr,
2866 reason: format!("transport connection not ready: {:?}", other),
2867 });
2868 }
2869 }
2870 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2871 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2872 && is_udp
2873 && let Some(cipher_clone) = session.send_cipher_clone()
2874 {
2875 {
2876 let reserved_counter =
2880 session
2881 .take_send_counter()
2882 .map_err(|e| NodeError::SendFailed {
2883 node_addr: *node_addr,
2884 reason: format!("counter reservation failed: {}", e),
2885 })?;
2886 debug_assert_eq!(reserved_counter, counter);
2887 let header =
2891 build_established_header(their_index, reserved_counter, flags, payload_len);
2892 let transport = transport_for_send;
2893 let send_target = {
2900 if let TransportHandle::Udp(udp) = transport {
2901 let socket_addr = {
2902 #[cfg(any(target_os = "linux", target_os = "macos"))]
2903 {
2904 match connected_socket.as_ref() {
2905 Some(socket) => Some(socket.peer_addr()),
2906 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2907 }
2908 }
2909 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2910 {
2911 udp.resolve_for_off_task(&remote_addr).await.ok()
2912 }
2913 };
2914 match (udp.async_socket(), socket_addr) {
2915 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
2916 _ => None,
2917 }
2918 } else {
2919 None
2920 }
2921 };
2922 if let Some((socket, socket_addr)) = send_target {
2923 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
2939 let mut wire_buf = Vec::with_capacity(wire_capacity);
2940 wire_buf.extend_from_slice(&header);
2941 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
2942 wire_buf.extend_from_slice(plaintext);
2943 let predicted_bytes = wire_capacity;
2944 if let Some(peer) = self.peers.get_mut(node_addr) {
2951 peer.link_stats_mut().record_sent(predicted_bytes);
2952 if let Some(mmp) = peer.mmp_mut() {
2953 mmp.sender
2954 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
2955 }
2956 }
2957 let scheduling_weight = self.send_weight_for_peer(node_addr);
2958 workers.dispatch(self::encrypt_worker::FmpSendJob {
2959 cipher: cipher_clone,
2960 counter: reserved_counter,
2961 wire_buf,
2962 fsp_seal: None,
2963 socket,
2964 dest_addr: socket_addr,
2965 #[cfg(any(target_os = "linux", target_os = "macos"))]
2966 connected_socket,
2967 drop_on_backpressure: plaintext
2968 .first()
2969 .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
2970 scheduling_weight,
2971 queued_at: crate::perf_profile::stamp(),
2972 });
2973 return Ok(());
2974 }
2975 }
2976 }
2977
2978 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
2983 let ciphertext = {
2985 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
2986 session
2987 .encrypt_with_aad(&inner_plaintext, &header)
2988 .map_err(|e| NodeError::SendFailed {
2989 node_addr: *node_addr,
2990 reason: format!("encryption failed: {}", e),
2991 })?
2992 };
2993
2994 let wire_packet = build_encrypted(&header, &ciphertext);
2995
2996 let send_result = {
2998 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
2999 let transport = self
3000 .transports
3001 .get(&transport_id)
3002 .ok_or(NodeError::TransportNotFound(transport_id))?;
3003 transport.send(&remote_addr, &wire_packet).await
3004 };
3005 self.note_local_send_outcome(&send_result);
3006 let bytes_sent = send_result.map_err(|e| match e {
3007 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3008 node_addr: *node_addr,
3009 packet_size,
3010 mtu,
3011 },
3012 other => NodeError::SendFailed {
3013 node_addr: *node_addr,
3014 reason: format!("transport send: {}", other),
3015 },
3016 })?;
3017
3018 if let Some(peer) = self.peers.get_mut(node_addr) {
3020 peer.link_stats_mut().record_sent(bytes_sent);
3021 if let Some(mmp) = peer.mmp_mut() {
3023 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3024 }
3025 }
3026
3027 Ok(())
3028 }
3029}
3030
3031impl fmt::Debug for Node {
3032 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3033 f.debug_struct("Node")
3034 .field("node_addr", self.node_addr())
3035 .field("state", &self.state)
3036 .field("is_leaf_only", &self.is_leaf_only)
3037 .field("connections", &self.connection_count())
3038 .field("peers", &self.peer_count())
3039 .field("links", &self.link_count())
3040 .field("transports", &self.transport_count())
3041 .finish()
3042 }
3043}