1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::RoutingMode;
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45#[cfg(feature = "webrtc-transport")]
46use crate::transport::webrtc::WebRtcTransport;
47use crate::transport::{
48 Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError, TransportHandle, TransportId,
49};
50use crate::tree::TreeState;
51use crate::upper::hosts::HostMap;
52use crate::upper::icmp_rate_limit::IcmpRateLimiter;
53use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
54use crate::utils::index::IndexAllocator;
55use crate::{
56 Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
57 SessionMessageType, encode_npub,
58};
59use rand::Rng;
60use std::collections::{HashMap, HashSet, VecDeque};
61use std::fmt;
62use std::sync::Arc;
63use std::thread::JoinHandle;
64use thiserror::Error;
65use tracing::{debug, warn};
66
/// Rekey jitter bound, in seconds (per the `_SECS` suffix).
///
/// NOTE(review): the code that consumes this constant is not visible in this
/// part of the file. Presumably rekey timers are randomized by up to this many
/// seconds so that peers do not rekey in lockstep — confirm against the
/// session/rekey code.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
74
/// Errors reported by the node.
///
/// The `Display` text of every variant is defined by its `#[error(...)]`
/// attribute. Variants carrying `#[from]` can be produced directly by `?` from
/// the wrapped error type.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The operation requires a node that has been started.
    #[error("node not started")]
    NotStarted,

    /// The node has already been started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node has already been stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given id.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No usable transport exists for the named transport type. Also used in
    /// this file for malformed Ethernet/BLE address strings; the payload is the
    /// full human-readable explanation.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link is registered under the given id.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection is registered for the given link id.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// The given node address is not a known peer.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with the given node address already exists.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection is already registered for the given link id.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A peer npub string could not be accepted; `reason` says why.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// Discovery failed; the payload is a free-form description.
    #[error("discovery error: {0}")]
    Discovery(String),

    /// The operation was refused; the payload is a free-form description.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` would be exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` would be exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` would be exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available on the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion failed for the link; `reason` says why.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed; `reason` says why.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet being forwarded to `node_addr` was larger than the MTU.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Wrapped configuration error (convertible via `?`).
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Wrapped identity error (convertible via `?`).
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// Wrapped TUN error (convertible via `?`).
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Index allocation failed; the payload is a free-form description.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// A handshake failed; the payload is a free-form description.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// A transport-level failure, carried as text rather than as the imported
    /// `TransportError` type.
    #[error("transport error: {0}")]
    TransportError(String),

    /// A bootstrap handoff failed; the payload is a free-form description.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
166
/// A packet handed from the node to an external consumer.
///
/// Values of this type travel over the `inbound_rx` channel of
/// [`ExternalPacketIo`] and the node's `external_packet_tx` sender.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// Node address the packet came from.
    pub source_node_addr: NodeAddr,
    /// npub string of the source, when available (`None` otherwise).
    pub source_npub: Option<String>,
    /// Address the packet is destined for.
    pub destination: FipsAddress,
    /// Raw packet bytes.
    // NOTE(review): presumably a complete IP packet — confirm with the producer.
    pub packet: Vec<u8>,
}
179
/// One entry of the node's `identity_cache` map.
///
/// Bundles the different representations of a single identity (node address,
/// secp256k1 public key, npub string) with a last-seen timestamp.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address of the cached identity.
    node_addr: NodeAddr,
    /// Full secp256k1 public key of the cached identity.
    pubkey: secp256k1::PublicKey,
    /// npub string of the cached identity.
    npub: String,
    /// Timestamp in milliseconds at which the identity was last seen.
    // NOTE(review): the clock/epoch of this value is not visible here.
    last_seen_ms: u64,
}
187
188impl IdentityCacheEntry {
189 fn new(
190 node_addr: NodeAddr,
191 pubkey: secp256k1::PublicKey,
192 npub: String,
193 last_seen_ms: u64,
194 ) -> Self {
195 Self {
196 node_addr,
197 pubkey,
198 npub,
199 last_seen_ms,
200 }
201 }
202}
203
/// Channel endpoints for exchanging packets with the node from outside.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Sender for outbound packets, using the same channel type as the TUN
    /// outbound path (`TunOutboundTx`).
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receiver of packets the node delivers to the external consumer.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
212
/// Channel endpoints used by the endpoint data API.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Bounded sender for [`NodeEndpointCommand`]s addressed to the node.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Unbounded receiver of [`NodeEndpointEvent`]s emitted by the node.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Unbounded sender of [`NodeEndpointEvent`]s.
    // NOTE(review): presumably the sending half of the same channel as
    // `event_rx`, exposed so events can also be injected from outside the
    // node — confirm where this struct is constructed.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
242
/// Chooses the capacity of the endpoint data command queue.
///
/// If the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable holds a positive
/// integer (surrounding whitespace is ignored), that value is returned as-is.
/// Otherwise the result is `requested` raised to a floor of 32 768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    let env_override = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&value| value > 0);

    match env_override {
        Some(value) => value,
        // Unset, unparsable, or zero: use the request, never below 32 768.
        None => requested.max(32_768),
    }
}
253
/// Commands sent to the node over the endpoint command channel
/// (`EndpointDataIo::command_tx`).
///
/// Variants carrying a `response_tx` answer through that oneshot channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send `payload` to `remote` and report the outcome on `response_tx`.
    Send {
        remote: PeerIdentity,
        payload: Vec<u8>,
        // Optional instant recorded when the command was queued.
        queued_at: Option<std::time::Instant>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Send `payload` to `remote` without any response channel.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        // Optional instant recorded when the command was queued.
        queued_at: Option<std::time::Instant>,
    },
    /// Request a list of [`NodeEndpointPeer`] entries.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request a list of [`NodeEndpointRelayStatus`] entries.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Replace the advert and DM relay lists; the result is reported on
    /// `response_tx`.
    // NOTE(review): "replace" vs. "merge" semantics are decided by the handler,
    // which is not visible here — confirm.
    UpdateRelays {
        advert_relays: Vec<String>,
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Apply a new peer configuration list; on success the handler reports an
    /// [`UpdatePeersOutcome`] on `response_tx`.
    UpdatePeers {
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
297
/// Counters returned in response to [`NodeEndpointCommand::UpdatePeers`].
///
/// `Default` yields all-zero counters.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Number of peers added.
    pub(crate) added: usize,
    /// Number of peers removed.
    pub(crate) removed: usize,
    /// Number of peers updated.
    pub(crate) updated: usize,
    /// Number of peers left unchanged.
    pub(crate) unchanged: usize,
}
306
/// Events emitted by the node on the endpoint event channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// Payload data received from another node.
    Data {
        // Node address the data came from.
        source_node_addr: NodeAddr,
        // npub string of the source, when available.
        source_npub: Option<String>,
        // Raw payload bytes.
        payload: Vec<u8>,
        // Optional instant recorded when the event was queued.
        queued_at: Option<std::time::Instant>,
    },
}
317
/// Per-peer entry returned for [`NodeEndpointCommand::PeerSnapshot`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// npub string of the peer.
    pub(crate) npub: String,
    /// Transport address of the peer as text, when known.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, when known.
    pub(crate) transport_type: Option<String>,
    /// Numeric id of the link to the peer.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, when measured.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent to the peer.
    pub(crate) packets_sent: u64,
    /// Packets received from the peer.
    pub(crate) packets_recv: u64,
    /// Bytes sent to the peer.
    pub(crate) bytes_sent: u64,
    /// Bytes received from the peer.
    pub(crate) bytes_recv: u64,
}
331
/// Per-relay entry returned for [`NodeEndpointCommand::RelaySnapshot`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Relay status as free-form text.
    // NOTE(review): the set of possible status strings is not visible here.
    pub(crate) status: String,
}
338
/// Lifecycle state of a node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed but never started.
    Created,
    /// Start-up in progress.
    Starting,
    /// Fully started.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// Shut down.
    Stopped,
}

impl NodeState {
    /// `true` only in the [`NodeState::Running`] state.
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// `true` when a start may be attempted: from [`NodeState::Created`] or
    /// [`NodeState::Stopped`].
    pub fn can_start(&self) -> bool {
        match self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// `true` when a stop may be attempted: only from [`NodeState::Running`].
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

/// Renders the state as its lowercase name. Width and alignment flags of the
/// caller's format spec are not applied.
impl fmt::Display for NodeState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match *self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        })
    }
}
383
/// Bookkeeping for a recently seen request, stored in the node's
/// `recent_requests` map under a `u64` key.
// NOTE(review): presumably the key is a request id and this record lets a
// response be routed back to `from_peer` — confirm with the discovery handlers.
#[derive(Clone, Debug)]
pub(crate) struct RecentRequest {
    /// Peer the request was received from.
    pub(crate) from_peer: NodeAddr,
    /// Time the request was recorded, in milliseconds (same clock as the
    /// `current_time_ms` passed to `is_expired`).
    pub(crate) timestamp_ms: u64,
    /// Whether a response has been forwarded; starts out `false`.
    pub(crate) response_forwarded: bool,
}
401
402impl RecentRequest {
403 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
404 Self {
405 from_peer,
406 timestamp_ms,
407 response_forwarded: false,
408 }
409 }
410
411 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
413 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
414 }
415}
416
/// A remote address qualified by the transport it belongs to; used as the key
/// of the node's `addr_to_link` map.
type AddrKey = (TransportId, TransportAddr);
419
/// Per-transport drop tracking, stored in the node's `transport_drops` map.
///
/// `Default` yields a zero counter and `dropping == false`.
// NOTE(review): presumably `prev_drops` is the drop counter observed at the
// previous sample and `dropping` marks whether drops are currently ongoing, so
// transitions can be logged once — confirm with the code that updates it.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Previously observed drop count.
    prev_drops: u64,
    /// Whether the transport is currently considered to be dropping.
    dropping: bool,
}
431
/// A connect attempt that has not completed yet, kept in the node's
/// `pending_connects` list.
// NOTE(review): presumably used for transports whose connect is asynchronous
// (the handshake starts once the transport reports the connection) — confirm
// with the code that drains `pending_connects`.
struct PendingConnect {
    /// Link allocated for the attempt.
    link_id: LinkId,
    /// Transport the attempt runs on.
    transport_id: TransportId,
    /// Remote address being connected to.
    remote_addr: TransportAddr,
    /// Identity of the peer being connected to.
    peer_identity: PeerIdentity,
}
447
/// A single node instance: identity, configuration, routing state, transports,
/// links, peers, sessions, and the channel/worker handles that tie them together.
///
/// Built by `Node::new`, `Node::with_identity`, or `Node::leaf_only`. All three
/// start in `NodeState::Created` with empty tables and no runtime handles.
pub struct Node {
    // --- Identity and configuration ---
    /// This node's identity.
    identity: Identity,

    /// Eight random bytes drawn once when the node is constructed.
    startup_epoch: [u8; 8],

    /// Instant at which the node value was constructed.
    started_at: std::time::Instant,

    /// Configuration the node was built from.
    config: Config,

    /// Current lifecycle state.
    state: NodeState,

    /// Whether the node runs in leaf-only mode.
    is_leaf_only: bool,

    // --- Routing state ---
    /// Tree state, initialised from the node address and the
    /// `config.node.tree` settings.
    tree_state: TreeState,

    /// Bloom filter state, initialised from the node address.
    bloom_state: BloomState,

    /// Coordinate cache, sized and aged per `config.node.cache`.
    coord_cache: CoordCache,
    /// Learned route table; starts out as `LearnedRouteTable::default()`.
    learned_routes: LearnedRouteTable,
    /// Recently seen requests, keyed by a `u64`.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Shared, lock-protected map from address to a `u16` path MTU.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // --- Transports and links ---
    /// Registered transports by id.
    transports: HashMap<TransportId, TransportHandle>,
    /// Per-transport drop tracking.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Links by id.
    links: HashMap<LinkId, Link>,
    /// Lookup from (transport, remote address) to link id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    /// Packet channel sender, `None` until set up.
    packet_tx: Option<PacketTx>,
    /// Packet channel receiver, `None` until set up.
    packet_rx: Option<PacketRx>,

    // --- Peers and sessions ---
    /// Connections by link id.
    connections: HashMap<LinkId, PeerConnection>,

    /// Active peers by node address.
    peers: HashMap<NodeAddr, ActivePeer>,

    /// Sessions by remote node address.
    sessions: HashMap<NodeAddr, SessionEntry>,

    /// Identity cache keyed by a 15-byte value.
    // NOTE(review): presumably the 15 bytes are an address prefix/suffix used
    // to map addresses back to identities — confirm with the code filling it.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    /// Queued TUN packets per destination node.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Queued endpoint payloads per destination node.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Discovery lookups in flight per target node.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // --- Limits (copied from `config.node.limits`) ---
    max_connections: usize,
    max_peers: usize,
    max_links: usize,

    // --- Id counters (both start at 1) ---
    next_link_id: u64,
    next_transport_id: u32,

    /// Node statistics.
    stats: stats::NodeStats,

    /// History of statistics.
    stats_history: stats_history::StatsHistory,

    // --- TUN, external I/O, and worker handles ---
    /// TUN state: `Configured` when `config.tun.enabled`, else `Disabled`, at
    /// construction.
    tun_state: TunState,
    tun_name: Option<String>,
    tun_tx: Option<TunTx>,
    tun_outbound_rx: Option<TunOutboundRx>,
    /// Sender used to deliver packets to an external consumer.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Receiver of endpoint commands.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Sender of endpoint events.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Encrypt worker pool, when running.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Decrypt worker pool, when running.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// Session keys `(transport, index)` currently registered with the decrypt
    /// workers; consulted before unregistering in `deregister_session_index`.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Receiving half of the decrypt fallback channel created in the
    /// constructor; wrapped in `Option` (starts as `Some`).
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sending half of the decrypt fallback channel created in the constructor.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// Join handle of the TUN reader thread, when spawned.
    tun_reader_handle: Option<JoinHandle<()>>,
    /// Join handle of the TUN writer thread, when spawned.
    tun_writer_handle: Option<JoinHandle<()>>,
    /// Raw file descriptor related to TUN shutdown (macOS only).
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // --- DNS ---
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // --- Session index bookkeeping ---
    /// Allocator for session indices.
    index_allocator: IndexAllocator,
    /// Owner node address of each `(transport, index)` session key.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Link of each pending outbound `(transport, index)` key.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // --- Rate limiters ---
    /// Handshake limiter, parameterised from `config.node.rate_limit` and
    /// `config.node.limits.max_pending_inbound`.
    msg1_rate_limiter: HandshakeRateLimiter,
    icmp_rate_limiter: IcmpRateLimiter,
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Limiter using `config.node.session.coords_response_interval_ms`.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    discovery_backoff: DiscoveryBackoff,
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    /// Connect attempts that have not completed yet.
    pending_connects: Vec<PendingConnect>,

    /// Retry state per peer.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    // --- Discovery ---
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    nostr_discovery_started_at_ms: Option<u64>,
    startup_open_discovery_sweep_done: bool,
    /// Transports marked as bootstrap; `find_transport_for_type` never
    /// selects them.
    bootstrap_transports: HashSet<TransportId>,
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    // --- Timestamps of periodic work and throttled logs ---
    last_parent_reeval: Option<crate::time::Instant>,

    last_congestion_log: Option<std::time::Instant>,

    estimated_mesh_size: Option<u64>,
    last_mesh_size_log: Option<std::time::Instant>,

    last_self_warn: Option<std::time::Instant>,

    last_local_send_failure_at: Option<std::time::Instant>,

    /// Display aliases by node address; consulted by `peer_display_name` after
    /// the host map.
    peer_aliases: HashMap<NodeAddr, String>,

    /// Peer ACL reloader built by `host_map_and_peer_acl`.
    peer_acl: acl::PeerAclReloader,

    /// Host map built by `host_map_and_peer_acl`; first source consulted by
    /// `peer_display_name`.
    host_map: Arc<HostMap>,
}
739
740impl Node {
741 pub fn new(config: Config) -> Result<Self, NodeError> {
743 config.validate()?;
744 let identity = config.create_identity()?;
745 let node_addr = *identity.node_addr();
746 let is_leaf_only = config.is_leaf_only();
747
748 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
749 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
750
751 let mut startup_epoch = [0u8; 8];
752 rand::rng().fill_bytes(&mut startup_epoch);
753
754 let mut bloom_state = if is_leaf_only {
755 BloomState::leaf_only(node_addr)
756 } else {
757 BloomState::new(node_addr)
758 };
759 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
760
761 let tun_state = if config.tun.enabled {
762 TunState::Configured
763 } else {
764 TunState::Disabled
765 };
766
767 let mut tree_state = TreeState::new(node_addr);
769 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
770 tree_state.set_hold_down(config.node.tree.hold_down_secs);
771 tree_state.set_flap_dampening(
772 config.node.tree.flap_threshold,
773 config.node.tree.flap_window_secs,
774 config.node.tree.flap_dampening_secs,
775 );
776 tree_state
777 .sign_declaration(&identity)
778 .expect("signing own declaration should never fail");
779
780 let coord_cache = CoordCache::new(
781 config.node.cache.coord_size,
782 config.node.cache.coord_ttl_secs * 1000,
783 );
784 let rl = &config.node.rate_limit;
785 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
786 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
787 config.node.limits.max_pending_inbound,
788 );
789
790 let max_connections = config.node.limits.max_connections;
791 let max_peers = config.node.limits.max_peers;
792 let max_links = config.node.limits.max_links;
793 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
794 let backoff_base_secs = config.node.discovery.backoff_base_secs;
795 let backoff_max_secs = config.node.discovery.backoff_max_secs;
796 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
797
798 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
799
800 Ok(Self {
801 identity,
802 startup_epoch,
803 started_at: std::time::Instant::now(),
804 config,
805 state: NodeState::Created,
806 is_leaf_only,
807 tree_state,
808 bloom_state,
809 coord_cache,
810 learned_routes: LearnedRouteTable::default(),
811 recent_requests: HashMap::new(),
812 transports: HashMap::new(),
813 transport_drops: HashMap::new(),
814 links: HashMap::new(),
815 addr_to_link: HashMap::new(),
816 packet_tx: None,
817 packet_rx: None,
818 connections: HashMap::new(),
819 peers: HashMap::new(),
820 sessions: HashMap::new(),
821 identity_cache: HashMap::new(),
822 pending_tun_packets: HashMap::new(),
823 pending_endpoint_data: HashMap::new(),
824 pending_lookups: HashMap::new(),
825 max_connections,
826 max_peers,
827 max_links,
828 next_link_id: 1,
829 next_transport_id: 1,
830 stats: stats::NodeStats::new(),
831 stats_history: stats_history::StatsHistory::new(),
832 tun_state,
833 tun_name: None,
834 tun_tx: None,
835 tun_outbound_rx: None,
836 external_packet_tx: None,
837 endpoint_command_rx: None,
838 endpoint_event_tx: None,
839 encrypt_workers: None,
840 decrypt_workers: None,
841 decrypt_registered_sessions: std::collections::HashSet::new(),
842 decrypt_fallback_tx,
843 decrypt_fallback_rx,
844 tun_reader_handle: None,
845 tun_writer_handle: None,
846 #[cfg(target_os = "macos")]
847 tun_shutdown_fd: None,
848 dns_identity_rx: None,
849 dns_task: None,
850 index_allocator: IndexAllocator::new(),
851 peers_by_index: HashMap::new(),
852 pending_outbound: HashMap::new(),
853 msg1_rate_limiter,
854 icmp_rate_limiter: IcmpRateLimiter::new(),
855 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
856 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
857 std::time::Duration::from_millis(coords_response_interval_ms),
858 ),
859 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
860 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
861 std::time::Duration::from_secs(forward_min_interval_secs),
862 ),
863 pending_connects: Vec::new(),
864 retry_pending: HashMap::new(),
865 nostr_discovery: None,
866 nostr_discovery_started_at_ms: None,
867 lan_discovery: None,
868 startup_open_discovery_sweep_done: false,
869 bootstrap_transports: HashSet::new(),
870 bootstrap_transport_npubs: HashMap::new(),
871 discovery_fallback_transit_blocked_peers: HashSet::new(),
872 last_parent_reeval: None,
873 last_congestion_log: None,
874 estimated_mesh_size: None,
875 last_mesh_size_log: None,
876 last_self_warn: None,
877 last_local_send_failure_at: None,
878 peer_aliases: HashMap::new(),
879 peer_acl,
880 host_map,
881 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
882 })
883 }
884
885 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
890 config.validate()?;
891 let node_addr = *identity.node_addr();
892
893 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
894 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
895
896 let mut startup_epoch = [0u8; 8];
897 rand::rng().fill_bytes(&mut startup_epoch);
898
899 let tun_state = if config.tun.enabled {
900 TunState::Configured
901 } else {
902 TunState::Disabled
903 };
904
905 let mut tree_state = TreeState::new(node_addr);
907 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
908 tree_state.set_hold_down(config.node.tree.hold_down_secs);
909 tree_state.set_flap_dampening(
910 config.node.tree.flap_threshold,
911 config.node.tree.flap_window_secs,
912 config.node.tree.flap_dampening_secs,
913 );
914 tree_state
915 .sign_declaration(&identity)
916 .expect("signing own declaration should never fail");
917
918 let mut bloom_state = BloomState::new(node_addr);
919 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
920
921 let coord_cache = CoordCache::new(
922 config.node.cache.coord_size,
923 config.node.cache.coord_ttl_secs * 1000,
924 );
925 let rl = &config.node.rate_limit;
926 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
927 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
928 config.node.limits.max_pending_inbound,
929 );
930
931 let max_connections = config.node.limits.max_connections;
932 let max_peers = config.node.limits.max_peers;
933 let max_links = config.node.limits.max_links;
934 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
935
936 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
937
938 Ok(Self {
939 identity,
940 startup_epoch,
941 started_at: std::time::Instant::now(),
942 config,
943 state: NodeState::Created,
944 is_leaf_only: false,
945 tree_state,
946 bloom_state,
947 coord_cache,
948 learned_routes: LearnedRouteTable::default(),
949 recent_requests: HashMap::new(),
950 transports: HashMap::new(),
951 transport_drops: HashMap::new(),
952 links: HashMap::new(),
953 addr_to_link: HashMap::new(),
954 packet_tx: None,
955 packet_rx: None,
956 connections: HashMap::new(),
957 peers: HashMap::new(),
958 sessions: HashMap::new(),
959 identity_cache: HashMap::new(),
960 pending_tun_packets: HashMap::new(),
961 pending_endpoint_data: HashMap::new(),
962 pending_lookups: HashMap::new(),
963 max_connections,
964 max_peers,
965 max_links,
966 next_link_id: 1,
967 next_transport_id: 1,
968 stats: stats::NodeStats::new(),
969 stats_history: stats_history::StatsHistory::new(),
970 tun_state,
971 tun_name: None,
972 tun_tx: None,
973 tun_outbound_rx: None,
974 external_packet_tx: None,
975 endpoint_command_rx: None,
976 endpoint_event_tx: None,
977 encrypt_workers: None,
978 decrypt_workers: None,
979 decrypt_registered_sessions: std::collections::HashSet::new(),
980 decrypt_fallback_tx,
981 decrypt_fallback_rx,
982 tun_reader_handle: None,
983 tun_writer_handle: None,
984 #[cfg(target_os = "macos")]
985 tun_shutdown_fd: None,
986 dns_identity_rx: None,
987 dns_task: None,
988 index_allocator: IndexAllocator::new(),
989 peers_by_index: HashMap::new(),
990 pending_outbound: HashMap::new(),
991 msg1_rate_limiter,
992 icmp_rate_limiter: IcmpRateLimiter::new(),
993 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
994 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
995 std::time::Duration::from_millis(coords_response_interval_ms),
996 ),
997 discovery_backoff: DiscoveryBackoff::new(),
998 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
999 pending_connects: Vec::new(),
1000 retry_pending: HashMap::new(),
1001 nostr_discovery: None,
1002 nostr_discovery_started_at_ms: None,
1003 lan_discovery: None,
1004 startup_open_discovery_sweep_done: false,
1005 bootstrap_transports: HashSet::new(),
1006 bootstrap_transport_npubs: HashMap::new(),
1007 discovery_fallback_transit_blocked_peers: HashSet::new(),
1008 last_parent_reeval: None,
1009 last_congestion_log: None,
1010 estimated_mesh_size: None,
1011 last_mesh_size_log: None,
1012 last_self_warn: None,
1013 last_local_send_failure_at: None,
1014 peer_aliases: HashMap::new(),
1015 peer_acl,
1016 host_map,
1017 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1018 })
1019 }
1020
1021 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1023 let mut node = Self::new(config)?;
1024 node.is_leaf_only = true;
1025 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1026 Ok(node)
1027 }
1028
1029 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1030 let base_host_map = HostMap::from_peer_configs(config.peers());
1031 if !config.node.system_files_enabled {
1032 return (
1033 Arc::new(base_host_map.clone()),
1034 acl::PeerAclReloader::memory_only(base_host_map),
1035 );
1036 }
1037
1038 let mut host_map = base_host_map.clone();
1039 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1040 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1041 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1042 ));
1043 host_map.merge(hosts_file);
1044 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1045 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1046 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1047 base_host_map,
1048 hosts_path,
1049 );
1050 (Arc::new(host_map), peer_acl)
1051 }
1052
/// Instantiates one transport handle per configured transport instance.
///
/// Order of creation: UDP, simulated (feature `sim-transport`), Ethernet
/// (Linux/macOS), TCP, Tor, WebRTC (feature `webrtc-transport`), BLE
/// (`bluer_available`). Every instance receives a freshly allocated transport
/// id and a clone of `packet_tx`. The handles are returned to the caller; this
/// function does not insert them into `self.transports`.
///
/// WebRTC and BLE instances whose construction fails are logged with a warning
/// and skipped; the transport id allocated for them is not reused.
async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
    let mut transports = Vec::new();

    // Each section first snapshots (name, config) pairs into an owned Vec,
    // presumably so the borrow of `self.config` ends before
    // `allocate_transport_id` needs `self` again.
    let udp_instances: Vec<_> = self
        .config
        .transports
        .udp
        .iter()
        .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
        .collect();

    for (name, udp_config) in udp_instances {
        let transport_id = self.allocate_transport_id();
        let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
        transports.push(TransportHandle::Udp(udp));
    }

    #[cfg(feature = "sim-transport")]
    {
        let sim_instances: Vec<_> = self
            .config
            .transports
            .sim
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, sim_config) in sim_instances {
            let transport_id = self.allocate_transport_id();
            let sim = crate::transport::sim::SimTransport::new(
                transport_id,
                name,
                sim_config,
                packet_tx.clone(),
            );
            transports.push(TransportHandle::Sim(sim));
        }
    }

    #[cfg(any(target_os = "linux", target_os = "macos"))]
    {
        let eth_instances: Vec<_> = self
            .config
            .transports
            .ethernet
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();
        let xonly = self.identity.pubkey();
        for (name, eth_config) in eth_instances {
            let mut eth_config = eth_config;
            // An instance without its own discovery scope inherits the node's
            // LAN discovery scope.
            if eth_config.discovery_scope.is_none() {
                eth_config.discovery_scope = self.lan_discovery_scope();
            }
            let transport_id = self.allocate_transport_id();
            let mut eth =
                EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
            eth.set_local_pubkey(xonly);
            transports.push(TransportHandle::Ethernet(eth));
        }
    }

    let tcp_instances: Vec<_> = self
        .config
        .transports
        .tcp
        .iter()
        .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
        .collect();

    for (name, tcp_config) in tcp_instances {
        let transport_id = self.allocate_transport_id();
        let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
        transports.push(TransportHandle::Tcp(tcp));
    }

    let tor_instances: Vec<_> = self
        .config
        .transports
        .tor
        .iter()
        .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
        .collect();

    for (name, tor_config) in tor_instances {
        let transport_id = self.allocate_transport_id();
        let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
        transports.push(TransportHandle::Tor(tor));
    }

    // Collected unconditionally so that builds without WebRTC support can still
    // warn when WebRTC is configured.
    let webrtc_instances: Vec<_> = self
        .config
        .transports
        .webrtc
        .iter()
        .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
        .collect();

    #[cfg(feature = "webrtc-transport")]
    {
        for (name, webrtc_config) in webrtc_instances {
            let transport_id = self.allocate_transport_id();
            match WebRtcTransport::new(
                transport_id,
                name,
                webrtc_config,
                packet_tx.clone(),
                &self.identity,
                &self.config.node.discovery.nostr,
            ) {
                Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
                Err(err) => {
                    warn!(
                        transport_id = %transport_id,
                        error = %err,
                        "failed to initialize WebRTC transport"
                    );
                }
            }
        }
    }
    #[cfg(not(feature = "webrtc-transport"))]
    if !webrtc_instances.is_empty() {
        warn!("WebRTC transport configured but this build lacks WebRTC transport support");
    }

    // NOTE(review): this whole block is compiled only with `bluer_available`.
    // Inside it, `any(not(bluer_available), test)` can therefore only be true
    // for test builds, where the nested `#[cfg(not(test))]` removes the
    // warning. As written, the "lacks BlueZ support" warning is never emitted
    // in any build. If builds without BlueZ should warn about configured BLE
    // transports, the outer gate needs to be wider — confirm that
    // `config.transports.ble` exists in such builds before changing it.
    #[cfg(bluer_available)]
    {
        let ble_instances: Vec<_> = self
            .config
            .transports
            .ble
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        #[cfg(all(bluer_available, not(test)))]
        for (name, ble_config) in ble_instances {
            let transport_id = self.allocate_transport_id();
            let adapter = ble_config.adapter().to_string();
            let mtu = ble_config.mtu();
            match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                Ok(io) => {
                    let mut ble = crate::transport::ble::BleTransport::new(
                        transport_id,
                        name,
                        ble_config,
                        io,
                        packet_tx.clone(),
                    );
                    ble.set_local_pubkey(self.identity.pubkey().serialize());
                    transports.push(TransportHandle::Ble(ble));
                }
                Err(e) => {
                    tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                }
            }
        }

        // In test builds this only references `ble_instances` (no BLE I/O is
        // created and nothing is logged).
        #[cfg(any(not(bluer_available), test))]
        if !ble_instances.is_empty() {
            #[cfg(not(test))]
            tracing::warn!("BLE transport configured but this build lacks BlueZ support");
        }
    }

    transports
}
1230
1231 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1241 self.transports
1242 .iter()
1243 .filter(|(id, handle)| {
1244 handle.transport_type().name == transport_type
1245 && handle.is_operational()
1246 && !self.bootstrap_transports.contains(id)
1247 })
1248 .min_by_key(|(id, _)| id.as_u32())
1249 .map(|(id, _)| *id)
1250 }
1251
1252 #[allow(unused_variables)]
1258 fn resolve_ethernet_addr(
1259 &self,
1260 addr_str: &str,
1261 ) -> Result<(TransportId, TransportAddr), NodeError> {
1262 #[cfg(any(target_os = "linux", target_os = "macos"))]
1263 {
1264 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1265 NodeError::NoTransportForType(format!(
1266 "invalid Ethernet address format '{}': expected 'interface/mac'",
1267 addr_str
1268 ))
1269 })?;
1270
1271 let transport_id = self
1273 .transports
1274 .iter()
1275 .find(|(_, handle)| {
1276 handle.transport_type().name == "ethernet"
1277 && handle.is_operational()
1278 && handle.interface_name() == Some(iface)
1279 })
1280 .map(|(id, _)| *id)
1281 .ok_or_else(|| {
1282 NodeError::NoTransportForType(format!(
1283 "no operational Ethernet transport for interface '{}'",
1284 iface
1285 ))
1286 })?;
1287
1288 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1289 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1290 })?;
1291
1292 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1293 }
1294 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1295 {
1296 Err(NodeError::NoTransportForType(
1297 "Ethernet transport is not supported on this platform".to_string(),
1298 ))
1299 }
1300 }
1301
/// Resolves a BLE peer address of the form `adapter/mac`.
///
/// Checks, in this order: that an adapter name can be extracted from the
/// address, that an operational BLE transport exists, and that the string
/// parses as a BLE address. On success the transport id is returned together
/// with the address rebuilt from the original string.
///
/// NOTE(review): the extracted adapter name is only used in the error message;
/// the transport lookup accepts any operational BLE transport and does not
/// match on the adapter. Confirm whether that is intended.
///
/// # Errors
/// Returns [`NodeError::NoTransportForType`] for each of the failed checks
/// above.
#[cfg(bluer_available)]
fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
    let ta = TransportAddr::from_string(addr_str);
    let Some(adapter) = crate::transport::ble::addr::adapter_from_addr(&ta) else {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address format '{}': expected 'adapter/mac'",
            addr_str
        )));
    };

    let operational_ble = self.transports.iter().find_map(|(id, handle)| {
        let usable = handle.transport_type().name == "ble" && handle.is_operational();
        usable.then_some(*id)
    });
    let Some(transport_id) = operational_ble else {
        return Err(NodeError::NoTransportForType(format!(
            "no operational BLE transport for adapter '{}'",
            adapter
        )));
    };

    // Validation only: the parsed value itself is not needed.
    if let Err(e) = crate::transport::ble::addr::BleAddr::parse(addr_str) {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address '{}': {}",
            addr_str, e
        )));
    }

    Ok((transport_id, TransportAddr::from_string(addr_str)))
}
1335
/// Returns a reference to this node's identity.
pub fn identity(&self) -> &Identity {
    &self.identity
}
1342
/// Returns this node's address, as reported by its identity.
pub fn node_addr(&self) -> &NodeAddr {
    self.identity.node_addr()
}
1347
/// Returns this node's npub string, as produced by its identity.
pub fn npub(&self) -> String {
    self.identity.npub()
}
1352
1353 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1362 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1363 return hostname.to_string();
1364 }
1365 if let Some(name) = self.peer_aliases.get(addr) {
1366 return name.clone();
1367 }
1368 if let Some(peer) = self.peers.get(addr) {
1369 return peer.identity().short_npub();
1370 }
1371 if let Some(entry) = self.sessions.get(addr) {
1372 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1373 return PeerIdentity::from_pubkey(xonly).short_npub();
1374 }
1375 addr.short_hex()
1376 }
1377
1378 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1390 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1394 self.peers_by_index.remove(&cache_key);
1395 if self.decrypt_registered_sessions.remove(&cache_key)
1396 && let Some(workers) = self.decrypt_workers.as_ref()
1397 {
1398 workers.unregister_session(cache_key);
1399 }
1400 if let Some(peer_addr) = owning_peer {
1411 let peer_has_other_index = self
1412 .peers_by_index
1413 .values()
1414 .any(|other| *other == peer_addr);
1415 if !peer_has_other_index {
1416 self.clear_connected_udp_for_peer(&peer_addr);
1417 }
1418 }
1419 }
1420
1421 pub(in crate::node) fn ensure_current_session_index_registered(
1430 &mut self,
1431 node_addr: &NodeAddr,
1432 context: &'static str,
1433 ) -> bool {
1434 let Some(peer) = self.peers.get(node_addr) else {
1435 return false;
1436 };
1437 let Some(transport_id) = peer.transport_id() else {
1438 warn!(
1439 peer = %self.peer_display_name(node_addr),
1440 context,
1441 "Cannot register current session index without transport id"
1442 );
1443 return false;
1444 };
1445 let Some(our_index) = peer.our_index() else {
1446 warn!(
1447 peer = %self.peer_display_name(node_addr),
1448 context,
1449 "Cannot register current session index without local index"
1450 );
1451 return false;
1452 };
1453
1454 let cache_key = (transport_id, our_index.as_u32());
1455 match self.peers_by_index.get(&cache_key).copied() {
1456 Some(existing) if existing == *node_addr => true,
1457 Some(existing) => {
1458 warn!(
1459 peer = %self.peer_display_name(node_addr),
1460 previous_owner = %self.peer_display_name(&existing),
1461 transport_id = %transport_id,
1462 our_index = %our_index,
1463 context,
1464 "Repairing current session index with stale owner"
1465 );
1466 self.peers_by_index.insert(cache_key, *node_addr);
1467 true
1468 }
1469 None => {
1470 warn!(
1471 peer = %self.peer_display_name(node_addr),
1472 transport_id = %transport_id,
1473 our_index = %our_index,
1474 context,
1475 "Repairing missing current session index"
1476 );
1477 self.peers_by_index.insert(cache_key, *node_addr);
1478 true
1479 }
1480 }
1481 }
1482
/// Returns the node's [`Config`].
pub fn config(&self) -> &Config {
    &self.config
}
1489
/// Returns the effective IPv6 MTU, computed by
/// `upper::icmp::effective_ipv6_mtu` from [`Self::transport_mtu`].
pub fn effective_ipv6_mtu(&self) -> u16 {
    crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
}
1498
1499 pub fn transport_mtu(&self) -> u16 {
1516 let min_operational = self
1517 .transports
1518 .values()
1519 .filter(|h| h.is_operational())
1520 .map(|h| h.mtu())
1521 .min();
1522 if let Some(mtu) = min_operational {
1523 return mtu;
1524 }
1525 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1527 return cfg.mtu();
1528 }
1529 1280
1530 }
1531
/// Returns the node's current [`NodeState`].
pub fn state(&self) -> NodeState {
    self.state
}
1538
/// Returns the time elapsed since `started_at`.
pub fn uptime(&self) -> std::time::Duration {
    self.started_at.elapsed()
}
1543
/// Returns whether the current state counts as operational, as decided by
/// `NodeState::is_operational`.
pub fn is_running(&self) -> bool {
    self.state.is_operational()
}
1548
/// Returns the node's `is_leaf_only` flag.
pub fn is_leaf_only(&self) -> bool {
    self.is_leaf_only
}
1553
/// Returns a shared reference to the node's [`TreeState`].
pub fn tree_state(&self) -> &TreeState {
    &self.tree_state
}
1560
/// Returns a mutable reference to the node's [`TreeState`].
pub fn tree_state_mut(&mut self) -> &mut TreeState {
    &mut self.tree_state
}
1565
/// Returns a shared reference to the node's [`BloomState`].
pub fn bloom_state(&self) -> &BloomState {
    &self.bloom_state
}
1572
/// Returns a mutable reference to the node's [`BloomState`].
pub fn bloom_state_mut(&mut self) -> &mut BloomState {
    &mut self.bloom_state
}
1577
/// Returns the stored mesh size estimate, or `None` when no estimate is
/// currently held (see `compute_mesh_size`, which writes this field).
pub fn estimated_mesh_size(&self) -> Option<u64> {
    self.estimated_mesh_size
}
1584
1585 pub(crate) fn compute_mesh_size(&mut self) {
1591 let my_addr = *self.tree_state.my_node_addr();
1592 let parent_id = *self.tree_state.my_declaration().parent_id();
1593 let is_root = self.tree_state.is_root();
1594
1595 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1596 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1598 let mut has_data = false;
1599
1600 if !is_root
1606 && let Some(parent) = self.peers.get(&parent_id)
1607 && let Some(filter) = parent.inbound_filter()
1608 {
1609 match filter.estimated_count(max_fpr) {
1610 Some(n) => {
1611 total += n;
1612 has_data = true;
1613 }
1614 None => {
1615 self.estimated_mesh_size = None;
1616 return;
1617 }
1618 }
1619 }
1620
1621 for (peer_addr, peer) in &self.peers {
1623 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1624 && *decl.parent_id() == my_addr
1625 {
1626 child_count += 1;
1627 if let Some(filter) = peer.inbound_filter() {
1628 match filter.estimated_count(max_fpr) {
1629 Some(n) => {
1630 total += n;
1631 has_data = true;
1632 }
1633 None => {
1634 self.estimated_mesh_size = None;
1635 return;
1636 }
1637 }
1638 }
1639 }
1640 }
1641
1642 if !has_data {
1643 self.estimated_mesh_size = None;
1644 return;
1645 }
1646
1647 let size = total.round() as u64;
1648 self.estimated_mesh_size = Some(size);
1649
1650 let now = std::time::Instant::now();
1652 let should_log = match self.last_mesh_size_log {
1653 None => true,
1654 Some(last) => {
1655 now.duration_since(last)
1656 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1657 }
1658 };
1659 if should_log {
1660 tracing::debug!(
1661 estimated_mesh_size = size,
1662 peers = self.peers.len(),
1663 children = child_count,
1664 "Mesh size estimate"
1665 );
1666 self.last_mesh_size_log = Some(now);
1667 }
1668 }
1669
/// Returns a shared reference to the node's [`CoordCache`].
pub fn coord_cache(&self) -> &CoordCache {
    &self.coord_cache
}
1676
/// Returns a mutable reference to the node's [`CoordCache`].
pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
    &mut self.coord_cache
}
1681
/// Returns a shared reference to the node's statistics.
pub fn stats(&self) -> &stats::NodeStats {
    &self.stats
}
1688
/// Returns a mutable reference to the node's statistics.
pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
    &mut self.stats
}
1693
/// Returns a shared reference to the node's statistics history.
pub fn stats_history(&self) -> &stats_history::StatsHistory {
    &self.stats_history
}
1698
1699 pub(crate) fn record_stats_history(&mut self) {
1702 let fwd = &self.stats.forwarding;
1703 let peers_with_mmp: Vec<f64> = self
1704 .peers
1705 .values()
1706 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1707 .collect();
1708 let loss_rate = if peers_with_mmp.is_empty() {
1709 0.0
1710 } else {
1711 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1712 };
1713
1714 let snap = stats_history::Snapshot {
1715 mesh_size: self.estimated_mesh_size,
1716 tree_depth: self.tree_state.my_coords().depth() as u32,
1717 peer_count: self.peers.len() as u64,
1718 parent_switches_total: self.stats.tree.parent_switches,
1719 bytes_in_total: fwd.received_bytes,
1720 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1721 packets_in_total: fwd.received_packets,
1722 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1723 loss_rate,
1724 active_sessions: self.sessions.len() as u64,
1725 };
1726
1727 let now = std::time::Instant::now();
1728 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1729 .peers
1730 .values()
1731 .map(|p| {
1732 let stats = p.link_stats();
1733 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1734 Some(m) => (
1735 m.metrics.srtt_ms(),
1736 Some(m.metrics.loss_rate()),
1737 m.receiver.ecn_ce_count() as u64,
1738 ),
1739 None => (None, None, 0),
1740 };
1741 stats_history::PeerSnapshot {
1742 node_addr: *p.node_addr(),
1743 last_seen: now,
1744 srtt_ms,
1745 loss_rate,
1746 bytes_in_total: stats.bytes_recv,
1747 bytes_out_total: stats.bytes_sent,
1748 packets_in_total: stats.packets_recv,
1749 packets_out_total: stats.packets_sent,
1750 ecn_ce_total: ecn_ce,
1751 }
1752 })
1753 .collect();
1754
1755 self.stats_history.tick(now, &snap, &peer_snaps);
1756 }
1757
/// Returns the current [`TunState`].
pub fn tun_state(&self) -> TunState {
    self.tun_state
}
1764
/// Returns the stored TUN device name, or `None` when none is set.
pub fn tun_name(&self) -> Option<&str> {
    self.tun_name.as_deref()
}
1769
/// Sets the connection limit. `add_connection` treats `0` as "no limit".
pub fn set_max_connections(&mut self, max: usize) {
    self.max_connections = max;
}
1776
/// Sets the stored peer limit.
pub fn set_max_peers(&mut self, max: usize) {
    self.max_peers = max;
}
1781
/// Sets the link limit. `add_link` treats `0` as "no limit".
pub fn set_max_links(&mut self, max: usize) {
    self.max_links = max;
}
1786
/// Returns the number of entries in the connection table.
pub fn connection_count(&self) -> usize {
    self.connections.len()
}
1793
/// Returns the number of entries in the peer table.
pub fn peer_count(&self) -> usize {
    self.peers.len()
}
1798
/// Returns the number of entries in the link table.
pub fn link_count(&self) -> usize {
    self.links.len()
}
1803
/// Returns the number of registered transports (operational or not).
pub fn transport_count(&self) -> usize {
    self.transports.len()
}
1808
/// Hands out the next [`TransportId`] and advances the internal counter
/// by one, so successive calls return consecutive ids.
pub fn allocate_transport_id(&mut self) -> TransportId {
    let id = TransportId::new(self.next_transport_id);
    self.next_transport_id += 1;
    id
}
1817
/// Looks up the transport handle registered under `id`.
pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
    self.transports.get(id)
}
1822
/// Looks up the transport handle registered under `id`, mutably.
pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
    self.transports.get_mut(id)
}
1827
/// Iterates over the ids of all registered transports.
pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
    self.transports.keys()
}
1832
/// Returns mutable access to the packet receiver, or `None` when the
/// `packet_rx` slot is empty.
pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
    self.packet_rx.as_mut()
}
1837
/// Hands out the next [`LinkId`] and advances the internal counter by
/// one, so successive calls return consecutive ids.
pub fn allocate_link_id(&mut self) -> LinkId {
    let id = LinkId::new(self.next_link_id);
    self.next_link_id += 1;
    id
}
1846
1847 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1849 if self.max_links > 0 && self.links.len() >= self.max_links {
1850 return Err(NodeError::MaxLinksExceeded {
1851 max: self.max_links,
1852 });
1853 }
1854 let link_id = link.link_id();
1855 let transport_id = link.transport_id();
1856 let remote_addr = link.remote_addr().clone();
1857
1858 self.links.insert(link_id, link);
1859 self.addr_to_link
1860 .insert((transport_id, remote_addr), link_id);
1861 Ok(())
1862 }
1863
/// Looks up the link registered under `link_id`.
pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
    self.links.get(link_id)
}
1868
/// Looks up the link registered under `link_id`, mutably.
pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
    self.links.get_mut(link_id)
}
1873
/// Returns the id of the link indexed under `(transport_id, addr)` in the
/// address reverse index, if any.
pub fn find_link_by_addr(
    &self,
    transport_id: TransportId,
    addr: &TransportAddr,
) -> Option<LinkId> {
    // The index is keyed by an owned tuple, so the address is cloned to
    // build the lookup key.
    self.addr_to_link
        .get(&(transport_id, addr.clone()))
        .copied()
}
1884
1885 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
1891 if let Some(link) = self.links.remove(link_id) {
1892 let key = (link.transport_id(), link.remote_addr().clone());
1894 if self.addr_to_link.get(&key) == Some(link_id) {
1895 self.addr_to_link.remove(&key);
1896 }
1897 Some(link)
1898 } else {
1899 None
1900 }
1901 }
1902
1903 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
1904 if !self.bootstrap_transports.contains(&transport_id) {
1905 return;
1906 }
1907
1908 let transport_in_use = self
1909 .links
1910 .values()
1911 .any(|link| link.transport_id() == transport_id)
1912 || self
1913 .connections
1914 .values()
1915 .any(|conn| conn.transport_id() == Some(transport_id))
1916 || self
1917 .peers
1918 .values()
1919 .any(|peer| peer.transport_id() == Some(transport_id))
1920 || self
1921 .pending_connects
1922 .iter()
1923 .any(|pending| pending.transport_id == transport_id);
1924
1925 if transport_in_use {
1926 return;
1927 }
1928
1929 tracing::debug!(
1930 transport_id = %transport_id,
1931 "bootstrap transport has no remaining references; dropping"
1932 );
1933
1934 self.bootstrap_transports.remove(&transport_id);
1935 self.bootstrap_transport_npubs.remove(&transport_id);
1936 self.transport_drops.remove(&transport_id);
1937 self.transports.remove(&transport_id);
1938 }
1939
/// Iterates over all registered links.
pub fn links(&self) -> impl Iterator<Item = &Link> {
    self.links.values()
}
1944
1945 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
1949 let link_id = connection.link_id();
1950
1951 if self.connections.contains_key(&link_id) {
1952 return Err(NodeError::ConnectionAlreadyExists(link_id));
1953 }
1954
1955 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
1956 return Err(NodeError::MaxConnectionsExceeded {
1957 max: self.max_connections,
1958 });
1959 }
1960
1961 self.connections.insert(link_id, connection);
1962 Ok(())
1963 }
1964
/// Looks up the connection registered under `link_id`.
pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
    self.connections.get(link_id)
}
1969
/// Looks up the connection registered under `link_id`, mutably.
pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
    self.connections.get_mut(link_id)
}
1974
/// Removes and returns the connection registered under `link_id`, if any.
pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
    self.connections.remove(link_id)
}
1979
/// Iterates over all registered connections.
pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
    self.connections.values()
}
1984
/// Looks up the active peer registered under `node_addr`.
pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
    self.peers.get(node_addr)
}
1991
/// Looks up the active peer registered under `node_addr`, mutably.
pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
    self.peers.get_mut(node_addr)
}
1996
/// Removes and returns the peer registered under `node_addr`, if any.
///
/// Only the peer table is touched here; no other per-peer bookkeeping is
/// updated by this method.
pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
    self.peers.remove(node_addr)
}
2001
/// Iterates over all active peers.
pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values()
}
2006
/// Returns the Nostr discovery instance held by the node, or `None` when
/// the `nostr_discovery` slot is empty.
pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
    self.nostr_discovery.as_deref()
}
2013
/// Iterates over the node addresses of all active peers.
pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
    self.peers.keys()
}
2018
/// Iterates over the active peers for which `can_send()` is true.
pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values().filter(|p| p.can_send())
}
2023
2024 pub fn sendable_peer_count(&self) -> usize {
2026 self.peers.values().filter(|p| p.can_send()).count()
2027 }
2028
2029 pub(crate) fn set_discovery_fallback_transit_allowed(
2030 &mut self,
2031 peer_addr: NodeAddr,
2032 allowed: bool,
2033 ) {
2034 if allowed {
2035 self.discovery_fallback_transit_blocked_peers
2036 .remove(&peer_addr);
2037 } else {
2038 self.discovery_fallback_transit_blocked_peers
2039 .insert(peer_addr);
2040 }
2041 }
2042
2043 pub(crate) fn configured_discovery_fallback_transit(
2044 &self,
2045 peer_addr: &NodeAddr,
2046 ) -> Option<bool> {
2047 self.config.peers().iter().find_map(|peer| {
2048 PeerIdentity::from_npub(&peer.npub)
2049 .ok()
2050 .filter(|identity| identity.node_addr() == peer_addr)
2051 .map(|_| peer.discovery_fallback_transit)
2052 })
2053 }
2054
2055 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2056 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2057 return retry_state.peer_config.discovery_fallback_transit;
2058 }
2059
2060 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2061 return allowed;
2062 }
2063
2064 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2065 }
2066
/// Test helper: sets the discovery forward limiter's interval to zero.
#[cfg(test)]
pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
    self.discovery_forward_limiter
        .set_interval(std::time::Duration::ZERO);
}
2076
/// Test helper: looks up the session entry for `remote`.
#[cfg(test)]
pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
    self.sessions.get(remote)
}
2081
/// Test helper: looks up the session entry for `remote`, mutably.
#[cfg(test)]
pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
    self.sessions.get_mut(remote)
}
2087
/// Test helper: removes and returns the session entry for `remote`.
#[cfg(test)]
pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
    self.sessions.remove(remote)
}
2093
/// Test helper: reads the path MTU stored for `fips_addr`.
///
/// Returns `None` when there is no entry or when the lock guarding the
/// table cannot be acquired for reading.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
    let table = self.path_mtu_lookup.read().ok()?;
    table.get(fips_addr).copied()
}
2102
/// Test helper: stores `mtu` as the path MTU for `fips_addr`.
///
/// The insert is silently skipped when the write lock cannot be acquired.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
    if let Ok(mut map) = self.path_mtu_lookup.write() {
        map.insert(fips_addr, mtu);
    }
}
2110
/// Returns the number of entries in the session table.
pub fn session_count(&self) -> usize {
    self.sessions.len()
}
2115
/// Iterates over `(remote address, session entry)` pairs in the session
/// table.
pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
    self.sessions.iter()
}
2120
2121 pub(crate) fn register_identity(
2125 &mut self,
2126 node_addr: NodeAddr,
2127 pubkey: secp256k1::PublicKey,
2128 ) -> bool {
2129 let mut prefix = [0u8; 15];
2130 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2131 if let Some(entry) = self.identity_cache.get(&prefix)
2132 && entry.node_addr == node_addr
2133 && entry.pubkey == pubkey
2134 {
2135 return true;
2139 }
2140
2141 let (xonly, _) = pubkey.x_only_public_key();
2142 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2143 if derived_node_addr != node_addr {
2144 debug!(
2145 claimed_node_addr = %node_addr,
2146 derived_node_addr = %derived_node_addr,
2147 "Rejected identity cache entry with mismatched public key"
2148 );
2149 return false;
2150 }
2151
2152 let now_ms = Self::now_ms();
2153 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2154 && entry.node_addr == node_addr
2155 {
2156 entry.pubkey = pubkey;
2157 entry.last_seen_ms = now_ms;
2158 return true;
2159 }
2160
2161 let npub = encode_npub(&xonly);
2162 self.identity_cache.insert(
2163 prefix,
2164 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2165 );
2166 let max = self.config.node.cache.identity_size;
2168 if self.identity_cache.len() > max
2169 && let Some(oldest_key) = self
2170 .identity_cache
2171 .iter()
2172 .min_by_key(|(_, entry)| entry.last_seen_ms)
2173 .map(|(k, _)| *k)
2174 {
2175 self.identity_cache.remove(&oldest_key);
2176 }
2177 true
2178 }
2179
2180 pub(crate) fn lookup_by_fips_prefix(
2182 &mut self,
2183 prefix: &[u8; 15],
2184 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2185 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2186 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2188 } else {
2189 None
2190 }
2191 }
2192
2193 pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2195 let mut prefix = [0u8; 15];
2196 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2197 self.identity_cache.contains_key(&prefix)
2198 }
2199
/// Returns the number of entries in the identity cache.
pub fn identity_cache_len(&self) -> usize {
    self.identity_cache.len()
}
2204
/// Iterates over the identity cache, yielding each entry's node address,
/// public key and `last_seen_ms` timestamp.
pub fn identity_cache_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
    self.identity_cache
        .values()
        .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
}
2216
/// Returns the configured identity cache capacity
/// (`config.node.cache.identity_size`).
pub fn identity_cache_max(&self) -> usize {
    self.config.node.cache.identity_size
}
2221
/// Returns the number of entries in `pending_lookups`.
pub fn pending_lookup_count(&self) -> usize {
    self.pending_lookups.len()
}
2226
/// Iterates over `(target address, pending lookup)` pairs in
/// `pending_lookups`.
pub fn pending_lookups_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
    self.pending_lookups.iter()
}
2233
/// Returns the number of entries in `recent_requests`.
pub fn recent_request_count(&self) -> usize {
    self.recent_requests.len()
}
2238
/// Returns the number of distinct keys that currently have a queue in
/// `pending_tun_packets`.
pub fn pending_tun_destinations(&self) -> usize {
    self.pending_tun_packets.len()
}
2243
/// Returns the total number of queued packets, summed across every queue
/// in `pending_tun_packets`.
pub fn pending_tun_total_packets(&self) -> usize {
    self.pending_tun_packets.values().map(|q| q.len()).sum()
}
2248
/// Iterates over `(peer address, retry state)` pairs in `retry_pending`.
pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
    self.retry_pending.iter()
}
2253
2254 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2261 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2263 return true;
2264 }
2265 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2267 && decl.parent_id() == self.node_addr()
2268 {
2269 return true;
2270 }
2271 false
2272 }
2273
/// Chooses the peer to forward a packet for `dest_node_addr` to.
///
/// Returns `None` when the destination is this node or when no strategy
/// yields a peer. Strategies are tried in this order:
///
/// 1. The destination itself, if it is a direct peer and `is_healthy()`.
/// 2. In `RoutingMode::ReplyLearned` only: a learned route whose next hop
///    can send, unless `should_explore_fallback` asks to try the other
///    strategies first.
/// 3. If the destination's coordinates are not in the coord cache: a
///    learned route (reply-learned mode), then the direct peer if it can
///    at least send, else `None`.
/// 4. The best candidate among peers whose filters may reach the
///    destination (see `select_best_candidate`).
/// 5. The tree next hop for the cached coordinates, if that peer can send.
/// 6. A learned route again (covers the explore-fallback case and the
///    general last resort in reply-learned mode).
/// 7. The direct peer if it can send even though it is not healthy.
///
/// Takes `&mut self` because the coord cache entry is touched and the
/// learned-route table is consulted through methods that may update it.
pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
    // Never route to ourselves.
    if dest_node_addr == self.node_addr() {
        return None;
    }

    // Remember whether the direct peer is usable at all; it serves as the
    // last-resort fallback when it is sendable but not healthy.
    let direct_peer_can_send = self
        .peers
        .get(dest_node_addr)
        .is_some_and(|peer| peer.can_send());
    if let Some(peer) = self.peers.get(dest_node_addr)
        && peer.is_healthy()
    {
        return Some(peer);
    }

    let now_ms = Self::now_ms();

    // Only populated in reply-learned mode; `None` disables every
    // learned-route branch below.
    let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
        Some(
            self.peers
                .iter()
                .filter(|(_, peer)| peer.can_send())
                .map(|(addr, _)| *addr)
                .collect::<HashSet<_>>(),
        )
    } else {
        None
    };

    // When exploring, the learned route is skipped here and only used
    // after coordinate and tree routing have failed.
    let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
        self.learned_routes.should_explore_fallback(
            dest_node_addr,
            now_ms,
            self.config.node.routing.learned_fallback_explore_interval,
            |addr| sendable.contains(addr),
        )
    });
    if let Some(sendable) = &sendable_learned_peers
        && !explore_fallback
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    // Without cached coordinates neither coordinate nor tree routing is
    // possible, so fall back to a learned route or the direct peer.
    let Some(dest_coords) = self
        .coord_cache
        .get_and_touch(dest_node_addr, now_ms)
        .cloned()
    else {
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }
        if direct_peer_can_send {
            return self.peers.get(dest_node_addr);
        }
        return None;
    };

    // Resolve to an address first so the candidate borrows end before the
    // final peer lookup.
    let coordinate_route_addr = {
        let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
        if !candidates.is_empty() {
            self.select_best_candidate(&candidates, &dest_coords)
                .map(|peer| *peer.node_addr())
        } else {
            None
        }
    };
    if let Some(next_hop_addr) = coordinate_route_addr {
        return self.peers.get(&next_hop_addr);
    }

    // Tree routing, accepted only if the suggested hop is a sendable peer.
    let tree_route_addr = self
        .tree_state
        .find_next_hop(&dest_coords)
        .filter(|next_hop_id| {
            self.peers
                .get(next_hop_id)
                .is_some_and(|peer| peer.can_send())
        });
    if let Some(next_hop_addr) = tree_route_addr {
        return self.peers.get(&next_hop_addr);
    }
    // Exploration found nothing better; use the learned route after all.
    if explore_fallback {
        return sendable_learned_peers.as_ref().and_then(|sendable| {
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
        });
    }

    if let Some(sendable) = &sendable_learned_peers
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    if direct_peer_can_send {
        return self.peers.get(dest_node_addr);
    }

    None
}
2420
2421 pub(in crate::node) fn learn_reverse_route(
2422 &mut self,
2423 destination: NodeAddr,
2424 next_hop: NodeAddr,
2425 ) {
2426 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2427 || destination == *self.node_addr()
2428 {
2429 return;
2430 }
2431 let now_ms = Self::now_ms();
2432 self.learned_routes.learn(
2433 destination,
2434 next_hop,
2435 now_ms,
2436 self.config.node.routing.learned_ttl_secs,
2437 self.config.node.routing.max_learned_routes_per_dest,
2438 );
2439 }
2440
2441 pub(in crate::node) fn record_route_failure(
2442 &mut self,
2443 destination: NodeAddr,
2444 next_hop: NodeAddr,
2445 ) {
2446 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2447 return;
2448 }
2449 self.learned_routes.record_failure(&destination, &next_hop);
2450 }
2451
/// Returns a snapshot of the learned-route table taken at `now_ms`.
pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
    self.learned_routes.snapshot(now_ms)
}
2455
/// Asks the learned-route table to purge entries expired as of `now_ms`.
pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
    self.learned_routes.purge_expired(now_ms);
}
2459
2460 fn select_best_candidate<'a>(
2469 &'a self,
2470 candidates: &[&'a ActivePeer],
2471 dest_coords: &crate::tree::TreeCoordinate,
2472 ) -> Option<&'a ActivePeer> {
2473 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2474
2475 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2476
2477 for &candidate in candidates {
2478 if !candidate.can_send() {
2479 continue;
2480 }
2481
2482 let cost = candidate.link_cost();
2483
2484 let dist = self
2485 .tree_state
2486 .peer_coords(candidate.node_addr())
2487 .map(|pc| pc.distance_to(dest_coords))
2488 .unwrap_or(usize::MAX);
2489
2490 if dist >= my_distance {
2493 continue;
2494 }
2495
2496 let dominated = match &best {
2497 None => true,
2498 Some((_, best_cost, best_dist)) => {
2499 cost < *best_cost
2500 || (cost == *best_cost && dist < *best_dist)
2501 || (cost == *best_cost
2502 && dist == *best_dist
2503 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2504 }
2505 };
2506
2507 if dominated {
2508 best = Some((candidate, cost, dist));
2509 }
2510 }
2511
2512 best.map(|(peer, _, _)| peer)
2513 }
2514
/// Collects the peers for which `may_reach(dest)` is true.
pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
    self.peers.values().filter(|p| p.may_reach(dest)).collect()
}
2519
/// Returns the TUN sender, or `None` when the `tun_tx` slot is empty.
pub fn tun_tx(&self) -> Option<&TunTx> {
    self.tun_tx.as_ref()
}
2526
2527 pub fn attach_external_packet_io(
2534 &mut self,
2535 capacity: usize,
2536 ) -> Result<ExternalPacketIo, NodeError> {
2537 if self.state != NodeState::Created {
2538 return Err(NodeError::Config(ConfigError::Validation(
2539 "external packet I/O must be attached before node start".to_string(),
2540 )));
2541 }
2542 if self.config.tun.enabled {
2543 return Err(NodeError::Config(ConfigError::Validation(
2544 "external packet I/O requires tun.enabled=false".to_string(),
2545 )));
2546 }
2547
2548 let capacity = capacity.max(1);
2549 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2550 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2551 self.tun_outbound_rx = Some(outbound_rx);
2552 self.external_packet_tx = Some(inbound_tx);
2553
2554 Ok(ExternalPacketIo {
2555 outbound_tx,
2556 inbound_rx,
2557 })
2558 }
2559
2560 pub(crate) fn attach_endpoint_data_io(
2565 &mut self,
2566 capacity: usize,
2567 ) -> Result<EndpointDataIo, NodeError> {
2568 if self.state != NodeState::Created {
2569 return Err(NodeError::Config(ConfigError::Validation(
2570 "endpoint data I/O must be attached before node start".to_string(),
2571 )));
2572 }
2573
2574 let command_capacity = endpoint_data_command_capacity(capacity);
2575 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2576 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2581 self.endpoint_command_rx = Some(command_rx);
2582 self.endpoint_event_tx = Some(event_tx.clone());
2583
2584 Ok(EndpointDataIo {
2585 command_tx,
2586 event_rx,
2587 event_tx,
2588 })
2589 }
2590
2591 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2592 let mut prefix = [0u8; 15];
2593 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2594 self.identity_cache
2595 .get(&prefix)
2596 .filter(|entry| &entry.node_addr == addr)
2597 .map(|entry| entry.pubkey)
2598 }
2599
2600 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2601 let mut prefix = [0u8; 15];
2602 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2603 self.identity_cache
2604 .get(&prefix)
2605 .filter(|entry| &entry.node_addr == addr)
2606 .map(|entry| entry.npub.clone())
2607 }
2608
2609 pub(in crate::node) fn deliver_external_ipv6_packet(
2610 &self,
2611 src_addr: &NodeAddr,
2612 packet: Vec<u8>,
2613 ) {
2614 let Some(external_packet_tx) = &self.external_packet_tx else {
2615 return;
2616 };
2617 if packet.len() < 40 {
2618 return;
2619 }
2620 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2621 return;
2622 };
2623 let delivered = NodeDeliveredPacket {
2624 source_node_addr: *src_addr,
2625 source_npub: self.npub_for_node_addr(src_addr),
2626 destination,
2627 packet,
2628 };
2629 if let Err(error) = external_packet_tx.try_send(delivered) {
2630 debug!(error = %error, "Failed to deliver packet to external app sink");
2631 }
2632 }
2633
/// Sends `plaintext` to the peer `node_addr` as an encrypted link
/// message with the CE flag cleared.
///
/// Thin wrapper around `send_encrypted_link_message_with_ce` with
/// `ce_flag = false`; errors are whatever that method returns.
pub(super) async fn send_encrypted_link_message(
    &mut self,
    node_addr: &NodeAddr,
    plaintext: &[u8],
) -> Result<(), NodeError> {
    self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
        .await
}
2655
2656 pub(in crate::node) fn note_local_send_outcome(
2662 &mut self,
2663 result: &Result<usize, TransportError>,
2664 ) {
2665 match result {
2666 Ok(_) => {
2667 if self.last_local_send_failure_at.is_some() {
2668 self.last_local_send_failure_at = None;
2669 }
2670 }
2671 Err(TransportError::Io(e))
2672 if matches!(
2673 e.kind(),
2674 std::io::ErrorKind::NetworkUnreachable
2675 | std::io::ErrorKind::HostUnreachable
2676 | std::io::ErrorKind::AddrNotAvailable
2677 ) =>
2678 {
2679 self.last_local_send_failure_at = Some(std::time::Instant::now());
2680 }
2681 Err(_) => {}
2682 }
2683 }
2684
/// Returns the instant recorded by `note_local_send_outcome` for the most
/// recent qualifying send failure, or `None` when none is recorded.
pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
    self.last_local_send_failure_at
}
2690
2691 pub(super) async fn send_encrypted_link_message_with_ce(
2695 &mut self,
2696 node_addr: &NodeAddr,
2697 plaintext: &[u8],
2698 ce_flag: bool,
2699 ) -> Result<(), NodeError> {
2700 let peer = self
2701 .peers
2702 .get_mut(node_addr)
2703 .ok_or(NodeError::PeerNotFound(*node_addr))?;
2704
2705 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2706 node_addr: *node_addr,
2707 reason: "no their_index".into(),
2708 })?;
2709 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2710 node_addr: *node_addr,
2711 reason: "no transport_id".into(),
2712 })?;
2713 let remote_addr = peer
2714 .current_addr()
2715 .cloned()
2716 .ok_or_else(|| NodeError::SendFailed {
2717 node_addr: *node_addr,
2718 reason: "no current_addr".into(),
2719 })?;
2720 #[cfg(any(target_os = "linux", target_os = "macos"))]
2721 let connected_socket = peer.connected_udp();
2722
2723 let timestamp_ms = peer.session_elapsed_ms();
2725
2726 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2728 let mut flags = if sp_flag { FLAG_SP } else { 0 };
2729 if ce_flag {
2730 flags |= FLAG_CE;
2731 }
2732 if peer.current_k_bit() {
2733 flags |= FLAG_KEY_EPOCH;
2734 }
2735
2736 let session = peer
2737 .noise_session_mut()
2738 .ok_or_else(|| NodeError::SendFailed {
2739 node_addr: *node_addr,
2740 reason: "no noise session".into(),
2741 })?;
2742
2743 const INNER_TS_LEN: usize = 4;
2751 let counter = session.current_send_counter();
2752 let inner_len = INNER_TS_LEN + plaintext.len();
2753 let payload_len = inner_len as u16;
2754 let header = build_established_header(their_index, counter, flags, payload_len);
2755
2756 let transport_for_send = self
2775 .transports
2776 .get(&transport_id)
2777 .ok_or(NodeError::TransportNotFound(transport_id))?;
2778 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2779 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2780 && is_udp
2781 && let Some(cipher_clone) = session.send_cipher_clone()
2782 {
2783 {
2784 let reserved_counter =
2788 session
2789 .take_send_counter()
2790 .map_err(|e| NodeError::SendFailed {
2791 node_addr: *node_addr,
2792 reason: format!("counter reservation failed: {}", e),
2793 })?;
2794 debug_assert_eq!(reserved_counter, counter);
2795 let header =
2799 build_established_header(their_index, reserved_counter, flags, payload_len);
2800 let transport = transport_for_send;
2801 let send_target = {
2808 if let TransportHandle::Udp(udp) = transport {
2809 let socket_addr = {
2810 #[cfg(any(target_os = "linux", target_os = "macos"))]
2811 {
2812 match connected_socket.as_ref() {
2813 Some(socket) => Some(socket.peer_addr()),
2814 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2815 }
2816 }
2817 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2818 {
2819 udp.resolve_for_off_task(&remote_addr).await.ok()
2820 }
2821 };
2822 match (udp.async_socket(), socket_addr) {
2823 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
2824 _ => None,
2825 }
2826 } else {
2827 None
2828 }
2829 };
2830 if let Some((socket, socket_addr)) = send_target {
2831 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
2847 let mut wire_buf = Vec::with_capacity(wire_capacity);
2848 wire_buf.extend_from_slice(&header);
2849 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
2850 wire_buf.extend_from_slice(plaintext);
2851 let predicted_bytes = wire_capacity;
2852 if let Some(peer) = self.peers.get_mut(node_addr) {
2859 peer.link_stats_mut().record_sent(predicted_bytes);
2860 if let Some(mmp) = peer.mmp_mut() {
2861 mmp.sender
2862 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
2863 }
2864 }
2865 workers.dispatch(self::encrypt_worker::FmpSendJob {
2866 cipher: cipher_clone,
2867 counter: reserved_counter,
2868 wire_buf,
2869 fsp_seal: None,
2870 socket,
2871 dest_addr: socket_addr,
2872 #[cfg(any(target_os = "linux", target_os = "macos"))]
2873 connected_socket,
2874 drop_on_backpressure: plaintext
2875 .first()
2876 .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
2877 queued_at: crate::perf_profile::stamp(),
2878 });
2879 return Ok(());
2880 }
2881 }
2882 }
2883
2884 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
2889 let ciphertext = {
2891 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
2892 session
2893 .encrypt_with_aad(&inner_plaintext, &header)
2894 .map_err(|e| NodeError::SendFailed {
2895 node_addr: *node_addr,
2896 reason: format!("encryption failed: {}", e),
2897 })?
2898 };
2899
2900 let wire_packet = build_encrypted(&header, &ciphertext);
2901
2902 let send_result = {
2904 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
2905 let transport = self
2906 .transports
2907 .get(&transport_id)
2908 .ok_or(NodeError::TransportNotFound(transport_id))?;
2909 transport.send(&remote_addr, &wire_packet).await
2910 };
2911 self.note_local_send_outcome(&send_result);
2912 let bytes_sent = send_result.map_err(|e| match e {
2913 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
2914 node_addr: *node_addr,
2915 packet_size,
2916 mtu,
2917 },
2918 other => NodeError::SendFailed {
2919 node_addr: *node_addr,
2920 reason: format!("transport send: {}", other),
2921 },
2922 })?;
2923
2924 if let Some(peer) = self.peers.get_mut(node_addr) {
2926 peer.link_stats_mut().record_sent(bytes_sent);
2927 if let Some(mmp) = peer.mmp_mut() {
2929 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
2930 }
2931 }
2932
2933 Ok(())
2934 }
2935}
2936
2937impl fmt::Debug for Node {
2938 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2939 f.debug_struct("Node")
2940 .field("node_addr", self.node_addr())
2941 .field("state", &self.state)
2942 .field("is_leaf_only", &self.is_leaf_only)
2943 .field("connections", &self.connection_count())
2944 .field("peers", &self.peer_count())
2945 .field("links", &self.link_count())
2946 .field("transports", &self.transport_count())
2947 .finish()
2948 }
2949}