1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45#[cfg(feature = "webrtc-transport")]
46use crate::transport::webrtc::WebRtcTransport;
47use crate::transport::{
48 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
49 TransportHandle, TransportId,
50};
51use crate::tree::TreeState;
52use crate::upper::hosts::HostMap;
53use crate::upper::icmp_rate_limit::IcmpRateLimiter;
54use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
55use crate::utils::index::IndexAllocator;
56use crate::{
57 Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
58 SessionMessageType, encode_npub,
59};
60use rand::Rng;
61use std::collections::{HashMap, HashSet, VecDeque};
62use std::fmt;
63use std::sync::Arc;
64use std::thread::JoinHandle;
65use thiserror::Error;
66use tracing::{debug, warn};
67
/// Jitter constant for rekeying, presumably expressed in seconds (per the
/// `_SECS` suffix).
///
/// NOTE(review): how the jitter is applied (symmetric vs. one-sided, and to
/// which timer) is not visible in this file section — confirm at the use sites.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
75
/// Errors produced by node-level operations.
///
/// Display text comes from the `#[error(...)]` attribute on each variant.
/// Variants marked `#[from]` can be produced from the wrapped error type via
/// `?`.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The node has not been started.
    #[error("node not started")]
    NotStarted,

    /// The node has already been started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node has already been stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given ID.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No usable transport for the named transport type; the payload is a
    /// free-form description.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link is registered under the given ID.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection exists for the given link.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// No peer is known under the given node address.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with the given node address is already present.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection already exists for the given link.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A peer npub string could not be accepted; `reason` explains why.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// Discovery failure with a free-form description.
    #[error("discovery error: {0}")]
    Discovery(String),

    /// Access was denied; the payload describes the denial.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` would be exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` would be exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` would be exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available for the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion failed for `link_id`; `reason` explains why.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed; `reason` explains why.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet of `packet_size` bytes exceeded `mtu` while forwarding to
    /// `node_addr`.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Wrapped configuration error (convertible via `From<ConfigError>`).
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Wrapped identity error (convertible via `From<IdentityError>`).
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// Wrapped TUN error (convertible via `From<TunError>`).
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Index allocation failed; the payload is a free-form description.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// Handshake failed; the payload is a free-form description.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// Transport-level failure carried as a string.
    #[error("transport error: {0}")]
    TransportError(String),

    /// Bootstrap handoff failed; the payload is a free-form description.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
167
/// A packet handed out of the node, together with its source and
/// destination metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// Node address of the packet's source.
    pub source_node_addr: NodeAddr,
    /// npub string of the source, when available.
    pub source_npub: Option<String>,
    /// Destination FIPS address.
    pub destination: FipsAddress,
    /// Raw packet bytes.
    // NOTE(review): exact framing (e.g. full IP packet vs. payload) is not
    // visible here — confirm with the producer.
    pub packet: Vec<u8>,
}
180
/// One cached peer identity: node address, public key, npub string and a
/// last-seen timestamp.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address of the cached identity.
    node_addr: NodeAddr,
    /// secp256k1 public key of the cached identity.
    pubkey: secp256k1::PublicKey,
    /// npub string of the cached identity.
    npub: String,
    /// Last-seen timestamp, presumably in milliseconds (per the `_ms` suffix).
    // NOTE(review): clock source / epoch is not visible here — confirm.
    last_seen_ms: u64,
}
188
189impl IdentityCacheEntry {
190 fn new(
191 node_addr: NodeAddr,
192 pubkey: secp256k1::PublicKey,
193 npub: String,
194 last_seen_ms: u64,
195 ) -> Self {
196 Self {
197 node_addr,
198 pubkey,
199 npub,
200 last_seen_ms,
201 }
202 }
203}
204
/// Channel endpoints for exchanging packets with the node from outside.
// NOTE(review): the direction names are taken from the field names; which
// side of the node each one feeds is wired up outside this section.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Sender half of the TUN-outbound channel type.
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receiver of [`NodeDeliveredPacket`] values.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
213
/// Channel endpoints for the endpoint data path: a bounded command sender
/// plus both halves of an unbounded event channel.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Bounded sender for [`NodeEndpointCommand`] values.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Unbounded receiver of [`NodeEndpointEvent`] values.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Unbounded sender of [`NodeEndpointEvent`] values.
    // NOTE(review): presumably the sending side paired with `event_rx` —
    // confirm where this struct is constructed.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
243
/// Choose the capacity of the endpoint-data command queue.
///
/// If the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable is set to a
/// positive integer (surrounding whitespace is ignored), that value is
/// returned as-is. Otherwise the result is `requested`, raised to a floor of
/// 32 768 entries.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    const ENV_OVERRIDE: &str = "FIPS_ENDPOINT_DATA_QUEUE_CAP";
    const FLOOR: usize = 32_768;

    // Unset, non-UTF-8, unparsable and zero values all fall through to the
    // floor-clamped request.
    let env_override = std::env::var(ENV_OVERRIDE)
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&value| value > 0);

    match env_override {
        Some(value) => value,
        None => requested.max(FLOOR),
    }
}
254
/// Commands carried over the endpoint command channel.
///
/// Variants with a `response_tx` return their result through that one-shot
/// channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send `payload` to `remote`; the outcome is reported on `response_tx`.
    Send {
        remote: PeerIdentity,
        payload: Vec<u8>,
        // Optional enqueue timestamp.
        // NOTE(review): presumably used for queue-latency measurement — confirm.
        queued_at: Option<std::time::Instant>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Send `payload` to `remote` with no response channel.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        queued_at: Option<std::time::Instant>,
    },
    /// Request a list of [`NodeEndpointPeer`] entries.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request a list of [`NodeEndpointRelayStatus`] entries.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Supply new advert and DM relay lists; the outcome is reported on
    /// `response_tx`.
    UpdateRelays {
        advert_relays: Vec<String>,
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Supply a new peer configuration list; on success an
    /// [`UpdatePeersOutcome`] is reported on `response_tx`.
    UpdatePeers {
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
298
/// Per-category counters returned for an `UpdatePeers` command.
///
/// `Default` yields all counters at zero.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Number of peers added.
    pub(crate) added: usize,
    /// Number of peers removed.
    pub(crate) removed: usize,
    /// Number of peers updated.
    pub(crate) updated: usize,
    /// Number of peers left unchanged.
    pub(crate) unchanged: usize,
}
307
/// Events carried over the endpoint event channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A data payload together with its source.
    Data {
        /// Node address of the source.
        source_node_addr: NodeAddr,
        /// npub string of the source, when available.
        source_npub: Option<String>,
        /// Payload bytes.
        payload: Vec<u8>,
        /// Optional enqueue timestamp.
        // NOTE(review): presumably used for queue-latency measurement — confirm.
        queued_at: Option<std::time::Instant>,
    },
}
318
/// Snapshot row describing one peer, as returned for a `PeerSnapshot`
/// command.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// Peer npub string.
    pub(crate) npub: String,
    /// Transport address rendered as a string, when available.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, when available.
    pub(crate) transport_type: Option<String>,
    /// Numeric link identifier.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time, presumably in milliseconds (per the `_ms`
    /// suffix), when available.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets-sent counter.
    pub(crate) packets_sent: u64,
    /// Packets-received counter.
    pub(crate) packets_recv: u64,
    /// Bytes-sent counter.
    pub(crate) bytes_sent: u64,
    /// Bytes-received counter.
    pub(crate) bytes_recv: u64,
}
332
/// Snapshot row describing one relay, as returned for a `RelaySnapshot`
/// command.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Relay status as free-form text.
    // NOTE(review): the set of possible status strings is not visible here.
    pub(crate) status: String,
}
339
/// Lifecycle state of a node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed but never started.
    Created,
    /// Start-up in progress.
    Starting,
    /// Fully running.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// Stopped.
    Stopped,
}

impl NodeState {
    /// `true` only in the [`NodeState::Running`] state.
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// `true` when a start may be attempted: from [`NodeState::Created`] or
    /// [`NodeState::Stopped`].
    pub fn can_start(&self) -> bool {
        match self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// `true` when a stop may be attempted: only from [`NodeState::Running`].
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase state name, ignoring width/fill options.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        })
    }
}
384
/// Bookkeeping record for a recently seen request.
// NOTE(review): which request type this tracks is not visible in this
// section; `Node::recent_requests` keys these records by a `u64`.
#[derive(Clone, Debug)]
pub(crate) struct RecentRequest {
    /// Peer the request was received from.
    pub(crate) from_peer: NodeAddr,
    /// Timestamp of the request, presumably in milliseconds (per the `_ms`
    /// suffix); compared against the caller-supplied current time in
    /// `is_expired`.
    pub(crate) timestamp_ms: u64,
    /// Whether a response has been forwarded; starts as `false` in `new`.
    pub(crate) response_forwarded: bool,
}
402
403impl RecentRequest {
404 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
405 Self {
406 from_peer,
407 timestamp_ms,
408 response_forwarded: false,
409 }
410 }
411
412 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
414 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
415 }
416}
417
/// Lookup key pairing a transport with a remote address on that transport;
/// used as the key of `Node::addr_to_link`.
type AddrKey = (TransportId, TransportAddr);
420
/// Per-transport drop-tracking state, stored in `Node::transport_drops`.
///
/// `Default` yields a zero counter and `dropping == false`.
// NOTE(review): the update logic lives outside this section; field meanings
// below are inferred from names — confirm against the code that mutates them.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Presumably the drop counter value observed at the previous sample.
    prev_drops: u64,
    /// Presumably whether the transport is currently considered to be
    /// dropping packets.
    dropping: bool,
}
432
/// Record of a connect that has not completed yet, stored in
/// `Node::pending_connects`.
// NOTE(review): the completion/timeout handling is outside this section.
struct PendingConnect {
    /// Link associated with the connect.
    link_id: LinkId,
    /// Transport the connect is made on.
    transport_id: TransportId,
    /// Remote address being connected to.
    remote_addr: TransportAddr,
    /// Identity of the peer being connected to.
    peer_identity: PeerIdentity,
}
448
/// Central node state: identity and configuration, routing state, transports
/// and links, peers and sessions, TUN / endpoint I/O handles, rate limiters
/// and discovery bookkeeping.
///
/// Instances are built by `Node::new`, `Node::with_identity` or
/// `Node::leaf_only`; nearly every collection and optional handle starts out
/// empty / `None`.
// NOTE(review): field comments below describe only what the types and the
// constructors in this file show; behavioural details live in the submodules.
pub struct Node {
    // --- Identity, configuration and lifecycle ---
    /// This node's identity.
    identity: Identity,

    /// Eight random bytes generated once at construction.
    startup_epoch: [u8; 8],

    /// Instant captured when the `Node` value was constructed.
    started_at: std::time::Instant,

    /// Configuration the node was built from.
    config: Config,

    /// Current lifecycle state; starts as `NodeState::Created`.
    state: NodeState,

    /// Whether the node operates in leaf-only mode.
    is_leaf_only: bool,

    // --- Routing state ---
    /// Tree state, initialised with this node's address and a signed
    /// declaration.
    tree_state: TreeState,

    /// Bloom filter state for this node.
    bloom_state: BloomState,

    /// Coordinate cache sized and aged from `config.node.cache`.
    coord_cache: CoordCache,
    /// Learned route table.
    learned_routes: LearnedRouteTable,
    /// Recently seen requests, keyed by a `u64`.
    // NOTE(review): presumably a request ID — confirm.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Shared, lock-protected map from FIPS address to a `u16` MTU value.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // --- Transports and links ---
    /// Registered transports by ID.
    transports: HashMap<TransportId, TransportHandle>,
    /// Drop-tracking state per transport.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Links by ID.
    links: HashMap<LinkId, Link>,
    /// Reverse index from (transport, remote address) to link ID.
    addr_to_link: HashMap<AddrKey, LinkId>,

    /// Packet channel sender, once created.
    packet_tx: Option<PacketTx>,
    /// Packet channel receiver, once created.
    packet_rx: Option<PacketRx>,

    // --- Connections, peers and sessions ---
    /// Connections keyed by link ID.
    connections: HashMap<LinkId, PeerConnection>,

    /// Active peers keyed by node address.
    peers: HashMap<NodeAddr, ActivePeer>,

    /// Sessions keyed by node address.
    sessions: HashMap<NodeAddr, SessionEntry>,

    /// Identity cache keyed by a 15-byte value.
    // NOTE(review): the derivation of the 15-byte key is not visible here.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    /// Queued TUN packets per destination node address.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Queued endpoint data per destination node address.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// In-flight discovery lookups per target node address.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // --- Limits and ID allocation ---
    /// Copied from `config.node.limits.max_connections`.
    max_connections: usize,
    /// Copied from `config.node.limits.max_peers`.
    max_peers: usize,
    /// Copied from `config.node.limits.max_links`.
    max_links: usize,

    /// Next link ID to hand out; starts at 1.
    next_link_id: u64,
    /// Next transport ID to hand out; starts at 1.
    next_transport_id: u32,

    // --- Statistics ---
    /// Node statistics.
    stats: stats::NodeStats,

    /// Statistics history.
    stats_history: stats_history::StatsHistory,

    // --- TUN, endpoint I/O and crypto workers ---
    /// TUN state; `Configured` when `config.tun.enabled`, else `Disabled`,
    /// at construction.
    tun_state: TunState,
    /// TUN interface name, once known.
    tun_name: Option<String>,
    /// TUN sender handle, once created.
    tun_tx: Option<TunTx>,
    /// Receiver for TUN-outbound traffic, once created.
    tun_outbound_rx: Option<TunOutboundRx>,
    /// Sender for delivering packets to an external consumer, if attached.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Receiver for endpoint commands, if attached.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Sender for endpoint events, if attached.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Encrypt worker pool, once started.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Decrypt worker pool, once started.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// (transport, index) pairs currently registered with the decrypt
    /// workers.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Receiver half of the decrypt fallback channel created at construction.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sender half of the decrypt fallback channel created at construction.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// OS thread handle for the TUN reader, once spawned.
    tun_reader_handle: Option<JoinHandle<()>>,
    /// OS thread handle for the TUN writer, once spawned.
    tun_writer_handle: Option<JoinHandle<()>>,
    /// Raw file descriptor related to TUN shutdown (macOS only).
    // NOTE(review): exact shutdown protocol is not visible here.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // --- DNS ---
    /// Receiver of identities from the DNS component, if running.
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    /// Tokio task handle for the DNS component, if running.
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // --- Session index bookkeeping ---
    /// Allocator for session indices.
    index_allocator: IndexAllocator,
    /// Peer lookup by (transport, index).
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Outbound handshakes in progress by (transport, index).
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // --- Rate limiters ---
    /// Handshake rate limiter built from `config.node.rate_limit` and
    /// `config.node.limits.max_pending_inbound`.
    msg1_rate_limiter: HandshakeRateLimiter,
    /// ICMP rate limiter.
    icmp_rate_limiter: IcmpRateLimiter,
    /// Routing-error rate limiter with its default interval.
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Rate limiter using `config.node.session.coords_response_interval_ms`
    /// as its interval.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    /// Discovery backoff state.
    discovery_backoff: DiscoveryBackoff,
    /// Rate limiter for forwarded discovery traffic.
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // --- Connection attempts ---
    /// Connects that have been initiated but not completed.
    pending_connects: Vec<PendingConnect>,

    /// Retry state per peer.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    // --- Discovery ---
    /// Nostr discovery handle, if enabled.
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    /// LAN discovery handle, if enabled.
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    /// Local instance registry, if enabled.
    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
    /// Timestamps (presumably milliseconds) for local-instance and Nostr
    /// discovery bookkeeping; all start as `None`.
    local_instance_started_at_ms: Option<u64>,
    last_local_instance_publish_ms: Option<u64>,
    last_local_instance_scan_ms: Option<u64>,
    nostr_discovery_started_at_ms: Option<u64>,
    /// Whether the startup open-discovery sweep has run; starts `false`.
    startup_open_discovery_sweep_done: bool,
    /// Transports flagged as bootstrap transports; `find_transport_for_type`
    /// never selects these.
    bootstrap_transports: HashSet<TransportId>,
    /// npub string associated with each bootstrap transport.
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    /// Peers recorded in the "discovery fallback transit blocked" set.
    // NOTE(review): semantics are defined outside this section.
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    // --- Throttling timestamps for periodic work and logging ---
    /// Time of the last parent re-evaluation (crate-local `Instant` type).
    last_parent_reeval: Option<crate::time::Instant>,

    /// Time of the last congestion log line.
    last_congestion_log: Option<std::time::Instant>,

    /// Latest mesh size estimate, if any.
    estimated_mesh_size: Option<u64>,
    /// Time of the last mesh-size log line.
    last_mesh_size_log: Option<std::time::Instant>,

    /// Time of the last self-related warning.
    last_self_warn: Option<std::time::Instant>,

    /// Time of the last local send failure.
    last_local_send_failure_at: Option<std::time::Instant>,

    // --- Naming, weights and access control ---
    /// Alias per peer; second choice in `peer_display_name` after the host
    /// map.
    peer_aliases: HashMap<NodeAddr, String>,
    /// Send weight per configured peer; see `send_weight_for_peer`.
    configured_peer_send_weights: HashMap<NodeAddr, u8>,

    /// Peer ACL reloader.
    peer_acl: acl::PeerAclReloader,

    /// Shared hostname map; first choice in `peer_display_name`.
    host_map: Arc<HostMap>,
}
750
751impl Node {
752 pub fn new(config: Config) -> Result<Self, NodeError> {
754 config.validate()?;
755 let identity = config.create_identity()?;
756 let node_addr = *identity.node_addr();
757 let is_leaf_only = config.is_leaf_only();
758
759 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
760 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
761
762 let mut startup_epoch = [0u8; 8];
763 rand::rng().fill_bytes(&mut startup_epoch);
764
765 let mut bloom_state = if is_leaf_only {
766 BloomState::leaf_only(node_addr)
767 } else {
768 BloomState::new(node_addr)
769 };
770 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
771
772 let tun_state = if config.tun.enabled {
773 TunState::Configured
774 } else {
775 TunState::Disabled
776 };
777
778 let mut tree_state = TreeState::new(node_addr);
780 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
781 tree_state.set_hold_down(config.node.tree.hold_down_secs);
782 tree_state.set_flap_dampening(
783 config.node.tree.flap_threshold,
784 config.node.tree.flap_window_secs,
785 config.node.tree.flap_dampening_secs,
786 );
787 tree_state
788 .sign_declaration(&identity)
789 .expect("signing own declaration should never fail");
790
791 let coord_cache = CoordCache::new(
792 config.node.cache.coord_size,
793 config.node.cache.coord_ttl_secs * 1000,
794 );
795 let rl = &config.node.rate_limit;
796 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
797 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
798 config.node.limits.max_pending_inbound,
799 );
800
801 let max_connections = config.node.limits.max_connections;
802 let max_peers = config.node.limits.max_peers;
803 let max_links = config.node.limits.max_links;
804 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
805 let backoff_base_secs = config.node.discovery.backoff_base_secs;
806 let backoff_max_secs = config.node.discovery.backoff_max_secs;
807 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
808
809 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
810 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
811
812 Ok(Self {
813 identity,
814 startup_epoch,
815 started_at: std::time::Instant::now(),
816 config,
817 state: NodeState::Created,
818 is_leaf_only,
819 tree_state,
820 bloom_state,
821 coord_cache,
822 learned_routes: LearnedRouteTable::default(),
823 recent_requests: HashMap::new(),
824 transports: HashMap::new(),
825 transport_drops: HashMap::new(),
826 links: HashMap::new(),
827 addr_to_link: HashMap::new(),
828 packet_tx: None,
829 packet_rx: None,
830 connections: HashMap::new(),
831 peers: HashMap::new(),
832 sessions: HashMap::new(),
833 identity_cache: HashMap::new(),
834 pending_tun_packets: HashMap::new(),
835 pending_endpoint_data: HashMap::new(),
836 pending_lookups: HashMap::new(),
837 max_connections,
838 max_peers,
839 max_links,
840 next_link_id: 1,
841 next_transport_id: 1,
842 stats: stats::NodeStats::new(),
843 stats_history: stats_history::StatsHistory::new(),
844 tun_state,
845 tun_name: None,
846 tun_tx: None,
847 tun_outbound_rx: None,
848 external_packet_tx: None,
849 endpoint_command_rx: None,
850 endpoint_event_tx: None,
851 encrypt_workers: None,
852 decrypt_workers: None,
853 decrypt_registered_sessions: std::collections::HashSet::new(),
854 decrypt_fallback_tx,
855 decrypt_fallback_rx,
856 tun_reader_handle: None,
857 tun_writer_handle: None,
858 #[cfg(target_os = "macos")]
859 tun_shutdown_fd: None,
860 dns_identity_rx: None,
861 dns_task: None,
862 index_allocator: IndexAllocator::new(),
863 peers_by_index: HashMap::new(),
864 pending_outbound: HashMap::new(),
865 msg1_rate_limiter,
866 icmp_rate_limiter: IcmpRateLimiter::new(),
867 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
868 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
869 std::time::Duration::from_millis(coords_response_interval_ms),
870 ),
871 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
872 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
873 std::time::Duration::from_secs(forward_min_interval_secs),
874 ),
875 pending_connects: Vec::new(),
876 retry_pending: HashMap::new(),
877 nostr_discovery: None,
878 nostr_discovery_started_at_ms: None,
879 lan_discovery: None,
880 local_instance_registry: None,
881 local_instance_started_at_ms: None,
882 last_local_instance_publish_ms: None,
883 last_local_instance_scan_ms: None,
884 startup_open_discovery_sweep_done: false,
885 bootstrap_transports: HashSet::new(),
886 bootstrap_transport_npubs: HashMap::new(),
887 discovery_fallback_transit_blocked_peers: HashSet::new(),
888 last_parent_reeval: None,
889 last_congestion_log: None,
890 estimated_mesh_size: None,
891 last_mesh_size_log: None,
892 last_self_warn: None,
893 last_local_send_failure_at: None,
894 peer_aliases: HashMap::new(),
895 configured_peer_send_weights,
896 peer_acl,
897 host_map,
898 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
899 })
900 }
901
902 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
907 config.validate()?;
908 let node_addr = *identity.node_addr();
909
910 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
911 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
912
913 let mut startup_epoch = [0u8; 8];
914 rand::rng().fill_bytes(&mut startup_epoch);
915
916 let tun_state = if config.tun.enabled {
917 TunState::Configured
918 } else {
919 TunState::Disabled
920 };
921
922 let mut tree_state = TreeState::new(node_addr);
924 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
925 tree_state.set_hold_down(config.node.tree.hold_down_secs);
926 tree_state.set_flap_dampening(
927 config.node.tree.flap_threshold,
928 config.node.tree.flap_window_secs,
929 config.node.tree.flap_dampening_secs,
930 );
931 tree_state
932 .sign_declaration(&identity)
933 .expect("signing own declaration should never fail");
934
935 let mut bloom_state = BloomState::new(node_addr);
936 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
937
938 let coord_cache = CoordCache::new(
939 config.node.cache.coord_size,
940 config.node.cache.coord_ttl_secs * 1000,
941 );
942 let rl = &config.node.rate_limit;
943 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
944 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
945 config.node.limits.max_pending_inbound,
946 );
947
948 let max_connections = config.node.limits.max_connections;
949 let max_peers = config.node.limits.max_peers;
950 let max_links = config.node.limits.max_links;
951 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
952
953 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
954 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
955
956 Ok(Self {
957 identity,
958 startup_epoch,
959 started_at: std::time::Instant::now(),
960 config,
961 state: NodeState::Created,
962 is_leaf_only: false,
963 tree_state,
964 bloom_state,
965 coord_cache,
966 learned_routes: LearnedRouteTable::default(),
967 recent_requests: HashMap::new(),
968 transports: HashMap::new(),
969 transport_drops: HashMap::new(),
970 links: HashMap::new(),
971 addr_to_link: HashMap::new(),
972 packet_tx: None,
973 packet_rx: None,
974 connections: HashMap::new(),
975 peers: HashMap::new(),
976 sessions: HashMap::new(),
977 identity_cache: HashMap::new(),
978 pending_tun_packets: HashMap::new(),
979 pending_endpoint_data: HashMap::new(),
980 pending_lookups: HashMap::new(),
981 max_connections,
982 max_peers,
983 max_links,
984 next_link_id: 1,
985 next_transport_id: 1,
986 stats: stats::NodeStats::new(),
987 stats_history: stats_history::StatsHistory::new(),
988 tun_state,
989 tun_name: None,
990 tun_tx: None,
991 tun_outbound_rx: None,
992 external_packet_tx: None,
993 endpoint_command_rx: None,
994 endpoint_event_tx: None,
995 encrypt_workers: None,
996 decrypt_workers: None,
997 decrypt_registered_sessions: std::collections::HashSet::new(),
998 decrypt_fallback_tx,
999 decrypt_fallback_rx,
1000 tun_reader_handle: None,
1001 tun_writer_handle: None,
1002 #[cfg(target_os = "macos")]
1003 tun_shutdown_fd: None,
1004 dns_identity_rx: None,
1005 dns_task: None,
1006 index_allocator: IndexAllocator::new(),
1007 peers_by_index: HashMap::new(),
1008 pending_outbound: HashMap::new(),
1009 msg1_rate_limiter,
1010 icmp_rate_limiter: IcmpRateLimiter::new(),
1011 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1012 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1013 std::time::Duration::from_millis(coords_response_interval_ms),
1014 ),
1015 discovery_backoff: DiscoveryBackoff::new(),
1016 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1017 pending_connects: Vec::new(),
1018 retry_pending: HashMap::new(),
1019 nostr_discovery: None,
1020 nostr_discovery_started_at_ms: None,
1021 lan_discovery: None,
1022 local_instance_registry: None,
1023 local_instance_started_at_ms: None,
1024 last_local_instance_publish_ms: None,
1025 last_local_instance_scan_ms: None,
1026 startup_open_discovery_sweep_done: false,
1027 bootstrap_transports: HashSet::new(),
1028 bootstrap_transport_npubs: HashMap::new(),
1029 discovery_fallback_transit_blocked_peers: HashSet::new(),
1030 last_parent_reeval: None,
1031 last_congestion_log: None,
1032 estimated_mesh_size: None,
1033 last_mesh_size_log: None,
1034 last_self_warn: None,
1035 last_local_send_failure_at: None,
1036 peer_aliases: HashMap::new(),
1037 configured_peer_send_weights,
1038 peer_acl,
1039 host_map,
1040 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1041 })
1042 }
1043
1044 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1046 let mut node = Self::new(config)?;
1047 node.is_leaf_only = true;
1048 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1049 Ok(node)
1050 }
1051
1052 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1053 let base_host_map = HostMap::from_peer_configs(config.peers());
1054 if !config.node.system_files_enabled {
1055 return (
1056 Arc::new(base_host_map.clone()),
1057 acl::PeerAclReloader::memory_only(base_host_map),
1058 );
1059 }
1060
1061 let mut host_map = base_host_map.clone();
1062 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1063 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1064 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1065 ));
1066 host_map.merge(hosts_file);
1067 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1068 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1069 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1070 base_host_map,
1071 hosts_path,
1072 );
1073 (Arc::new(host_map), peer_acl)
1074 }
1075
1076 fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1077 config
1078 .peers()
1079 .iter()
1080 .filter_map(|peer| {
1081 PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1082 (
1083 *identity.node_addr(),
1084 encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1085 )
1086 })
1087 })
1088 .collect()
1089 }
1090
1091 fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1092 self.configured_peer_send_weights
1093 .get(peer_addr)
1094 .copied()
1095 .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1096 }
1097
    /// Instantiate a transport handle for every transport instance in the
    /// configuration and return them in creation order.
    ///
    /// Order of creation (and therefore of transport-ID allocation): UDP,
    /// simulated (feature `sim-transport`), Ethernet (Linux/macOS), TCP, Tor,
    /// WebRTC (feature `webrtc-transport`), BLE (`cfg(bluer_available)`).
    /// Every transport receives a clone of `packet_tx`.
    ///
    /// Transports whose constructor can fail (WebRTC, BLE) are skipped with a
    /// warning on failure; the function itself does not return an error.
    // NOTE(review): this only constructs handles; nothing in this function
    // starts them or inserts them into `self.transports`.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Each section first copies the (name, config) pairs out of
        // `self.config`, so the loop below is free to call
        // `self.allocate_transport_id()` on `self`.
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        // Simulated transports, only in builds with the `sim-transport`
        // feature.
        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        // Ethernet transports, Linux and macOS only.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let mut eth_config = eth_config;
                // An instance without its own discovery scope inherits the
                // node-level LAN discovery scope.
                if eth_config.discovery_scope.is_none() {
                    eth_config.discovery_scope = self.lan_discovery_scope();
                }
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // The WebRTC instance list is collected in every build so that
        // builds without the feature can still warn about ignored
        // configuration.
        let webrtc_instances: Vec<_> = self
            .config
            .transports
            .webrtc
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        #[cfg(feature = "webrtc-transport")]
        {
            for (name, webrtc_config) in webrtc_instances {
                // The ID is allocated before construction, so a failed
                // constructor still consumes one transport ID.
                let transport_id = self.allocate_transport_id();
                match WebRtcTransport::new(
                    transport_id,
                    name,
                    webrtc_config,
                    packet_tx.clone(),
                    &self.identity,
                    &self.config.node.discovery.nostr,
                ) {
                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
                    Err(err) => {
                        warn!(
                            transport_id = %transport_id,
                            error = %err,
                            "failed to initialize WebRTC transport"
                        );
                    }
                }
            }
        }
        #[cfg(not(feature = "webrtc-transport"))]
        if !webrtc_instances.is_empty() {
            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
        }

        // BLE transports, only when the build script sets `bluer_available`.
        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Real adapters are only opened outside of test builds.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // NOTE(review): this `if` sits inside the `cfg(bluer_available)`
            // block, so its `not(bluer_available)` arm can never apply; it
            // is compiled only under `test`, where the inner warning is
            // itself compiled out. As written, the "lacks BlueZ support"
            // warning is never emitted in any build — confirm whether it
            // was meant to live outside the enclosing block.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1275
1276 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1286 self.transports
1287 .iter()
1288 .filter(|(id, handle)| {
1289 handle.transport_type().name == transport_type
1290 && handle.is_operational()
1291 && !self.bootstrap_transports.contains(id)
1292 })
1293 .min_by_key(|(id, _)| id.as_u32())
1294 .map(|(id, _)| *id)
1295 }
1296
1297 #[allow(unused_variables)]
1303 fn resolve_ethernet_addr(
1304 &self,
1305 addr_str: &str,
1306 ) -> Result<(TransportId, TransportAddr), NodeError> {
1307 #[cfg(any(target_os = "linux", target_os = "macos"))]
1308 {
1309 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1310 NodeError::NoTransportForType(format!(
1311 "invalid Ethernet address format '{}': expected 'interface/mac'",
1312 addr_str
1313 ))
1314 })?;
1315
1316 let transport_id = self
1318 .transports
1319 .iter()
1320 .find(|(_, handle)| {
1321 handle.transport_type().name == "ethernet"
1322 && handle.is_operational()
1323 && handle.interface_name() == Some(iface)
1324 })
1325 .map(|(id, _)| *id)
1326 .ok_or_else(|| {
1327 NodeError::NoTransportForType(format!(
1328 "no operational Ethernet transport for interface '{}'",
1329 iface
1330 ))
1331 })?;
1332
1333 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1334 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1335 })?;
1336
1337 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1338 }
1339 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1340 {
1341 Err(NodeError::NoTransportForType(
1342 "Ethernet transport is not supported on this platform".to_string(),
1343 ))
1344 }
1345 }
1346
1347 #[cfg(bluer_available)]
1351 fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1352 let ta = TransportAddr::from_string(addr_str);
1353 let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1354 NodeError::NoTransportForType(format!(
1355 "invalid BLE address format '{}': expected 'adapter/mac'",
1356 addr_str
1357 ))
1358 })?;
1359
1360 let transport_id = self
1362 .transports
1363 .iter()
1364 .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1365 .map(|(id, _)| *id)
1366 .ok_or_else(|| {
1367 NodeError::NoTransportForType(format!(
1368 "no operational BLE transport for adapter '{}'",
1369 adapter
1370 ))
1371 })?;
1372
1373 crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1375 NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1376 })?;
1377
1378 Ok((transport_id, TransportAddr::from_string(addr_str)))
1379 }
1380
    /// Returns a reference to this node's [`Identity`].
    pub fn identity(&self) -> &Identity {
        &self.identity
    }
1387
    /// Returns this node's [`NodeAddr`], as reported by its identity.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }
1392
    /// Returns this node's npub string, as produced by its identity.
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
1397
1398 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1407 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1408 return hostname.to_string();
1409 }
1410 if let Some(name) = self.peer_aliases.get(addr) {
1411 return name.clone();
1412 }
1413 if let Some(peer) = self.peers.get(addr) {
1414 return peer.identity().short_npub();
1415 }
1416 if let Some(entry) = self.sessions.get(addr) {
1417 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1418 return PeerIdentity::from_pubkey(xonly).short_npub();
1419 }
1420 addr.short_hex()
1421 }
1422
1423 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1435 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1439 self.peers_by_index.remove(&cache_key);
1440 if self.decrypt_registered_sessions.remove(&cache_key)
1441 && let Some(workers) = self.decrypt_workers.as_ref()
1442 {
1443 workers.unregister_session(cache_key);
1444 }
1445 if let Some(peer_addr) = owning_peer {
1456 let peer_has_other_index = self
1457 .peers_by_index
1458 .values()
1459 .any(|other| *other == peer_addr);
1460 if !peer_has_other_index {
1461 self.clear_connected_udp_for_peer(&peer_addr);
1462 }
1463 }
1464 }
1465
1466 pub(in crate::node) fn ensure_current_session_index_registered(
1475 &mut self,
1476 node_addr: &NodeAddr,
1477 context: &'static str,
1478 ) -> bool {
1479 let Some(peer) = self.peers.get(node_addr) else {
1480 return false;
1481 };
1482 let Some(transport_id) = peer.transport_id() else {
1483 warn!(
1484 peer = %self.peer_display_name(node_addr),
1485 context,
1486 "Cannot register current session index without transport id"
1487 );
1488 return false;
1489 };
1490 let Some(our_index) = peer.our_index() else {
1491 warn!(
1492 peer = %self.peer_display_name(node_addr),
1493 context,
1494 "Cannot register current session index without local index"
1495 );
1496 return false;
1497 };
1498
1499 let cache_key = (transport_id, our_index.as_u32());
1500 match self.peers_by_index.get(&cache_key).copied() {
1501 Some(existing) if existing == *node_addr => true,
1502 Some(existing) => {
1503 warn!(
1504 peer = %self.peer_display_name(node_addr),
1505 previous_owner = %self.peer_display_name(&existing),
1506 transport_id = %transport_id,
1507 our_index = %our_index,
1508 context,
1509 "Repairing current session index with stale owner"
1510 );
1511 self.peers_by_index.insert(cache_key, *node_addr);
1512 true
1513 }
1514 None => {
1515 warn!(
1516 peer = %self.peer_display_name(node_addr),
1517 transport_id = %transport_id,
1518 our_index = %our_index,
1519 context,
1520 "Repairing missing current session index"
1521 );
1522 self.peers_by_index.insert(cache_key, *node_addr);
1523 true
1524 }
1525 }
1526 }
1527
    /// Returns a reference to the node's [`Config`].
    pub fn config(&self) -> &Config {
        &self.config
    }
1534
    /// Returns the IPv6 MTU derived from [`Self::transport_mtu`] by
    /// `crate::upper::icmp::effective_ipv6_mtu`.
    pub fn effective_ipv6_mtu(&self) -> u16 {
        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
    }
1543
1544 pub fn transport_mtu(&self) -> u16 {
1561 let min_operational = self
1562 .transports
1563 .values()
1564 .filter(|h| h.is_operational())
1565 .map(|h| h.mtu())
1566 .min();
1567 if let Some(mtu) = min_operational {
1568 return mtu;
1569 }
1570 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1572 return cfg.mtu();
1573 }
1574 1280
1575 }
1576
    /// Returns the node's current [`NodeState`].
    pub fn state(&self) -> NodeState {
        self.state
    }
1583
    /// Returns the time elapsed since the recorded `started_at` instant.
    pub fn uptime(&self) -> std::time::Duration {
        self.started_at.elapsed()
    }
1588
    /// Returns `true` when the current node state reports itself operational.
    pub fn is_running(&self) -> bool {
        self.state.is_operational()
    }
1593
    /// Returns the node's leaf-only flag.
    pub fn is_leaf_only(&self) -> bool {
        self.is_leaf_only
    }
1598
    /// Returns a shared reference to the node's [`TreeState`].
    pub fn tree_state(&self) -> &TreeState {
        &self.tree_state
    }
1605
    /// Returns a mutable reference to the node's [`TreeState`].
    pub fn tree_state_mut(&mut self) -> &mut TreeState {
        &mut self.tree_state
    }
1610
    /// Returns a shared reference to the node's [`BloomState`].
    pub fn bloom_state(&self) -> &BloomState {
        &self.bloom_state
    }
1617
    /// Returns a mutable reference to the node's [`BloomState`].
    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
        &mut self.bloom_state
    }
1622
    /// Returns the most recently stored mesh size estimate, or `None` when no
    /// estimate is available (see `compute_mesh_size`).
    pub fn estimated_mesh_size(&self) -> Option<u64> {
        self.estimated_mesh_size
    }
1629
1630 pub(crate) fn compute_mesh_size(&mut self) {
1636 let my_addr = *self.tree_state.my_node_addr();
1637 let parent_id = *self.tree_state.my_declaration().parent_id();
1638 let is_root = self.tree_state.is_root();
1639
1640 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1641 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1643 let mut has_data = false;
1644
1645 if !is_root
1651 && let Some(parent) = self.peers.get(&parent_id)
1652 && let Some(filter) = parent.inbound_filter()
1653 {
1654 match filter.estimated_count(max_fpr) {
1655 Some(n) => {
1656 total += n;
1657 has_data = true;
1658 }
1659 None => {
1660 self.estimated_mesh_size = None;
1661 return;
1662 }
1663 }
1664 }
1665
1666 for (peer_addr, peer) in &self.peers {
1668 if peer_addr == &parent_id {
1669 continue;
1670 }
1671 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1672 && *decl.parent_id() == my_addr
1673 {
1674 child_count += 1;
1675 if let Some(filter) = peer.inbound_filter() {
1676 match filter.estimated_count(max_fpr) {
1677 Some(n) => {
1678 total += n;
1679 has_data = true;
1680 }
1681 None => {
1682 self.estimated_mesh_size = None;
1683 return;
1684 }
1685 }
1686 }
1687 }
1688 }
1689
1690 if !has_data {
1691 self.estimated_mesh_size = None;
1692 return;
1693 }
1694
1695 let size = total.round() as u64;
1696 self.estimated_mesh_size = Some(size);
1697
1698 let now = std::time::Instant::now();
1700 let should_log = match self.last_mesh_size_log {
1701 None => true,
1702 Some(last) => {
1703 now.duration_since(last)
1704 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1705 }
1706 };
1707 if should_log {
1708 tracing::debug!(
1709 estimated_mesh_size = size,
1710 peers = self.peers.len(),
1711 children = child_count,
1712 "Mesh size estimate"
1713 );
1714 self.last_mesh_size_log = Some(now);
1715 }
1716 }
1717
    /// Returns a shared reference to the node's [`CoordCache`].
    pub fn coord_cache(&self) -> &CoordCache {
        &self.coord_cache
    }
1724
    /// Returns a mutable reference to the node's [`CoordCache`].
    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
        &mut self.coord_cache
    }
1729
    /// Returns a shared reference to the node's counters.
    pub fn stats(&self) -> &stats::NodeStats {
        &self.stats
    }
1736
    /// Returns a mutable reference to the node's counters.
    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
        &mut self.stats
    }
1741
    /// Returns a shared reference to the recorded statistics history.
    pub fn stats_history(&self) -> &stats_history::StatsHistory {
        &self.stats_history
    }
1746
1747 pub(crate) fn record_stats_history(&mut self) {
1750 let fwd = &self.stats.forwarding;
1751 let peers_with_mmp: Vec<f64> = self
1752 .peers
1753 .values()
1754 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1755 .collect();
1756 let loss_rate = if peers_with_mmp.is_empty() {
1757 0.0
1758 } else {
1759 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1760 };
1761
1762 let snap = stats_history::Snapshot {
1763 mesh_size: self.estimated_mesh_size,
1764 tree_depth: self.tree_state.my_coords().depth() as u32,
1765 peer_count: self.peers.len() as u64,
1766 parent_switches_total: self.stats.tree.parent_switches,
1767 bytes_in_total: fwd.received_bytes,
1768 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1769 packets_in_total: fwd.received_packets,
1770 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1771 loss_rate,
1772 active_sessions: self.sessions.len() as u64,
1773 };
1774
1775 let now = std::time::Instant::now();
1776 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1777 .peers
1778 .values()
1779 .map(|p| {
1780 let stats = p.link_stats();
1781 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1782 Some(m) => (
1783 m.metrics.srtt_ms(),
1784 Some(m.metrics.loss_rate()),
1785 m.receiver.ecn_ce_count() as u64,
1786 ),
1787 None => (None, None, 0),
1788 };
1789 stats_history::PeerSnapshot {
1790 node_addr: *p.node_addr(),
1791 last_seen: now,
1792 srtt_ms,
1793 loss_rate,
1794 bytes_in_total: stats.bytes_recv,
1795 bytes_out_total: stats.bytes_sent,
1796 packets_in_total: stats.packets_recv,
1797 packets_out_total: stats.packets_sent,
1798 ecn_ce_total: ecn_ce,
1799 }
1800 })
1801 .collect();
1802
1803 self.stats_history.tick(now, &snap, &peer_snaps);
1804 }
1805
    /// Returns the current [`TunState`].
    pub fn tun_state(&self) -> TunState {
        self.tun_state
    }
1812
    /// Returns the stored TUN device name, if one is set.
    pub fn tun_name(&self) -> Option<&str> {
        self.tun_name.as_deref()
    }
1817
    /// Sets the connection limit. The admission checks in this type treat `0`
    /// as "no limit".
    pub fn set_max_connections(&mut self, max: usize) {
        self.max_connections = max;
    }
1824
    /// Sets the peer limit. The admission checks in this type treat `0` as
    /// "no limit".
    pub fn set_max_peers(&mut self, max: usize) {
        self.max_peers = max;
    }
1829
1830 pub(crate) fn outbound_admission_check(&self) -> bool {
1833 let connection_used = self
1834 .connections
1835 .len()
1836 .saturating_add(self.pending_connects.len());
1837 let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1838 let connection_allowed =
1839 self.max_connections == 0 || connection_used < self.max_connections;
1840 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1841 peer_allowed && connection_allowed && link_allowed
1842 }
1843
1844 pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1848 if !self.outbound_admission_check() {
1849 return false;
1850 }
1851
1852 let nostr = &self.config.node.discovery.nostr;
1853 if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1854 return true;
1855 }
1856
1857 let configured_npubs = self
1858 .config
1859 .peers()
1860 .iter()
1861 .map(|peer| peer.npub.clone())
1862 .collect::<HashSet<_>>();
1863 self.open_discovery_enqueue_budget(&configured_npubs) > 0
1864 }
1865
1866 pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1870 let connection_used = self
1871 .connections
1872 .len()
1873 .saturating_add(self.pending_connects.len());
1874 let connection_allowed =
1875 self.max_connections == 0 || connection_used < self.max_connections;
1876 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1877 connection_allowed && link_allowed
1878 }
1879
    /// Sets the link limit. `add_link` and the admission checks treat `0` as
    /// "no limit".
    pub fn set_max_links(&mut self, max: usize) {
        self.max_links = max;
    }
1884
    /// Returns the number of tracked connections (pending connects excluded).
    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }
1891
    /// Returns the number of entries in the peer table.
    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }
1896
    /// Returns the number of tracked links.
    pub fn link_count(&self) -> usize {
        self.links.len()
    }
1901
    /// Returns the number of registered transports, operational or not.
    pub fn transport_count(&self) -> usize {
        self.transports.len()
    }
1906
1907 pub fn allocate_transport_id(&mut self) -> TransportId {
1911 let id = TransportId::new(self.next_transport_id);
1912 self.next_transport_id += 1;
1913 id
1914 }
1915
    /// Looks up a transport handle by id.
    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
        self.transports.get(id)
    }
1920
    /// Looks up a transport handle by id for mutation.
    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
        self.transports.get_mut(id)
    }
1925
    /// Iterates over the ids of all registered transports.
    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
        self.transports.keys()
    }
1930
    /// Returns a mutable reference to the packet receiver, if one is present.
    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
        self.packet_rx.as_mut()
    }
1935
1936 pub fn allocate_link_id(&mut self) -> LinkId {
1940 let id = LinkId::new(self.next_link_id);
1941 self.next_link_id += 1;
1942 id
1943 }
1944
1945 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1947 if self.max_links > 0 && self.links.len() >= self.max_links {
1948 return Err(NodeError::MaxLinksExceeded {
1949 max: self.max_links,
1950 });
1951 }
1952 let link_id = link.link_id();
1953 let transport_id = link.transport_id();
1954 let remote_addr = link.remote_addr().clone();
1955
1956 self.links.insert(link_id, link);
1957 self.addr_to_link
1958 .insert((transport_id, remote_addr), link_id);
1959 Ok(())
1960 }
1961
    /// Looks up a link by id.
    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
        self.links.get(link_id)
    }
1966
    /// Looks up a link by id for mutation.
    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
        self.links.get_mut(link_id)
    }
1971
1972 pub fn find_link_by_addr(
1974 &self,
1975 transport_id: TransportId,
1976 addr: &TransportAddr,
1977 ) -> Option<LinkId> {
1978 self.addr_to_link
1979 .get(&(transport_id, addr.clone()))
1980 .copied()
1981 }
1982
1983 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
1989 if let Some(link) = self.links.remove(link_id) {
1990 let key = (link.transport_id(), link.remote_addr().clone());
1992 if self.addr_to_link.get(&key) == Some(link_id) {
1993 self.addr_to_link.remove(&key);
1994 }
1995 Some(link)
1996 } else {
1997 None
1998 }
1999 }
2000
2001 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2002 if !self.bootstrap_transports.contains(&transport_id) {
2003 return;
2004 }
2005
2006 let transport_in_use = self
2007 .links
2008 .values()
2009 .any(|link| link.transport_id() == transport_id)
2010 || self
2011 .connections
2012 .values()
2013 .any(|conn| conn.transport_id() == Some(transport_id))
2014 || self
2015 .peers
2016 .values()
2017 .any(|peer| peer.transport_id() == Some(transport_id))
2018 || self
2019 .pending_connects
2020 .iter()
2021 .any(|pending| pending.transport_id == transport_id);
2022
2023 if transport_in_use {
2024 return;
2025 }
2026
2027 tracing::debug!(
2028 transport_id = %transport_id,
2029 "bootstrap transport has no remaining references; dropping"
2030 );
2031
2032 self.bootstrap_transports.remove(&transport_id);
2033 self.bootstrap_transport_npubs.remove(&transport_id);
2034 self.transport_drops.remove(&transport_id);
2035 self.transports.remove(&transport_id);
2036 }
2037
    /// Iterates over all tracked links.
    pub fn links(&self) -> impl Iterator<Item = &Link> {
        self.links.values()
    }
2042
2043 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2047 let link_id = connection.link_id();
2048
2049 if self.connections.contains_key(&link_id) {
2050 return Err(NodeError::ConnectionAlreadyExists(link_id));
2051 }
2052
2053 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2054 return Err(NodeError::MaxConnectionsExceeded {
2055 max: self.max_connections,
2056 });
2057 }
2058
2059 self.connections.insert(link_id, connection);
2060 Ok(())
2061 }
2062
    /// Looks up the connection tracked for `link_id`.
    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
        self.connections.get(link_id)
    }
2067
    /// Looks up the connection tracked for `link_id` for mutation.
    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
        self.connections.get_mut(link_id)
    }
2072
    /// Removes and returns the connection tracked for `link_id`, if any.
    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
        self.connections.remove(link_id)
    }
2077
    /// Iterates over all tracked connections.
    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
        self.connections.values()
    }
2082
    /// Looks up a peer by node address.
    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
        self.peers.get(node_addr)
    }
2089
    /// Looks up a peer by node address for mutation.
    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
        self.peers.get_mut(node_addr)
    }
2094
    /// Removes and returns the peer entry for `node_addr`, if any.
    ///
    /// Only the peer table is touched here; no other index is updated by this
    /// method.
    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
        self.peers.remove(node_addr)
    }
2099
    /// Iterates over all entries in the peer table.
    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values()
    }
2104
    /// Returns the Nostr discovery handle, if one is present.
    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
        self.nostr_discovery.as_deref()
    }
2111
    /// Iterates over the node addresses of all entries in the peer table.
    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
        self.peers.keys()
    }
2116
    /// Iterates over the peers whose `can_send()` currently returns `true`.
    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values().filter(|p| p.can_send())
    }
2121
2122 pub fn sendable_peer_count(&self) -> usize {
2124 self.peers.values().filter(|p| p.can_send()).count()
2125 }
2126
2127 pub(crate) fn set_discovery_fallback_transit_allowed(
2128 &mut self,
2129 peer_addr: NodeAddr,
2130 allowed: bool,
2131 ) {
2132 if allowed {
2133 self.discovery_fallback_transit_blocked_peers
2134 .remove(&peer_addr);
2135 } else {
2136 self.discovery_fallback_transit_blocked_peers
2137 .insert(peer_addr);
2138 }
2139 }
2140
2141 pub(crate) fn configured_discovery_fallback_transit(
2142 &self,
2143 peer_addr: &NodeAddr,
2144 ) -> Option<bool> {
2145 self.configured_peer(peer_addr)
2146 .map(|peer| peer.discovery_fallback_transit)
2147 }
2148
2149 pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2150 self.config.peers().iter().find(|peer| {
2151 PeerIdentity::from_npub(&peer.npub)
2152 .ok()
2153 .is_some_and(|identity| identity.node_addr() == peer_addr)
2154 })
2155 }
2156
2157 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2158 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2159 return retry_state.peer_config.discovery_fallback_transit;
2160 }
2161
2162 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2163 return allowed;
2164 }
2165
2166 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2167 }
2168
2169 #[cfg(test)]
2174 pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2175 self.discovery_forward_limiter
2176 .set_interval(std::time::Duration::ZERO);
2177 }
2178
    /// Test helper: looks up the session entry for `remote`.
    #[cfg(test)]
    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
        self.sessions.get(remote)
    }
2183
    /// Test helper: looks up the session entry for `remote` for mutation.
    #[cfg(test)]
    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
        self.sessions.get_mut(remote)
    }
2189
    /// Test helper: removes and returns the session entry for `remote`.
    #[cfg(test)]
    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
        self.sessions.remove(remote)
    }
2195
2196 #[cfg(test)]
2198 pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2199 self.path_mtu_lookup
2200 .read()
2201 .ok()
2202 .and_then(|map| map.get(fips_addr).copied())
2203 }
2204
2205 #[cfg(test)]
2207 pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2208 if let Ok(mut map) = self.path_mtu_lookup.write() {
2209 map.insert(fips_addr, mtu);
2210 }
2211 }
2212
    /// Returns the number of tracked sessions.
    pub fn session_count(&self) -> usize {
        self.sessions.len()
    }
2217
    /// Iterates over all sessions as `(remote address, entry)` pairs.
    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
        self.sessions.iter()
    }
2222
2223 pub(crate) fn register_identity(
2227 &mut self,
2228 node_addr: NodeAddr,
2229 pubkey: secp256k1::PublicKey,
2230 ) -> bool {
2231 let mut prefix = [0u8; 15];
2232 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2233 if let Some(entry) = self.identity_cache.get(&prefix)
2234 && entry.node_addr == node_addr
2235 && entry.pubkey == pubkey
2236 {
2237 return true;
2241 }
2242
2243 let (xonly, _) = pubkey.x_only_public_key();
2244 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2245 if derived_node_addr != node_addr {
2246 debug!(
2247 claimed_node_addr = %node_addr,
2248 derived_node_addr = %derived_node_addr,
2249 "Rejected identity cache entry with mismatched public key"
2250 );
2251 return false;
2252 }
2253
2254 let now_ms = Self::now_ms();
2255 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2256 && entry.node_addr == node_addr
2257 {
2258 entry.pubkey = pubkey;
2259 entry.last_seen_ms = now_ms;
2260 return true;
2261 }
2262
2263 let npub = encode_npub(&xonly);
2264 self.identity_cache.insert(
2265 prefix,
2266 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2267 );
2268 let max = self.config.node.cache.identity_size;
2270 if self.identity_cache.len() > max
2271 && let Some(oldest_key) = self
2272 .identity_cache
2273 .iter()
2274 .min_by_key(|(_, entry)| entry.last_seen_ms)
2275 .map(|(k, _)| *k)
2276 {
2277 self.identity_cache.remove(&oldest_key);
2278 }
2279 true
2280 }
2281
2282 pub(crate) fn lookup_by_fips_prefix(
2284 &mut self,
2285 prefix: &[u8; 15],
2286 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2287 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2288 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2290 } else {
2291 None
2292 }
2293 }
2294
2295 pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2297 let mut prefix = [0u8; 15];
2298 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2299 self.identity_cache.contains_key(&prefix)
2300 }
2301
    /// Returns the number of entries in the identity cache.
    pub fn identity_cache_len(&self) -> usize {
        self.identity_cache.len()
    }
2306
2307 pub fn identity_cache_iter(
2312 &self,
2313 ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2314 self.identity_cache
2315 .values()
2316 .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2317 }
2318
    /// Returns the configured identity cache size limit.
    pub fn identity_cache_max(&self) -> usize {
        self.config.node.cache.identity_size
    }
2323
    /// Returns the number of pending lookups.
    pub fn pending_lookup_count(&self) -> usize {
        self.pending_lookups.len()
    }
2328
    /// Iterates over pending lookups as `(node address, lookup state)` pairs.
    pub fn pending_lookups_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
        self.pending_lookups.iter()
    }
2335
    /// Returns the number of entries in the recent-requests collection.
    pub fn recent_request_count(&self) -> usize {
        self.recent_requests.len()
    }
2340
    /// Returns the number of destinations that have a pending TUN packet queue.
    pub fn pending_tun_destinations(&self) -> usize {
        self.pending_tun_packets.len()
    }
2345
2346 pub fn pending_tun_total_packets(&self) -> usize {
2348 self.pending_tun_packets.values().map(|q| q.len()).sum()
2349 }
2350
    /// Iterates over pending retries as `(node address, retry state)` pairs.
    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
        self.retry_pending.iter()
    }
2355
2356 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2363 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2365 return true;
2366 }
2367 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2369 && decl.parent_id() == self.node_addr()
2370 {
2371 return true;
2372 }
2373 false
2374 }
2375
    /// Chooses the peer to which a packet for `dest_node_addr` should be
    /// handed next, or `None` when no usable next hop exists.
    ///
    /// Stages are tried in this order:
    /// 1. The destination is this node: `None`.
    /// 2. The destination is a direct peer reporting `is_healthy()`.
    /// 3. In `ReplyLearned` mode, unless a fallback exploration is due: a
    ///    learned route through a currently sendable peer.
    /// 4. With cached destination coordinates: the best filter candidate
    ///    (`select_best_candidate`), then the tree next hop if it can send.
    /// 5. A learned route (again), then the direct peer if it can at least
    ///    send.
    ///
    /// Takes `&mut self` because the coordinate cache lookup touches the entry
    /// and the learned-route table is consulted mutably.
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        // Never route to ourselves.
        if dest_node_addr == self.node_addr() {
            return None;
        }

        // Remember whether the direct peer can send at all; it is the last
        // resort below even when it is not healthy.
        let direct_peer_can_send = self
            .peers
            .get(dest_node_addr)
            .is_some_and(|peer| peer.can_send());
        // A healthy direct peer wins outright.
        if let Some(peer) = self.peers.get(dest_node_addr)
            && peer.is_healthy()
        {
            return Some(peer);
        }

        let now_ms = Self::now_ms();

        // Only populated in ReplyLearned mode: the set of peers that can send
        // right now. Learned routes are only accepted through these peers.
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(_, peer)| peer.can_send())
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // Ask the learned-route table whether this lookup should try the
        // coordinate/tree path first instead of the learned route (presumably
        // a periodic probe governed by the configured explore interval).
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        // Normal case in ReplyLearned mode: prefer the learned route.
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // Coordinate and tree routing both need the destination's coordinates.
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            // No coordinates: fall back to a learned route, then to the direct
            // peer if it can send.
            if let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            if direct_peer_can_send {
                return self.peers.get(dest_node_addr);
            }
            return None;
        };

        // Coordinate routing: among peers whose filters may reach the
        // destination, pick the best candidate. Only the address is kept so
        // the borrow of the candidate list ends with this block.
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        if let Some(next_hop_addr) = coordinate_route_addr {
            return self.peers.get(&next_hop_addr);
        }

        // Tree routing: accept the tree's next hop only if that peer can send.
        let tree_route_addr = self
            .tree_state
            .find_next_hop(&dest_coords)
            .filter(|next_hop_id| {
                self.peers
                    .get(next_hop_id)
                    .is_some_and(|peer| peer.can_send())
            });
        if let Some(next_hop_addr) = tree_route_addr {
            return self.peers.get(&next_hop_addr);
        }
        // Exploration found nothing: return the learned route result as is.
        // Note that this path does not fall through to the direct peer.
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        // Last learned-route attempt before giving up on multi-hop routing.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // Final resort: a direct peer that is not healthy but can still send.
        if direct_peer_can_send {
            return self.peers.get(dest_node_addr);
        }

        None
    }
2522
2523 pub(in crate::node) fn learn_reverse_route(
2524 &mut self,
2525 destination: NodeAddr,
2526 next_hop: NodeAddr,
2527 ) {
2528 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2529 || destination == *self.node_addr()
2530 {
2531 return;
2532 }
2533 let now_ms = Self::now_ms();
2534 self.learned_routes.learn(
2535 destination,
2536 next_hop,
2537 now_ms,
2538 self.config.node.routing.learned_ttl_secs,
2539 self.config.node.routing.max_learned_routes_per_dest,
2540 );
2541 }
2542
2543 pub(in crate::node) fn record_route_failure(
2544 &mut self,
2545 destination: NodeAddr,
2546 next_hop: NodeAddr,
2547 ) {
2548 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2549 return;
2550 }
2551 self.learned_routes.record_failure(&destination, &next_hop);
2552 }
2553
    /// Returns a snapshot of the learned-route table taken at `now_ms`.
    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
        self.learned_routes.snapshot(now_ms)
    }
2557
    /// Asks the learned-route table to purge entries that are expired as of
    /// `now_ms`.
    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
        self.learned_routes.purge_expired(now_ms);
    }
2561
2562 fn select_best_candidate<'a>(
2571 &'a self,
2572 candidates: &[&'a ActivePeer],
2573 dest_coords: &crate::tree::TreeCoordinate,
2574 ) -> Option<&'a ActivePeer> {
2575 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2576
2577 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2578
2579 for &candidate in candidates {
2580 if !candidate.can_send() {
2581 continue;
2582 }
2583
2584 let cost = candidate.link_cost();
2585
2586 let dist = self
2587 .tree_state
2588 .peer_coords(candidate.node_addr())
2589 .map(|pc| pc.distance_to(dest_coords))
2590 .unwrap_or(usize::MAX);
2591
2592 if dist >= my_distance {
2595 continue;
2596 }
2597
2598 let dominated = match &best {
2599 None => true,
2600 Some((_, best_cost, best_dist)) => {
2601 cost < *best_cost
2602 || (cost == *best_cost && dist < *best_dist)
2603 || (cost == *best_cost
2604 && dist == *best_dist
2605 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2606 }
2607 };
2608
2609 if dominated {
2610 best = Some((candidate, cost, dist));
2611 }
2612 }
2613
2614 best.map(|(peer, _, _)| peer)
2615 }
2616
2617 pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2619 self.peers.values().filter(|p| p.may_reach(dest)).collect()
2620 }
2621
    /// Returns the TUN sender handle, if one is present.
    pub fn tun_tx(&self) -> Option<&TunTx> {
        self.tun_tx.as_ref()
    }
2628
2629 pub fn attach_external_packet_io(
2636 &mut self,
2637 capacity: usize,
2638 ) -> Result<ExternalPacketIo, NodeError> {
2639 if self.state != NodeState::Created {
2640 return Err(NodeError::Config(ConfigError::Validation(
2641 "external packet I/O must be attached before node start".to_string(),
2642 )));
2643 }
2644 if self.config.tun.enabled {
2645 return Err(NodeError::Config(ConfigError::Validation(
2646 "external packet I/O requires tun.enabled=false".to_string(),
2647 )));
2648 }
2649
2650 let capacity = capacity.max(1);
2651 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2652 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2653 self.tun_outbound_rx = Some(outbound_rx);
2654 self.external_packet_tx = Some(inbound_tx);
2655
2656 Ok(ExternalPacketIo {
2657 outbound_tx,
2658 inbound_rx,
2659 })
2660 }
2661
2662 pub(crate) fn attach_endpoint_data_io(
2667 &mut self,
2668 capacity: usize,
2669 ) -> Result<EndpointDataIo, NodeError> {
2670 if self.state != NodeState::Created {
2671 return Err(NodeError::Config(ConfigError::Validation(
2672 "endpoint data I/O must be attached before node start".to_string(),
2673 )));
2674 }
2675
2676 let command_capacity = endpoint_data_command_capacity(capacity);
2677 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2678 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2683 self.endpoint_command_rx = Some(command_rx);
2684 self.endpoint_event_tx = Some(event_tx.clone());
2685
2686 Ok(EndpointDataIo {
2687 command_tx,
2688 event_rx,
2689 event_tx,
2690 })
2691 }
2692
2693 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2694 let mut prefix = [0u8; 15];
2695 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2696 self.identity_cache
2697 .get(&prefix)
2698 .filter(|entry| &entry.node_addr == addr)
2699 .map(|entry| entry.pubkey)
2700 }
2701
2702 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2703 let mut prefix = [0u8; 15];
2704 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2705 self.identity_cache
2706 .get(&prefix)
2707 .filter(|entry| &entry.node_addr == addr)
2708 .map(|entry| entry.npub.clone())
2709 }
2710
2711 pub(in crate::node) fn deliver_external_ipv6_packet(
2712 &self,
2713 src_addr: &NodeAddr,
2714 packet: Vec<u8>,
2715 ) {
2716 let Some(external_packet_tx) = &self.external_packet_tx else {
2717 return;
2718 };
2719 if packet.len() < 40 {
2720 return;
2721 }
2722 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2723 return;
2724 };
2725 let delivered = NodeDeliveredPacket {
2726 source_node_addr: *src_addr,
2727 source_npub: self.npub_for_node_addr(src_addr),
2728 destination,
2729 packet,
2730 };
2731 if let Err(error) = external_packet_tx.try_send(delivered) {
2732 debug!(error = %error, "Failed to deliver packet to external app sink");
2733 }
2734 }
2735
2736 pub(super) async fn send_encrypted_link_message(
2750 &mut self,
2751 node_addr: &NodeAddr,
2752 plaintext: &[u8],
2753 ) -> Result<(), NodeError> {
2754 self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2755 .await
2756 }
2757
2758 pub(in crate::node) fn note_local_send_outcome(
2764 &mut self,
2765 result: &Result<usize, TransportError>,
2766 ) {
2767 match result {
2768 Ok(_) => {
2769 if self.last_local_send_failure_at.is_some() {
2770 self.last_local_send_failure_at = None;
2771 }
2772 }
2773 Err(TransportError::Io(e))
2774 if matches!(
2775 e.kind(),
2776 std::io::ErrorKind::NetworkUnreachable
2777 | std::io::ErrorKind::HostUnreachable
2778 | std::io::ErrorKind::AddrNotAvailable
2779 ) =>
2780 {
2781 self.last_local_send_failure_at = Some(std::time::Instant::now());
2782 }
2783 Err(_) => {}
2784 }
2785 }
2786
2787 pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
2790 self.last_local_send_failure_at
2791 }
2792
2793 pub(super) async fn send_encrypted_link_message_with_ce(
2797 &mut self,
2798 node_addr: &NodeAddr,
2799 plaintext: &[u8],
2800 ce_flag: bool,
2801 ) -> Result<(), NodeError> {
2802 let peer = self
2803 .peers
2804 .get_mut(node_addr)
2805 .ok_or(NodeError::PeerNotFound(*node_addr))?;
2806
2807 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2808 node_addr: *node_addr,
2809 reason: "no their_index".into(),
2810 })?;
2811 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2812 node_addr: *node_addr,
2813 reason: "no transport_id".into(),
2814 })?;
2815 let remote_addr = peer
2816 .current_addr()
2817 .cloned()
2818 .ok_or_else(|| NodeError::SendFailed {
2819 node_addr: *node_addr,
2820 reason: "no current_addr".into(),
2821 })?;
2822 #[cfg(any(target_os = "linux", target_os = "macos"))]
2823 let connected_socket = peer.connected_udp();
2824
2825 let timestamp_ms = peer.session_elapsed_ms();
2827
2828 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2830 let mut flags = if sp_flag { FLAG_SP } else { 0 };
2831 if ce_flag {
2832 flags |= FLAG_CE;
2833 }
2834 if peer.current_k_bit() {
2835 flags |= FLAG_KEY_EPOCH;
2836 }
2837
2838 let session = peer
2839 .noise_session_mut()
2840 .ok_or_else(|| NodeError::SendFailed {
2841 node_addr: *node_addr,
2842 reason: "no noise session".into(),
2843 })?;
2844
2845 const INNER_TS_LEN: usize = 4;
2853 let counter = session.current_send_counter();
2854 let inner_len = INNER_TS_LEN + plaintext.len();
2855 let payload_len = inner_len as u16;
2856 let header = build_established_header(their_index, counter, flags, payload_len);
2857
2858 let transport_for_send = self
2877 .transports
2878 .get(&transport_id)
2879 .ok_or(NodeError::TransportNotFound(transport_id))?;
2880 match transport_for_send.connection_state(&remote_addr) {
2881 ConnectionState::Connected => {}
2882 other => {
2883 if matches!(other, ConnectionState::None) {
2884 let _ = transport_for_send.connect(&remote_addr).await;
2885 }
2886 return Err(NodeError::SendFailed {
2887 node_addr: *node_addr,
2888 reason: format!("transport connection not ready: {:?}", other),
2889 });
2890 }
2891 }
2892 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2893 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2894 && is_udp
2895 && let Some(cipher_clone) = session.send_cipher_clone()
2896 {
2897 {
2898 let reserved_counter =
2902 session
2903 .take_send_counter()
2904 .map_err(|e| NodeError::SendFailed {
2905 node_addr: *node_addr,
2906 reason: format!("counter reservation failed: {}", e),
2907 })?;
2908 debug_assert_eq!(reserved_counter, counter);
2909 let header =
2913 build_established_header(their_index, reserved_counter, flags, payload_len);
2914 let transport = transport_for_send;
2915 let send_target = {
2922 if let TransportHandle::Udp(udp) = transport {
2923 let socket_addr = {
2924 #[cfg(any(target_os = "linux", target_os = "macos"))]
2925 {
2926 match connected_socket.as_ref() {
2927 Some(socket) => Some(socket.peer_addr()),
2928 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2929 }
2930 }
2931 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2932 {
2933 udp.resolve_for_off_task(&remote_addr).await.ok()
2934 }
2935 };
2936 match (udp.async_socket(), socket_addr) {
2937 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
2938 _ => None,
2939 }
2940 } else {
2941 None
2942 }
2943 };
2944 if let Some((socket, socket_addr)) = send_target {
2945 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
2961 let mut wire_buf = Vec::with_capacity(wire_capacity);
2962 wire_buf.extend_from_slice(&header);
2963 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
2964 wire_buf.extend_from_slice(plaintext);
2965 let predicted_bytes = wire_capacity;
2966 if let Some(peer) = self.peers.get_mut(node_addr) {
2973 peer.link_stats_mut().record_sent(predicted_bytes);
2974 if let Some(mmp) = peer.mmp_mut() {
2975 mmp.sender
2976 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
2977 }
2978 }
2979 let scheduling_weight = self.send_weight_for_peer(node_addr);
2980 workers.dispatch(self::encrypt_worker::FmpSendJob {
2981 cipher: cipher_clone,
2982 counter: reserved_counter,
2983 wire_buf,
2984 fsp_seal: None,
2985 socket,
2986 dest_addr: socket_addr,
2987 #[cfg(any(target_os = "linux", target_os = "macos"))]
2988 connected_socket,
2989 drop_on_backpressure: plaintext
2990 .first()
2991 .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
2992 scheduling_weight,
2993 queued_at: crate::perf_profile::stamp(),
2994 });
2995 return Ok(());
2996 }
2997 }
2998 }
2999
3000 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3005 let ciphertext = {
3007 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3008 session
3009 .encrypt_with_aad(&inner_plaintext, &header)
3010 .map_err(|e| NodeError::SendFailed {
3011 node_addr: *node_addr,
3012 reason: format!("encryption failed: {}", e),
3013 })?
3014 };
3015
3016 let wire_packet = build_encrypted(&header, &ciphertext);
3017
3018 let send_result = {
3020 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3021 let transport = self
3022 .transports
3023 .get(&transport_id)
3024 .ok_or(NodeError::TransportNotFound(transport_id))?;
3025 transport.send(&remote_addr, &wire_packet).await
3026 };
3027 self.note_local_send_outcome(&send_result);
3028 let bytes_sent = send_result.map_err(|e| match e {
3029 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3030 node_addr: *node_addr,
3031 packet_size,
3032 mtu,
3033 },
3034 other => NodeError::SendFailed {
3035 node_addr: *node_addr,
3036 reason: format!("transport send: {}", other),
3037 },
3038 })?;
3039
3040 if let Some(peer) = self.peers.get_mut(node_addr) {
3042 peer.link_stats_mut().record_sent(bytes_sent);
3043 if let Some(mmp) = peer.mmp_mut() {
3045 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3046 }
3047 }
3048
3049 Ok(())
3050 }
3051}
3052
3053impl fmt::Debug for Node {
3054 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3055 f.debug_struct("Node")
3056 .field("node_addr", self.node_addr())
3057 .field("state", &self.state)
3058 .field("is_leaf_only", &self.is_leaf_only)
3059 .field("connections", &self.connection_count())
3060 .field("peers", &self.peer_count())
3061 .field("links", &self.link_count())
3062 .field("transports", &self.transport_count())
3063 .finish()
3064 }
3065}