1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::RoutingMode;
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45#[cfg(feature = "webrtc-transport")]
46use crate::transport::webrtc::WebRtcTransport;
47use crate::transport::{
48 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
49 TransportHandle, TransportId,
50};
51use crate::tree::TreeState;
52use crate::upper::hosts::HostMap;
53use crate::upper::icmp_rate_limit::IcmpRateLimiter;
54use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
55use crate::utils::index::IndexAllocator;
56use crate::{
57 Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
58 SessionMessageType, encode_npub,
59};
60use rand::Rng;
61use std::collections::{HashMap, HashSet, VecDeque};
62use std::fmt;
63use std::sync::Arc;
64use std::thread::JoinHandle;
65use thiserror::Error;
66use tracing::{debug, warn};
67
/// Rekey jitter amount; the unit is seconds per the `_SECS` suffix.
///
/// NOTE(review): how this value is applied is not visible in this part of the
/// file — confirm against the rekey scheduling code.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
75
/// Errors produced by node operations.
///
/// The `Display` text of every variant comes from its `#[error(...)]`
/// attribute. The `Config`, `Identity` and `Tun` variants are marked
/// `#[from]`, so the wrapped error types convert into `NodeError` via `?`.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The node has not been started.
    #[error("node not started")]
    NotStarted,

    /// The node has already been started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node has already been stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given id.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No usable transport for the requested type. Address-resolution helpers
    /// in this file also use this variant for malformed addresses, with the
    /// detail in the message.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link exists with the given id.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection exists for the given link id.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// No peer is known under the given node address.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with the given node address already exists.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection already exists for the given link id.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A peer npub string was rejected; `reason` explains why.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// Discovery failed; the message carries the detail.
    #[error("discovery error: {0}")]
    Discovery(String),

    /// Access was denied; the message carries the detail.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` was exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` was exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` was exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available for the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion failed for `link_id`; `reason` explains why.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed; `reason` explains why.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet of `packet_size` bytes exceeded the `mtu` while being
    /// forwarded to `node_addr`.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Wraps a [`ConfigError`]; converts automatically via `From`.
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Wraps an [`IdentityError`]; converts automatically via `From`.
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// Wraps a [`TunError`]; converts automatically via `From`.
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Index allocation failed; the message carries the detail.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// A handshake failed; the message carries the detail.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// A transport-level failure, carried as a message string.
    #[error("transport error: {0}")]
    TransportError(String),

    /// Bootstrap handoff failed; the message carries the detail.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
167
/// A packet together with its source and destination addressing, as carried on
/// the channel held in [`ExternalPacketIo::inbound_rx`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// Node address of the packet's source.
    pub source_node_addr: NodeAddr,
    /// npub string of the source, if one is attached.
    pub source_npub: Option<String>,
    /// Destination FIPS address.
    pub destination: FipsAddress,
    /// Packet bytes.
    pub packet: Vec<u8>,
}
180
/// One entry of `Node::identity_cache`: a node address with its public key and
/// npub string, plus a millisecond timestamp.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address this entry describes.
    node_addr: NodeAddr,
    /// secp256k1 public key recorded for the node.
    pubkey: secp256k1::PublicKey,
    /// npub string recorded for the node.
    npub: String,
    /// Millisecond timestamp supplied by the caller of `new`.
    /// NOTE(review): presumably used for cache eviction — confirm at the use sites.
    last_seen_ms: u64,
}
188
189impl IdentityCacheEntry {
190 fn new(
191 node_addr: NodeAddr,
192 pubkey: secp256k1::PublicKey,
193 npub: String,
194 last_seen_ms: u64,
195 ) -> Self {
196 Self {
197 node_addr,
198 pubkey,
199 npub,
200 last_seen_ms,
201 }
202 }
203}
204
/// Pair of channel endpoints for exchanging packets with the node from outside.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Sending half of the TUN outbound channel (`TunOutboundTx`).
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receiving half of a bounded channel of [`NodeDeliveredPacket`]s.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
213
/// Channel endpoints for the endpoint data path: a bounded command sender plus
/// both halves of an unbounded event channel.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Sender for [`NodeEndpointCommand`]s (bounded channel).
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Receiver for [`NodeEndpointEvent`]s (unbounded channel).
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Sender for [`NodeEndpointEvent`]s (unbounded channel).
    /// NOTE(review): presumably the sending side of `event_rx` — confirm where
    /// this struct is constructed.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
243
/// Chooses the capacity of the endpoint data command queue.
///
/// If the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable holds a positive
/// integer (surrounding whitespace is ignored), that value is returned as-is.
/// Otherwise the result is `requested` raised to a floor of 32 768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    const ENV_OVERRIDE: &str = "FIPS_ENDPOINT_DATA_QUEUE_CAP";
    const FLOOR: usize = 32_768;

    let env_override = std::env::var(ENV_OVERRIDE)
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&cap| cap > 0);

    match env_override {
        Some(cap) => cap,
        // The floor is above 1, so it also covers a zero request.
        None => requested.max(FLOOR),
    }
}
254
/// Commands carried on the endpoint command channel
/// (see `EndpointDataIo::command_tx`).
///
/// Variants that include a `response_tx` carry a oneshot sender on which a
/// reply of the stated type can be returned.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send `payload` to `remote`, with a oneshot reply of `Result<(), NodeError>`.
    Send {
        remote: PeerIdentity,
        payload: Vec<u8>,
        /// Optional instant recorded for this command.
        /// NOTE(review): presumably the enqueue time, for latency accounting — confirm.
        queued_at: Option<std::time::Instant>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Same fields as `Send` but without a reply channel.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        queued_at: Option<std::time::Instant>,
    },
    /// Request a list of [`NodeEndpointPeer`] entries.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request a list of [`NodeEndpointRelayStatus`] entries.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Supply new advert and DM relay lists, with a oneshot reply of
    /// `Result<(), NodeError>`.
    UpdateRelays {
        advert_relays: Vec<String>,
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Supply a new peer configuration list; the reply carries an
    /// [`UpdatePeersOutcome`] on success.
    UpdatePeers {
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
298
/// Per-category counts returned in the reply to
/// `NodeEndpointCommand::UpdatePeers`. `Default` yields all zeros.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Number of peers counted as added.
    pub(crate) added: usize,
    /// Number of peers counted as removed.
    pub(crate) removed: usize,
    /// Number of peers counted as updated.
    pub(crate) updated: usize,
    /// Number of peers counted as unchanged.
    pub(crate) unchanged: usize,
}
307
/// Events carried on the endpoint event channel
/// (see `EndpointDataIo::event_rx` / `event_tx`).
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A data payload with its source addressing.
    Data {
        /// Node address of the source.
        source_node_addr: NodeAddr,
        /// npub string of the source, if one is attached.
        source_npub: Option<String>,
        /// Payload bytes.
        payload: Vec<u8>,
        /// Optional instant recorded for this event.
        /// NOTE(review): presumably the enqueue time — confirm at the producer.
        queued_at: Option<std::time::Instant>,
    },
}
318
/// Per-peer entry returned in reply to `NodeEndpointCommand::PeerSnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// npub string of the peer.
    pub(crate) npub: String,
    /// Transport address rendered as a string, if any.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, if any.
    pub(crate) transport_type: Option<String>,
    /// Link id as a plain integer.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds (per the `_ms` suffix), if any.
    pub(crate) srtt_ms: Option<u64>,
    /// Packet and byte counters for each direction.
    pub(crate) packets_sent: u64,
    pub(crate) packets_recv: u64,
    pub(crate) bytes_sent: u64,
    pub(crate) bytes_recv: u64,
}
332
/// Per-relay entry returned in reply to `NodeEndpointCommand::RelaySnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Relay status as free-form text.
    pub(crate) status: String,
}
339
/// Lifecycle state of a node.
///
/// `Display` renders each variant as its lowercase name (`"created"`,
/// `"starting"`, `"running"`, `"stopping"`, `"stopped"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Initial state; a node in this state can be started.
    Created,
    /// Start-up in progress.
    Starting,
    /// The only operational state, and the only one that can be stopped.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// Shut down; a node in this state can be started again.
    Stopped,
}

impl NodeState {
    /// Returns `true` only in the [`NodeState::Running`] state.
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// Returns `true` in the [`NodeState::Created`] and [`NodeState::Stopped`]
    /// states, `false` otherwise.
    pub fn can_start(&self) -> bool {
        match self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// Returns `true` only in the [`NodeState::Running`] state.
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase state name. Width and padding flags on the
    /// formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        };
        f.write_str(label)
    }
}
384
385#[derive(Clone, Debug)]
392pub(crate) struct RecentRequest {
393 pub(crate) from_peer: NodeAddr,
395 pub(crate) timestamp_ms: u64,
397 pub(crate) response_forwarded: bool,
401}
402
403impl RecentRequest {
404 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
405 Self {
406 from_peer,
407 timestamp_ms,
408 response_forwarded: false,
409 }
410 }
411
412 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
414 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
415 }
416}
417
/// Key of `Node::addr_to_link`: a transport id paired with an address on that
/// transport.
type AddrKey = (TransportId, TransportAddr);
420
/// Per-transport drop tracking, stored in `Node::transport_drops`.
/// `Default` yields a zero counter and `dropping == false`.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Previously recorded drop count.
    /// NOTE(review): presumably the last sampled value, used to compute deltas — confirm.
    prev_drops: u64,
    /// Whether the transport is currently considered to be dropping.
    dropping: bool,
}
432
/// An entry of `Node::pending_connects`: the link, transport, remote address
/// and peer identity of one connection attempt.
/// NOTE(review): the lifecycle of these entries is not visible in this part of the file.
struct PendingConnect {
    /// Link id for the attempt.
    link_id: LinkId,
    /// Transport the attempt uses.
    transport_id: TransportId,
    /// Remote address on that transport.
    remote_addr: TransportAddr,
    /// Identity of the peer being connected to.
    peer_identity: PeerIdentity,
}
448
/// The node: identity, configuration, routing state, transports, links, peers,
/// sessions, TUN/endpoint channels, rate limiters and discovery bookkeeping,
/// all owned by one value.
///
/// Initial values described below are the ones set by `Node::new` and
/// `Node::with_identity`.
pub struct Node {
    /// This node's identity; source of its node address, npub and public key.
    identity: Identity,

    /// Eight random bytes drawn once at construction.
    startup_epoch: [u8; 8],

    /// `Instant` captured at construction.
    started_at: std::time::Instant,

    /// Configuration the node was built from.
    config: Config,

    /// Lifecycle state; starts as `NodeState::Created`.
    state: NodeState,

    /// Whether the node runs in leaf-only mode.
    is_leaf_only: bool,

    /// Tree state, configured from `config.node.tree` and holding a declaration
    /// signed with `identity`.
    tree_state: TreeState,

    /// Bloom state; built with `BloomState::leaf_only` or `BloomState::new`.
    bloom_state: BloomState,

    /// Coordinate cache sized from `config.node.cache`.
    coord_cache: CoordCache,
    /// Learned route table; starts at its `Default`.
    learned_routes: LearnedRouteTable,
    /// Recent requests keyed by a `u64` id.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Shared map from FIPS address to a `u16` MTU value, behind `Arc<RwLock>`.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    /// Registered transports by id.
    transports: HashMap<TransportId, TransportHandle>,
    /// Drop-tracking state per transport.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Links by id.
    links: HashMap<LinkId, Link>,
    /// Lookup from (transport, address) to link id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    /// Packet channel sender; `None` until set.
    packet_tx: Option<PacketTx>,
    /// Packet channel receiver; `None` until set.
    packet_rx: Option<PacketRx>,

    /// Peer connections keyed by link id.
    connections: HashMap<LinkId, PeerConnection>,

    /// Active peers keyed by node address.
    peers: HashMap<NodeAddr, ActivePeer>,

    /// Session entries keyed by node address.
    sessions: HashMap<NodeAddr, SessionEntry>,

    /// Identity cache keyed by a 15-byte array.
    /// NOTE(review): what the 15 bytes are derived from is not visible here.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    /// Queued TUN packets per destination node.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Queued endpoint data payloads per destination node.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Pending discovery lookups per node.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    /// Limits copied from `config.node.limits` at construction.
    max_connections: usize,
    max_peers: usize,
    max_links: usize,

    /// Next link id to hand out; starts at 1.
    next_link_id: u64,
    /// Next transport id to hand out; starts at 1.
    next_transport_id: u32,

    /// Node statistics.
    stats: stats::NodeStats,

    /// Statistics history.
    stats_history: stats_history::StatsHistory,

    /// TUN state; `Configured` when `config.tun.enabled`, else `Disabled`.
    tun_state: TunState,
    /// TUN interface name, once known.
    tun_name: Option<String>,
    /// TUN channel endpoints; all `None` at construction.
    tun_tx: Option<TunTx>,
    tun_outbound_rx: Option<TunOutboundRx>,
    /// Sender for externally delivered packets; `None` at construction.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Endpoint command receiver and event sender; `None` at construction.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Encrypt worker pool; `None` at construction.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Decrypt worker pool; `None` at construction.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// Session keys tracked as registered with the decrypt workers;
    /// `deregister_session_index` unregisters a key from the workers only if it
    /// was present here.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Receiving half of the decrypt fallback channel created at construction.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sending half of the decrypt fallback channel created at construction.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// Join handles of the TUN reader and writer threads; `None` at construction.
    tun_reader_handle: Option<JoinHandle<()>>,
    tun_writer_handle: Option<JoinHandle<()>>,
    /// Raw fd related to TUN shutdown; macOS only.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    /// DNS identity receiver; `None` at construction.
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    /// Handle of the DNS task; `None` at construction.
    dns_task: Option<tokio::task::JoinHandle<()>>,

    /// Allocator for session indices.
    index_allocator: IndexAllocator,
    /// Owning peer for each (transport, index) session key.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Link for each pending outbound (transport, index) key.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    /// Handshake rate limiter built from `config.node.rate_limit` and
    /// `config.node.limits.max_pending_inbound`.
    msg1_rate_limiter: HandshakeRateLimiter,
    /// ICMP rate limiter.
    icmp_rate_limiter: IcmpRateLimiter,
    /// Routing-error rate limiter with its default interval.
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Same limiter type, with the interval taken from
    /// `config.node.session.coords_response_interval_ms`.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    /// Discovery backoff state.
    discovery_backoff: DiscoveryBackoff,
    /// Rate limiter for forwarded discovery traffic.
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    /// Connection attempts awaiting completion; empty at construction.
    pending_connects: Vec<PendingConnect>,

    /// Retry state per peer; empty at construction.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    /// Discovery backends and their bookkeeping; all `None`/`false`/empty at
    /// construction.
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
    local_instance_started_at_ms: Option<u64>,
    last_local_instance_publish_ms: Option<u64>,
    last_local_instance_scan_ms: Option<u64>,
    nostr_discovery_started_at_ms: Option<u64>,
    startup_open_discovery_sweep_done: bool,
    /// Transports skipped by `find_transport_for_type`.
    bootstrap_transports: HashSet<TransportId>,
    /// npub string associated with each bootstrap transport.
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    /// Set of peers flagged for discovery-fallback transit blocking.
    /// NOTE(review): exact semantics are not visible here.
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    /// Time of the last parent re-evaluation, if any.
    last_parent_reeval: Option<crate::time::Instant>,

    /// Time of the last congestion log, if any.
    last_congestion_log: Option<std::time::Instant>,

    /// Estimated mesh size, if one has been computed.
    estimated_mesh_size: Option<u64>,
    /// Time of the last mesh-size log, if any.
    last_mesh_size_log: Option<std::time::Instant>,

    /// Time of the last self warning, if any.
    last_self_warn: Option<std::time::Instant>,

    /// Time of the last local send failure, if any.
    last_local_send_failure_at: Option<std::time::Instant>,

    /// Alias per peer; second choice in `peer_display_name`, after `host_map`.
    peer_aliases: HashMap<NodeAddr, String>,

    /// Peer ACL reloader built by `host_map_and_peer_acl`.
    peer_acl: acl::PeerAclReloader,

    /// Host map built by `host_map_and_peer_acl`; first choice in
    /// `peer_display_name`.
    host_map: Arc<HostMap>,
}
747
748impl Node {
    /// Creates a node from `config`, deriving the identity from the config.
    ///
    /// The node starts in `NodeState::Created` with empty tables and no
    /// channels or worker pools attached. Leaf-only mode follows
    /// `config.is_leaf_only()`.
    ///
    /// # Errors
    ///
    /// Propagates failures from `config.validate()` and
    /// `config.create_identity()`.
    ///
    /// # Panics
    ///
    /// Panics if signing the node's own tree declaration fails.
    pub fn new(config: Config) -> Result<Self, NodeError> {
        config.validate()?;
        let identity = config.create_identity()?;
        let node_addr = *identity.node_addr();
        let is_leaf_only = config.is_leaf_only();

        // Both halves of the decrypt fallback channel are kept on the node.
        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
        let decrypt_fallback_rx = Some(decrypt_fallback_rx);

        // Fresh random epoch for each constructed node.
        let mut startup_epoch = [0u8; 8];
        rand::rng().fill_bytes(&mut startup_epoch);

        let mut bloom_state = if is_leaf_only {
            BloomState::leaf_only(node_addr)
        } else {
            BloomState::new(node_addr)
        };
        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);

        let tun_state = if config.tun.enabled {
            TunState::Configured
        } else {
            TunState::Disabled
        };

        // Apply tree tuning from the config, then sign our own declaration.
        let mut tree_state = TreeState::new(node_addr);
        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
        tree_state.set_hold_down(config.node.tree.hold_down_secs);
        tree_state.set_flap_dampening(
            config.node.tree.flap_threshold,
            config.node.tree.flap_window_secs,
            config.node.tree.flap_dampening_secs,
        );
        tree_state
            .sign_declaration(&identity)
            .expect("signing own declaration should never fail");

        // The TTL is configured in seconds and passed on multiplied by 1000.
        let coord_cache = CoordCache::new(
            config.node.cache.coord_size,
            config.node.cache.coord_ttl_secs * 1000,
        );
        let rl = &config.node.rate_limit;
        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
            config.node.limits.max_pending_inbound,
        );

        // Copy these out now: `config` is moved into the struct below, before
        // the fields that need the values are initialised.
        let max_connections = config.node.limits.max_connections;
        let max_peers = config.node.limits.max_peers;
        let max_links = config.node.limits.max_links;
        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
        let backoff_base_secs = config.node.discovery.backoff_base_secs;
        let backoff_max_secs = config.node.discovery.backoff_max_secs;
        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;

        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);

        Ok(Self {
            identity,
            startup_epoch,
            started_at: std::time::Instant::now(),
            config,
            state: NodeState::Created,
            is_leaf_only,
            tree_state,
            bloom_state,
            coord_cache,
            learned_routes: LearnedRouteTable::default(),
            recent_requests: HashMap::new(),
            transports: HashMap::new(),
            transport_drops: HashMap::new(),
            links: HashMap::new(),
            addr_to_link: HashMap::new(),
            packet_tx: None,
            packet_rx: None,
            connections: HashMap::new(),
            peers: HashMap::new(),
            sessions: HashMap::new(),
            identity_cache: HashMap::new(),
            pending_tun_packets: HashMap::new(),
            pending_endpoint_data: HashMap::new(),
            pending_lookups: HashMap::new(),
            max_connections,
            max_peers,
            max_links,
            next_link_id: 1,
            next_transport_id: 1,
            stats: stats::NodeStats::new(),
            stats_history: stats_history::StatsHistory::new(),
            tun_state,
            tun_name: None,
            tun_tx: None,
            tun_outbound_rx: None,
            external_packet_tx: None,
            endpoint_command_rx: None,
            endpoint_event_tx: None,
            encrypt_workers: None,
            decrypt_workers: None,
            decrypt_registered_sessions: std::collections::HashSet::new(),
            decrypt_fallback_tx,
            decrypt_fallback_rx,
            tun_reader_handle: None,
            tun_writer_handle: None,
            #[cfg(target_os = "macos")]
            tun_shutdown_fd: None,
            dns_identity_rx: None,
            dns_task: None,
            index_allocator: IndexAllocator::new(),
            peers_by_index: HashMap::new(),
            pending_outbound: HashMap::new(),
            msg1_rate_limiter,
            icmp_rate_limiter: IcmpRateLimiter::new(),
            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
                std::time::Duration::from_millis(coords_response_interval_ms),
            ),
            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
                std::time::Duration::from_secs(forward_min_interval_secs),
            ),
            pending_connects: Vec::new(),
            retry_pending: HashMap::new(),
            nostr_discovery: None,
            nostr_discovery_started_at_ms: None,
            lan_discovery: None,
            local_instance_registry: None,
            local_instance_started_at_ms: None,
            last_local_instance_publish_ms: None,
            last_local_instance_scan_ms: None,
            startup_open_discovery_sweep_done: false,
            bootstrap_transports: HashSet::new(),
            bootstrap_transport_npubs: HashMap::new(),
            discovery_fallback_transit_blocked_peers: HashSet::new(),
            last_parent_reeval: None,
            last_congestion_log: None,
            estimated_mesh_size: None,
            last_mesh_size_log: None,
            last_self_warn: None,
            last_local_send_failure_at: None,
            peer_aliases: HashMap::new(),
            peer_acl,
            host_map,
            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
        })
    }
896
897 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
902 config.validate()?;
903 let node_addr = *identity.node_addr();
904
905 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
906 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
907
908 let mut startup_epoch = [0u8; 8];
909 rand::rng().fill_bytes(&mut startup_epoch);
910
911 let tun_state = if config.tun.enabled {
912 TunState::Configured
913 } else {
914 TunState::Disabled
915 };
916
917 let mut tree_state = TreeState::new(node_addr);
919 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
920 tree_state.set_hold_down(config.node.tree.hold_down_secs);
921 tree_state.set_flap_dampening(
922 config.node.tree.flap_threshold,
923 config.node.tree.flap_window_secs,
924 config.node.tree.flap_dampening_secs,
925 );
926 tree_state
927 .sign_declaration(&identity)
928 .expect("signing own declaration should never fail");
929
930 let mut bloom_state = BloomState::new(node_addr);
931 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
932
933 let coord_cache = CoordCache::new(
934 config.node.cache.coord_size,
935 config.node.cache.coord_ttl_secs * 1000,
936 );
937 let rl = &config.node.rate_limit;
938 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
939 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
940 config.node.limits.max_pending_inbound,
941 );
942
943 let max_connections = config.node.limits.max_connections;
944 let max_peers = config.node.limits.max_peers;
945 let max_links = config.node.limits.max_links;
946 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
947
948 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
949
950 Ok(Self {
951 identity,
952 startup_epoch,
953 started_at: std::time::Instant::now(),
954 config,
955 state: NodeState::Created,
956 is_leaf_only: false,
957 tree_state,
958 bloom_state,
959 coord_cache,
960 learned_routes: LearnedRouteTable::default(),
961 recent_requests: HashMap::new(),
962 transports: HashMap::new(),
963 transport_drops: HashMap::new(),
964 links: HashMap::new(),
965 addr_to_link: HashMap::new(),
966 packet_tx: None,
967 packet_rx: None,
968 connections: HashMap::new(),
969 peers: HashMap::new(),
970 sessions: HashMap::new(),
971 identity_cache: HashMap::new(),
972 pending_tun_packets: HashMap::new(),
973 pending_endpoint_data: HashMap::new(),
974 pending_lookups: HashMap::new(),
975 max_connections,
976 max_peers,
977 max_links,
978 next_link_id: 1,
979 next_transport_id: 1,
980 stats: stats::NodeStats::new(),
981 stats_history: stats_history::StatsHistory::new(),
982 tun_state,
983 tun_name: None,
984 tun_tx: None,
985 tun_outbound_rx: None,
986 external_packet_tx: None,
987 endpoint_command_rx: None,
988 endpoint_event_tx: None,
989 encrypt_workers: None,
990 decrypt_workers: None,
991 decrypt_registered_sessions: std::collections::HashSet::new(),
992 decrypt_fallback_tx,
993 decrypt_fallback_rx,
994 tun_reader_handle: None,
995 tun_writer_handle: None,
996 #[cfg(target_os = "macos")]
997 tun_shutdown_fd: None,
998 dns_identity_rx: None,
999 dns_task: None,
1000 index_allocator: IndexAllocator::new(),
1001 peers_by_index: HashMap::new(),
1002 pending_outbound: HashMap::new(),
1003 msg1_rate_limiter,
1004 icmp_rate_limiter: IcmpRateLimiter::new(),
1005 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1006 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1007 std::time::Duration::from_millis(coords_response_interval_ms),
1008 ),
1009 discovery_backoff: DiscoveryBackoff::new(),
1010 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1011 pending_connects: Vec::new(),
1012 retry_pending: HashMap::new(),
1013 nostr_discovery: None,
1014 nostr_discovery_started_at_ms: None,
1015 lan_discovery: None,
1016 local_instance_registry: None,
1017 local_instance_started_at_ms: None,
1018 last_local_instance_publish_ms: None,
1019 last_local_instance_scan_ms: None,
1020 startup_open_discovery_sweep_done: false,
1021 bootstrap_transports: HashSet::new(),
1022 bootstrap_transport_npubs: HashMap::new(),
1023 discovery_fallback_transit_blocked_peers: HashSet::new(),
1024 last_parent_reeval: None,
1025 last_congestion_log: None,
1026 estimated_mesh_size: None,
1027 last_mesh_size_log: None,
1028 last_self_warn: None,
1029 last_local_send_failure_at: None,
1030 peer_aliases: HashMap::new(),
1031 peer_acl,
1032 host_map,
1033 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1034 })
1035 }
1036
1037 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1039 let mut node = Self::new(config)?;
1040 node.is_leaf_only = true;
1041 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1042 Ok(node)
1043 }
1044
1045 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1046 let base_host_map = HostMap::from_peer_configs(config.peers());
1047 if !config.node.system_files_enabled {
1048 return (
1049 Arc::new(base_host_map.clone()),
1050 acl::PeerAclReloader::memory_only(base_host_map),
1051 );
1052 }
1053
1054 let mut host_map = base_host_map.clone();
1055 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1056 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1057 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1058 ));
1059 host_map.merge(hosts_file);
1060 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1061 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1062 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1063 base_host_map,
1064 hosts_path,
1065 );
1066 (Arc::new(host_map), peer_acl)
1067 }
1068
    /// Instantiates a transport for every configured transport instance and
    /// returns the handles.
    ///
    /// Each instance gets a fresh id from `allocate_transport_id` and a clone
    /// of `packet_tx`. Creation order is UDP, sim (feature `sim-transport`),
    /// Ethernet (Linux/macOS), TCP, Tor, WebRTC (feature `webrtc-transport`)
    /// and BLE (cfg `bluer_available`). WebRTC and BLE instances that fail to
    /// initialise are logged and skipped; the others are constructed
    /// infallibly here.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Configs are cloned into owned vectors up front so `self` is free to
        // be borrowed mutably by `allocate_transport_id` inside the loops.
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let mut eth_config = eth_config;
                // An instance without its own discovery scope inherits the
                // node's LAN discovery scope.
                if eth_config.discovery_scope.is_none() {
                    eth_config.discovery_scope = self.lan_discovery_scope();
                }
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // Collected unconditionally: used to build transports when the feature
        // is on, and to warn about ignored config when it is off.
        let webrtc_instances: Vec<_> = self
            .config
            .transports
            .webrtc
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        #[cfg(feature = "webrtc-transport")]
        {
            for (name, webrtc_config) in webrtc_instances {
                // The id is allocated before construction, so a failed
                // instance still consumes one.
                let transport_id = self.allocate_transport_id();
                match WebRtcTransport::new(
                    transport_id,
                    name,
                    webrtc_config,
                    packet_tx.clone(),
                    &self.identity,
                    &self.config.node.discovery.nostr,
                ) {
                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
                    Err(err) => {
                        warn!(
                            transport_id = %transport_id,
                            error = %err,
                            "failed to initialize WebRTC transport"
                        );
                    }
                }
            }
        }
        #[cfg(not(feature = "webrtc-transport"))]
        if !webrtc_instances.is_empty() {
            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
        }

        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Real BLE I/O is only set up outside test builds.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // NOTE(review): this statement sits inside a `#[cfg(bluer_available)]`
            // block, so of its `any(not(bluer_available), test)` condition only
            // the `test` arm can ever apply, and the warning inside is gated
            // `not(test)`. As written the warning appears never to be compiled
            // in; builds without BlueZ get no message. Confirm intent.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1246
1247 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1257 self.transports
1258 .iter()
1259 .filter(|(id, handle)| {
1260 handle.transport_type().name == transport_type
1261 && handle.is_operational()
1262 && !self.bootstrap_transports.contains(id)
1263 })
1264 .min_by_key(|(id, _)| id.as_u32())
1265 .map(|(id, _)| *id)
1266 }
1267
1268 #[allow(unused_variables)]
1274 fn resolve_ethernet_addr(
1275 &self,
1276 addr_str: &str,
1277 ) -> Result<(TransportId, TransportAddr), NodeError> {
1278 #[cfg(any(target_os = "linux", target_os = "macos"))]
1279 {
1280 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1281 NodeError::NoTransportForType(format!(
1282 "invalid Ethernet address format '{}': expected 'interface/mac'",
1283 addr_str
1284 ))
1285 })?;
1286
1287 let transport_id = self
1289 .transports
1290 .iter()
1291 .find(|(_, handle)| {
1292 handle.transport_type().name == "ethernet"
1293 && handle.is_operational()
1294 && handle.interface_name() == Some(iface)
1295 })
1296 .map(|(id, _)| *id)
1297 .ok_or_else(|| {
1298 NodeError::NoTransportForType(format!(
1299 "no operational Ethernet transport for interface '{}'",
1300 iface
1301 ))
1302 })?;
1303
1304 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1305 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1306 })?;
1307
1308 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1309 }
1310 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1311 {
1312 Err(NodeError::NoTransportForType(
1313 "Ethernet transport is not supported on this platform".to_string(),
1314 ))
1315 }
1316 }
1317
1318 #[cfg(bluer_available)]
1322 fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1323 let ta = TransportAddr::from_string(addr_str);
1324 let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1325 NodeError::NoTransportForType(format!(
1326 "invalid BLE address format '{}': expected 'adapter/mac'",
1327 addr_str
1328 ))
1329 })?;
1330
1331 let transport_id = self
1333 .transports
1334 .iter()
1335 .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1336 .map(|(id, _)| *id)
1337 .ok_or_else(|| {
1338 NodeError::NoTransportForType(format!(
1339 "no operational BLE transport for adapter '{}'",
1340 adapter
1341 ))
1342 })?;
1343
1344 crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1346 NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1347 })?;
1348
1349 Ok((transport_id, TransportAddr::from_string(addr_str)))
1350 }
1351
    /// Returns a reference to this node's identity.
    pub fn identity(&self) -> &Identity {
        &self.identity
    }
1358
    /// Returns this node's address, as reported by its identity.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }
1363
/// Returns this node's npub string, as produced by its identity.
pub fn npub(&self) -> String {
    self.identity.npub()
}
1368
1369 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1378 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1379 return hostname.to_string();
1380 }
1381 if let Some(name) = self.peer_aliases.get(addr) {
1382 return name.clone();
1383 }
1384 if let Some(peer) = self.peers.get(addr) {
1385 return peer.identity().short_npub();
1386 }
1387 if let Some(entry) = self.sessions.get(addr) {
1388 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1389 return PeerIdentity::from_pubkey(xonly).short_npub();
1390 }
1391 addr.short_hex()
1392 }
1393
1394 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1406 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1410 self.peers_by_index.remove(&cache_key);
1411 if self.decrypt_registered_sessions.remove(&cache_key)
1412 && let Some(workers) = self.decrypt_workers.as_ref()
1413 {
1414 workers.unregister_session(cache_key);
1415 }
1416 if let Some(peer_addr) = owning_peer {
1427 let peer_has_other_index = self
1428 .peers_by_index
1429 .values()
1430 .any(|other| *other == peer_addr);
1431 if !peer_has_other_index {
1432 self.clear_connected_udp_for_peer(&peer_addr);
1433 }
1434 }
1435 }
1436
1437 pub(in crate::node) fn ensure_current_session_index_registered(
1446 &mut self,
1447 node_addr: &NodeAddr,
1448 context: &'static str,
1449 ) -> bool {
1450 let Some(peer) = self.peers.get(node_addr) else {
1451 return false;
1452 };
1453 let Some(transport_id) = peer.transport_id() else {
1454 warn!(
1455 peer = %self.peer_display_name(node_addr),
1456 context,
1457 "Cannot register current session index without transport id"
1458 );
1459 return false;
1460 };
1461 let Some(our_index) = peer.our_index() else {
1462 warn!(
1463 peer = %self.peer_display_name(node_addr),
1464 context,
1465 "Cannot register current session index without local index"
1466 );
1467 return false;
1468 };
1469
1470 let cache_key = (transport_id, our_index.as_u32());
1471 match self.peers_by_index.get(&cache_key).copied() {
1472 Some(existing) if existing == *node_addr => true,
1473 Some(existing) => {
1474 warn!(
1475 peer = %self.peer_display_name(node_addr),
1476 previous_owner = %self.peer_display_name(&existing),
1477 transport_id = %transport_id,
1478 our_index = %our_index,
1479 context,
1480 "Repairing current session index with stale owner"
1481 );
1482 self.peers_by_index.insert(cache_key, *node_addr);
1483 true
1484 }
1485 None => {
1486 warn!(
1487 peer = %self.peer_display_name(node_addr),
1488 transport_id = %transport_id,
1489 our_index = %our_index,
1490 context,
1491 "Repairing missing current session index"
1492 );
1493 self.peers_by_index.insert(cache_key, *node_addr);
1494 true
1495 }
1496 }
1497 }
1498
/// Returns a shared reference to the node's configuration.
pub fn config(&self) -> &Config {
    &self.config
}
1505
/// Returns the effective IPv6 MTU, computed by
/// `crate::upper::icmp::effective_ipv6_mtu` from [`Self::transport_mtu`].
pub fn effective_ipv6_mtu(&self) -> u16 {
    crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
}
1514
1515 pub fn transport_mtu(&self) -> u16 {
1532 let min_operational = self
1533 .transports
1534 .values()
1535 .filter(|h| h.is_operational())
1536 .map(|h| h.mtu())
1537 .min();
1538 if let Some(mtu) = min_operational {
1539 return mtu;
1540 }
1541 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1543 return cfg.mtu();
1544 }
1545 1280
1546 }
1547
/// Returns the node's current `NodeState` by value.
pub fn state(&self) -> NodeState {
    self.state
}
1554
/// Returns the time elapsed since `started_at`.
pub fn uptime(&self) -> std::time::Duration {
    self.started_at.elapsed()
}
1559
/// Returns `true` when the current state reports itself as operational.
pub fn is_running(&self) -> bool {
    self.state.is_operational()
}
1564
/// Returns the node's `is_leaf_only` flag.
pub fn is_leaf_only(&self) -> bool {
    self.is_leaf_only
}
1569
/// Returns a shared reference to the spanning-tree state.
pub fn tree_state(&self) -> &TreeState {
    &self.tree_state
}
1576
/// Returns a mutable reference to the spanning-tree state.
pub fn tree_state_mut(&mut self) -> &mut TreeState {
    &mut self.tree_state
}
1581
/// Returns a shared reference to the bloom-filter state.
pub fn bloom_state(&self) -> &BloomState {
    &self.bloom_state
}
1588
/// Returns a mutable reference to the bloom-filter state.
pub fn bloom_state_mut(&mut self) -> &mut BloomState {
    &mut self.bloom_state
}
1593
/// Returns the cached mesh-size estimate, or `None` when no estimate is
/// currently stored (see `compute_mesh_size`, which writes this field).
pub fn estimated_mesh_size(&self) -> Option<u64> {
    self.estimated_mesh_size
}
1600
1601 pub(crate) fn compute_mesh_size(&mut self) {
1607 let my_addr = *self.tree_state.my_node_addr();
1608 let parent_id = *self.tree_state.my_declaration().parent_id();
1609 let is_root = self.tree_state.is_root();
1610
1611 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1612 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1614 let mut has_data = false;
1615
1616 if !is_root
1622 && let Some(parent) = self.peers.get(&parent_id)
1623 && let Some(filter) = parent.inbound_filter()
1624 {
1625 match filter.estimated_count(max_fpr) {
1626 Some(n) => {
1627 total += n;
1628 has_data = true;
1629 }
1630 None => {
1631 self.estimated_mesh_size = None;
1632 return;
1633 }
1634 }
1635 }
1636
1637 for (peer_addr, peer) in &self.peers {
1639 if peer_addr == &parent_id {
1640 continue;
1641 }
1642 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1643 && *decl.parent_id() == my_addr
1644 {
1645 child_count += 1;
1646 if let Some(filter) = peer.inbound_filter() {
1647 match filter.estimated_count(max_fpr) {
1648 Some(n) => {
1649 total += n;
1650 has_data = true;
1651 }
1652 None => {
1653 self.estimated_mesh_size = None;
1654 return;
1655 }
1656 }
1657 }
1658 }
1659 }
1660
1661 if !has_data {
1662 self.estimated_mesh_size = None;
1663 return;
1664 }
1665
1666 let size = total.round() as u64;
1667 self.estimated_mesh_size = Some(size);
1668
1669 let now = std::time::Instant::now();
1671 let should_log = match self.last_mesh_size_log {
1672 None => true,
1673 Some(last) => {
1674 now.duration_since(last)
1675 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1676 }
1677 };
1678 if should_log {
1679 tracing::debug!(
1680 estimated_mesh_size = size,
1681 peers = self.peers.len(),
1682 children = child_count,
1683 "Mesh size estimate"
1684 );
1685 self.last_mesh_size_log = Some(now);
1686 }
1687 }
1688
/// Returns a shared reference to the coordinate cache.
pub fn coord_cache(&self) -> &CoordCache {
    &self.coord_cache
}
1695
/// Returns a mutable reference to the coordinate cache.
pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
    &mut self.coord_cache
}
1700
/// Returns a shared reference to the node's statistics.
pub fn stats(&self) -> &stats::NodeStats {
    &self.stats
}
1707
/// Returns a mutable reference to the node's statistics.
pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
    &mut self.stats
}
1712
/// Returns a shared reference to the recorded statistics history.
pub fn stats_history(&self) -> &stats_history::StatsHistory {
    &self.stats_history
}
1717
1718 pub(crate) fn record_stats_history(&mut self) {
1721 let fwd = &self.stats.forwarding;
1722 let peers_with_mmp: Vec<f64> = self
1723 .peers
1724 .values()
1725 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1726 .collect();
1727 let loss_rate = if peers_with_mmp.is_empty() {
1728 0.0
1729 } else {
1730 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1731 };
1732
1733 let snap = stats_history::Snapshot {
1734 mesh_size: self.estimated_mesh_size,
1735 tree_depth: self.tree_state.my_coords().depth() as u32,
1736 peer_count: self.peers.len() as u64,
1737 parent_switches_total: self.stats.tree.parent_switches,
1738 bytes_in_total: fwd.received_bytes,
1739 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1740 packets_in_total: fwd.received_packets,
1741 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1742 loss_rate,
1743 active_sessions: self.sessions.len() as u64,
1744 };
1745
1746 let now = std::time::Instant::now();
1747 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1748 .peers
1749 .values()
1750 .map(|p| {
1751 let stats = p.link_stats();
1752 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1753 Some(m) => (
1754 m.metrics.srtt_ms(),
1755 Some(m.metrics.loss_rate()),
1756 m.receiver.ecn_ce_count() as u64,
1757 ),
1758 None => (None, None, 0),
1759 };
1760 stats_history::PeerSnapshot {
1761 node_addr: *p.node_addr(),
1762 last_seen: now,
1763 srtt_ms,
1764 loss_rate,
1765 bytes_in_total: stats.bytes_recv,
1766 bytes_out_total: stats.bytes_sent,
1767 packets_in_total: stats.packets_recv,
1768 packets_out_total: stats.packets_sent,
1769 ecn_ce_total: ecn_ce,
1770 }
1771 })
1772 .collect();
1773
1774 self.stats_history.tick(now, &snap, &peer_snaps);
1775 }
1776
/// Returns the current TUN state by value.
pub fn tun_state(&self) -> TunState {
    self.tun_state
}
1783
/// Returns the stored TUN device name, if one is set.
pub fn tun_name(&self) -> Option<&str> {
    self.tun_name.as_deref()
}
1788
/// Sets the connection limit enforced by `add_connection`; `0` disables the
/// limit there.
pub fn set_max_connections(&mut self, max: usize) {
    self.max_connections = max;
}
1795
/// Sets the peer limit consulted by `outbound_admission_check`; `0` is
/// treated as unlimited there.
pub fn set_max_peers(&mut self, max: usize) {
    self.max_peers = max;
}
1800
1801 pub(crate) fn outbound_admission_check(&self) -> bool {
1804 self.max_peers == 0 || self.peers.len() < self.max_peers
1805 }
1806
/// Sets the link limit enforced by `add_link`; `0` disables the limit there.
pub fn set_max_links(&mut self, max: usize) {
    self.max_links = max;
}
1811
/// Returns the number of entries in the connection table.
pub fn connection_count(&self) -> usize {
    self.connections.len()
}
1818
/// Returns the number of entries in the peer table.
pub fn peer_count(&self) -> usize {
    self.peers.len()
}
1823
/// Returns the number of entries in the link table.
pub fn link_count(&self) -> usize {
    self.links.len()
}
1828
/// Returns the number of registered transports.
pub fn transport_count(&self) -> usize {
    self.transports.len()
}
1833
1834 pub fn allocate_transport_id(&mut self) -> TransportId {
1838 let id = TransportId::new(self.next_transport_id);
1839 self.next_transport_id += 1;
1840 id
1841 }
1842
/// Looks up a transport handle by id.
pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
    self.transports.get(id)
}
1847
/// Looks up a transport handle by id, returning a mutable reference.
pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
    self.transports.get_mut(id)
}
1852
/// Iterates over the ids of all registered transports.
pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
    self.transports.keys()
}
1857
/// Returns a mutable reference to the packet receiver, if one is present.
pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
    self.packet_rx.as_mut()
}
1862
1863 pub fn allocate_link_id(&mut self) -> LinkId {
1867 let id = LinkId::new(self.next_link_id);
1868 self.next_link_id += 1;
1869 id
1870 }
1871
1872 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1874 if self.max_links > 0 && self.links.len() >= self.max_links {
1875 return Err(NodeError::MaxLinksExceeded {
1876 max: self.max_links,
1877 });
1878 }
1879 let link_id = link.link_id();
1880 let transport_id = link.transport_id();
1881 let remote_addr = link.remote_addr().clone();
1882
1883 self.links.insert(link_id, link);
1884 self.addr_to_link
1885 .insert((transport_id, remote_addr), link_id);
1886 Ok(())
1887 }
1888
/// Looks up a link by id.
pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
    self.links.get(link_id)
}
1893
/// Looks up a link by id, returning a mutable reference.
pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
    self.links.get_mut(link_id)
}
1898
1899 pub fn find_link_by_addr(
1901 &self,
1902 transport_id: TransportId,
1903 addr: &TransportAddr,
1904 ) -> Option<LinkId> {
1905 self.addr_to_link
1906 .get(&(transport_id, addr.clone()))
1907 .copied()
1908 }
1909
1910 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
1916 if let Some(link) = self.links.remove(link_id) {
1917 let key = (link.transport_id(), link.remote_addr().clone());
1919 if self.addr_to_link.get(&key) == Some(link_id) {
1920 self.addr_to_link.remove(&key);
1921 }
1922 Some(link)
1923 } else {
1924 None
1925 }
1926 }
1927
1928 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
1929 if !self.bootstrap_transports.contains(&transport_id) {
1930 return;
1931 }
1932
1933 let transport_in_use = self
1934 .links
1935 .values()
1936 .any(|link| link.transport_id() == transport_id)
1937 || self
1938 .connections
1939 .values()
1940 .any(|conn| conn.transport_id() == Some(transport_id))
1941 || self
1942 .peers
1943 .values()
1944 .any(|peer| peer.transport_id() == Some(transport_id))
1945 || self
1946 .pending_connects
1947 .iter()
1948 .any(|pending| pending.transport_id == transport_id);
1949
1950 if transport_in_use {
1951 return;
1952 }
1953
1954 tracing::debug!(
1955 transport_id = %transport_id,
1956 "bootstrap transport has no remaining references; dropping"
1957 );
1958
1959 self.bootstrap_transports.remove(&transport_id);
1960 self.bootstrap_transport_npubs.remove(&transport_id);
1961 self.transport_drops.remove(&transport_id);
1962 self.transports.remove(&transport_id);
1963 }
1964
/// Iterates over all links in the link table.
pub fn links(&self) -> impl Iterator<Item = &Link> {
    self.links.values()
}
1969
1970 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
1974 let link_id = connection.link_id();
1975
1976 if self.connections.contains_key(&link_id) {
1977 return Err(NodeError::ConnectionAlreadyExists(link_id));
1978 }
1979
1980 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
1981 return Err(NodeError::MaxConnectionsExceeded {
1982 max: self.max_connections,
1983 });
1984 }
1985
1986 self.connections.insert(link_id, connection);
1987 Ok(())
1988 }
1989
/// Looks up a connection by link id.
pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
    self.connections.get(link_id)
}
1994
/// Looks up a connection by link id, returning a mutable reference.
pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
    self.connections.get_mut(link_id)
}
1999
/// Removes and returns the connection stored under `link_id`, if any.
pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
    self.connections.remove(link_id)
}
2004
/// Iterates over all connections in the connection table.
pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
    self.connections.values()
}
2009
/// Looks up an active peer by node address.
pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
    self.peers.get(node_addr)
}
2016
/// Looks up an active peer by node address, returning a mutable reference.
pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
    self.peers.get_mut(node_addr)
}
2021
/// Removes and returns the peer stored under `node_addr`, if any.
///
/// Only the `peers` table is touched here; no other per-peer table is
/// updated by this method.
pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
    self.peers.remove(node_addr)
}
2026
/// Iterates over all active peers.
pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values()
}
2031
/// Returns a reference to the Nostr discovery component, if one is set.
pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
    self.nostr_discovery.as_deref()
}
2038
/// Iterates over the node addresses of all active peers.
pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
    self.peers.keys()
}
2043
/// Iterates over the active peers whose `can_send()` returns `true`.
pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values().filter(|p| p.can_send())
}
2048
2049 pub fn sendable_peer_count(&self) -> usize {
2051 self.peers.values().filter(|p| p.can_send()).count()
2052 }
2053
2054 pub(crate) fn set_discovery_fallback_transit_allowed(
2055 &mut self,
2056 peer_addr: NodeAddr,
2057 allowed: bool,
2058 ) {
2059 if allowed {
2060 self.discovery_fallback_transit_blocked_peers
2061 .remove(&peer_addr);
2062 } else {
2063 self.discovery_fallback_transit_blocked_peers
2064 .insert(peer_addr);
2065 }
2066 }
2067
2068 pub(crate) fn configured_discovery_fallback_transit(
2069 &self,
2070 peer_addr: &NodeAddr,
2071 ) -> Option<bool> {
2072 self.config.peers().iter().find_map(|peer| {
2073 PeerIdentity::from_npub(&peer.npub)
2074 .ok()
2075 .filter(|identity| identity.node_addr() == peer_addr)
2076 .map(|_| peer.discovery_fallback_transit)
2077 })
2078 }
2079
2080 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2081 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2082 return retry_state.peer_config.discovery_fallback_transit;
2083 }
2084
2085 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2086 return allowed;
2087 }
2088
2089 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2090 }
2091
/// Test helper: sets the discovery-forward limiter's interval to zero.
#[cfg(test)]
pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
    let no_interval = std::time::Duration::ZERO;
    self.discovery_forward_limiter.set_interval(no_interval);
}
2101
/// Test helper: looks up the session entry for `remote`.
#[cfg(test)]
pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
    self.sessions.get(remote)
}
2106
/// Test helper: looks up the session entry for `remote`, mutably.
#[cfg(test)]
pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
    self.sessions.get_mut(remote)
}
2112
/// Test helper: removes and returns the session entry for `remote`.
#[cfg(test)]
pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
    self.sessions.remove(remote)
}
2118
/// Test helper: reads the path MTU stored for `fips_addr`.
///
/// Returns `None` when the lock is poisoned or no entry exists.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
    let table = self.path_mtu_lookup.read().ok()?;
    table.get(fips_addr).copied()
}
2127
/// Test helper: stores `mtu` as the path MTU for `fips_addr`.
///
/// Silently does nothing when the lock is poisoned.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
    let Ok(mut table) = self.path_mtu_lookup.write() else {
        return;
    };
    table.insert(fips_addr, mtu);
}
2135
/// Returns the number of entries in the session table.
pub fn session_count(&self) -> usize {
    self.sessions.len()
}
2140
/// Iterates over `(remote address, session entry)` pairs.
pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
    self.sessions.iter()
}
2145
2146 pub(crate) fn register_identity(
2150 &mut self,
2151 node_addr: NodeAddr,
2152 pubkey: secp256k1::PublicKey,
2153 ) -> bool {
2154 let mut prefix = [0u8; 15];
2155 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2156 if let Some(entry) = self.identity_cache.get(&prefix)
2157 && entry.node_addr == node_addr
2158 && entry.pubkey == pubkey
2159 {
2160 return true;
2164 }
2165
2166 let (xonly, _) = pubkey.x_only_public_key();
2167 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2168 if derived_node_addr != node_addr {
2169 debug!(
2170 claimed_node_addr = %node_addr,
2171 derived_node_addr = %derived_node_addr,
2172 "Rejected identity cache entry with mismatched public key"
2173 );
2174 return false;
2175 }
2176
2177 let now_ms = Self::now_ms();
2178 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2179 && entry.node_addr == node_addr
2180 {
2181 entry.pubkey = pubkey;
2182 entry.last_seen_ms = now_ms;
2183 return true;
2184 }
2185
2186 let npub = encode_npub(&xonly);
2187 self.identity_cache.insert(
2188 prefix,
2189 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2190 );
2191 let max = self.config.node.cache.identity_size;
2193 if self.identity_cache.len() > max
2194 && let Some(oldest_key) = self
2195 .identity_cache
2196 .iter()
2197 .min_by_key(|(_, entry)| entry.last_seen_ms)
2198 .map(|(k, _)| *k)
2199 {
2200 self.identity_cache.remove(&oldest_key);
2201 }
2202 true
2203 }
2204
2205 pub(crate) fn lookup_by_fips_prefix(
2207 &mut self,
2208 prefix: &[u8; 15],
2209 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2210 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2211 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2213 } else {
2214 None
2215 }
2216 }
2217
2218 pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2220 let mut prefix = [0u8; 15];
2221 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2222 self.identity_cache.contains_key(&prefix)
2223 }
2224
/// Returns the number of entries currently in the identity cache.
pub fn identity_cache_len(&self) -> usize {
    self.identity_cache.len()
}
2229
/// Iterates over identity cache entries as
/// `(node address, public key, last_seen_ms)` tuples.
pub fn identity_cache_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
    self.identity_cache
        .values()
        .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
}
2241
/// Returns the configured identity cache capacity
/// (`config.node.cache.identity_size`).
pub fn identity_cache_max(&self) -> usize {
    self.config.node.cache.identity_size
}
2246
/// Returns the number of entries in `pending_lookups`.
pub fn pending_lookup_count(&self) -> usize {
    self.pending_lookups.len()
}
2251
/// Iterates over `(target address, pending lookup)` pairs.
pub fn pending_lookups_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
    self.pending_lookups.iter()
}
2258
/// Returns the number of entries in `recent_requests`.
pub fn recent_request_count(&self) -> usize {
    self.recent_requests.len()
}
2263
/// Returns the number of keys in `pending_tun_packets`, i.e. how many
/// destinations currently have a packet queue.
pub fn pending_tun_destinations(&self) -> usize {
    self.pending_tun_packets.len()
}
2268
2269 pub fn pending_tun_total_packets(&self) -> usize {
2271 self.pending_tun_packets.values().map(|q| q.len()).sum()
2272 }
2273
/// Iterates over `(peer address, retry state)` pairs from `retry_pending`.
pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
    self.retry_pending.iter()
}
2278
2279 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2286 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2288 return true;
2289 }
2290 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2292 && decl.parent_id() == self.node_addr()
2293 {
2294 return true;
2295 }
2296 false
2297 }
2298
/// Picks the peer to which a packet for `dest_node_addr` should be handed.
///
/// Stages, in order; the first one that yields a peer wins:
/// 1. Destination is this node: `None`.
/// 2. Destination is a direct peer that reports `is_healthy()`.
/// 3. `RoutingMode::ReplyLearned` only: a learned route via a sendable peer,
///    unless `should_explore_fallback` asked to try other routing first.
/// 4. With cached destination coordinates: the best bloom-filter candidate
///    (`select_best_candidate`), then the tree next hop if it can send.
/// 5. Learned routes again as a fallback (`ReplyLearned` only).
/// 6. The direct peer if it can at least send, even though not healthy.
///
/// When no coordinates are cached, stage 4 is skipped and stages 5 and 6 run
/// immediately.
///
/// Takes `&mut self` because the coordinate cache lookup touches the entry
/// and the learned-route table is consulted through `self.learned_routes`.
pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
    // Never route to ourselves.
    if dest_node_addr == self.node_addr() {
        return None;
    }

    // Remember whether the destination is a direct peer that can send; this
    // is the last-resort answer further down.
    let direct_peer_can_send = self
        .peers
        .get(dest_node_addr)
        .is_some_and(|peer| peer.can_send());
    // A healthy direct peer is always preferred.
    if let Some(peer) = self.peers.get(dest_node_addr)
        && peer.is_healthy()
    {
        return Some(peer);
    }

    let now_ms = Self::now_ms();

    // Snapshot of the peers that can currently send; only built in
    // reply-learned mode, so `None` doubles as "learned routing disabled".
    let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
        Some(
            self.peers
                .iter()
                .filter(|(_, peer)| peer.can_send())
                .map(|(addr, _)| *addr)
                .collect::<HashSet<_>>(),
        )
    } else {
        None
    };

    // When the learned-route table asks for exploration, the learned route
    // is skipped here and only used after coordinate/tree routing fails.
    let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
        self.learned_routes.should_explore_fallback(
            dest_node_addr,
            now_ms,
            self.config.node.routing.learned_fallback_explore_interval,
            |addr| sendable.contains(addr),
        )
    });
    if let Some(sendable) = &sendable_learned_peers
        && !explore_fallback
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    // Coordinates are cloned so the cache borrow ends before `self.peers`
    // and `self.tree_state` are used below.
    let Some(dest_coords) = self
        .coord_cache
        .get_and_touch(dest_node_addr, now_ms)
        .cloned()
    else {
        // No coordinates: only learned routes and the direct peer remain.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }
        if direct_peer_can_send {
            return self.peers.get(dest_node_addr);
        }
        return None;
    };

    // Bloom-filter candidates, ranked by `select_best_candidate`. Only the
    // address leaves this scope; the peer is re-looked-up afterwards.
    let coordinate_route_addr = {
        let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
        if !candidates.is_empty() {
            self.select_best_candidate(&candidates, &dest_coords)
                .map(|peer| *peer.node_addr())
        } else {
            None
        }
    };
    if let Some(next_hop_addr) = coordinate_route_addr {
        return self.peers.get(&next_hop_addr);
    }

    // Tree routing, accepted only when the chosen hop is a peer that can send.
    let tree_route_addr = self
        .tree_state
        .find_next_hop(&dest_coords)
        .filter(|next_hop_id| {
            self.peers
                .get(next_hop_id)
                .is_some_and(|peer| peer.can_send())
        });
    if let Some(next_hop_addr) = tree_route_addr {
        return self.peers.get(&next_hop_addr);
    }
    // Exploration found nothing: fall back to the learned route that was
    // skipped earlier. This branch returns even when that lookup is empty.
    if explore_fallback {
        return sendable_learned_peers.as_ref().and_then(|sendable| {
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
        });
    }

    if let Some(sendable) = &sendable_learned_peers
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    // Last resort: a direct peer that is not healthy but can still send.
    if direct_peer_can_send {
        return self.peers.get(dest_node_addr);
    }

    None
}
2445
2446 pub(in crate::node) fn learn_reverse_route(
2447 &mut self,
2448 destination: NodeAddr,
2449 next_hop: NodeAddr,
2450 ) {
2451 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2452 || destination == *self.node_addr()
2453 {
2454 return;
2455 }
2456 let now_ms = Self::now_ms();
2457 self.learned_routes.learn(
2458 destination,
2459 next_hop,
2460 now_ms,
2461 self.config.node.routing.learned_ttl_secs,
2462 self.config.node.routing.max_learned_routes_per_dest,
2463 );
2464 }
2465
2466 pub(in crate::node) fn record_route_failure(
2467 &mut self,
2468 destination: NodeAddr,
2469 next_hop: NodeAddr,
2470 ) {
2471 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2472 return;
2473 }
2474 self.learned_routes.record_failure(&destination, &next_hop);
2475 }
2476
/// Returns a snapshot of the learned-route table taken at `now_ms`.
pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
    self.learned_routes.snapshot(now_ms)
}
2480
/// Asks the learned-route table to purge entries expired as of `now_ms`.
pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
    self.learned_routes.purge_expired(now_ms);
}
2484
2485 fn select_best_candidate<'a>(
2494 &'a self,
2495 candidates: &[&'a ActivePeer],
2496 dest_coords: &crate::tree::TreeCoordinate,
2497 ) -> Option<&'a ActivePeer> {
2498 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2499
2500 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2501
2502 for &candidate in candidates {
2503 if !candidate.can_send() {
2504 continue;
2505 }
2506
2507 let cost = candidate.link_cost();
2508
2509 let dist = self
2510 .tree_state
2511 .peer_coords(candidate.node_addr())
2512 .map(|pc| pc.distance_to(dest_coords))
2513 .unwrap_or(usize::MAX);
2514
2515 if dist >= my_distance {
2518 continue;
2519 }
2520
2521 let dominated = match &best {
2522 None => true,
2523 Some((_, best_cost, best_dist)) => {
2524 cost < *best_cost
2525 || (cost == *best_cost && dist < *best_dist)
2526 || (cost == *best_cost
2527 && dist == *best_dist
2528 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2529 }
2530 };
2531
2532 if dominated {
2533 best = Some((candidate, cost, dist));
2534 }
2535 }
2536
2537 best.map(|(peer, _, _)| peer)
2538 }
2539
2540 pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2542 self.peers.values().filter(|p| p.may_reach(dest)).collect()
2543 }
2544
/// Returns a reference to the TUN sender, if one is present.
pub fn tun_tx(&self) -> Option<&TunTx> {
    self.tun_tx.as_ref()
}
2551
2552 pub fn attach_external_packet_io(
2559 &mut self,
2560 capacity: usize,
2561 ) -> Result<ExternalPacketIo, NodeError> {
2562 if self.state != NodeState::Created {
2563 return Err(NodeError::Config(ConfigError::Validation(
2564 "external packet I/O must be attached before node start".to_string(),
2565 )));
2566 }
2567 if self.config.tun.enabled {
2568 return Err(NodeError::Config(ConfigError::Validation(
2569 "external packet I/O requires tun.enabled=false".to_string(),
2570 )));
2571 }
2572
2573 let capacity = capacity.max(1);
2574 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2575 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2576 self.tun_outbound_rx = Some(outbound_rx);
2577 self.external_packet_tx = Some(inbound_tx);
2578
2579 Ok(ExternalPacketIo {
2580 outbound_tx,
2581 inbound_rx,
2582 })
2583 }
2584
2585 pub(crate) fn attach_endpoint_data_io(
2590 &mut self,
2591 capacity: usize,
2592 ) -> Result<EndpointDataIo, NodeError> {
2593 if self.state != NodeState::Created {
2594 return Err(NodeError::Config(ConfigError::Validation(
2595 "endpoint data I/O must be attached before node start".to_string(),
2596 )));
2597 }
2598
2599 let command_capacity = endpoint_data_command_capacity(capacity);
2600 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2601 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2606 self.endpoint_command_rx = Some(command_rx);
2607 self.endpoint_event_tx = Some(event_tx.clone());
2608
2609 Ok(EndpointDataIo {
2610 command_tx,
2611 event_rx,
2612 event_tx,
2613 })
2614 }
2615
2616 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2617 let mut prefix = [0u8; 15];
2618 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2619 self.identity_cache
2620 .get(&prefix)
2621 .filter(|entry| &entry.node_addr == addr)
2622 .map(|entry| entry.pubkey)
2623 }
2624
2625 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2626 let mut prefix = [0u8; 15];
2627 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2628 self.identity_cache
2629 .get(&prefix)
2630 .filter(|entry| &entry.node_addr == addr)
2631 .map(|entry| entry.npub.clone())
2632 }
2633
2634 pub(in crate::node) fn deliver_external_ipv6_packet(
2635 &self,
2636 src_addr: &NodeAddr,
2637 packet: Vec<u8>,
2638 ) {
2639 let Some(external_packet_tx) = &self.external_packet_tx else {
2640 return;
2641 };
2642 if packet.len() < 40 {
2643 return;
2644 }
2645 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2646 return;
2647 };
2648 let delivered = NodeDeliveredPacket {
2649 source_node_addr: *src_addr,
2650 source_npub: self.npub_for_node_addr(src_addr),
2651 destination,
2652 packet,
2653 };
2654 if let Err(error) = external_packet_tx.try_send(delivered) {
2655 debug!(error = %error, "Failed to deliver packet to external app sink");
2656 }
2657 }
2658
/// Sends `plaintext` to the peer `node_addr` as an encrypted link message.
///
/// Convenience wrapper around `send_encrypted_link_message_with_ce` with the
/// CE flag set to `false`; errors are whatever that method returns.
pub(super) async fn send_encrypted_link_message(
    &mut self,
    node_addr: &NodeAddr,
    plaintext: &[u8],
) -> Result<(), NodeError> {
    self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
        .await
}
2680
2681 pub(in crate::node) fn note_local_send_outcome(
2687 &mut self,
2688 result: &Result<usize, TransportError>,
2689 ) {
2690 match result {
2691 Ok(_) => {
2692 if self.last_local_send_failure_at.is_some() {
2693 self.last_local_send_failure_at = None;
2694 }
2695 }
2696 Err(TransportError::Io(e))
2697 if matches!(
2698 e.kind(),
2699 std::io::ErrorKind::NetworkUnreachable
2700 | std::io::ErrorKind::HostUnreachable
2701 | std::io::ErrorKind::AddrNotAvailable
2702 ) =>
2703 {
2704 self.last_local_send_failure_at = Some(std::time::Instant::now());
2705 }
2706 Err(_) => {}
2707 }
2708 }
2709
/// Returns the instant recorded by `note_local_send_outcome` for the latest
/// local send failure, or `None` when none is recorded.
pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
    self.last_local_send_failure_at
}
2715
2716 pub(super) async fn send_encrypted_link_message_with_ce(
2720 &mut self,
2721 node_addr: &NodeAddr,
2722 plaintext: &[u8],
2723 ce_flag: bool,
2724 ) -> Result<(), NodeError> {
2725 let peer = self
2726 .peers
2727 .get_mut(node_addr)
2728 .ok_or(NodeError::PeerNotFound(*node_addr))?;
2729
2730 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2731 node_addr: *node_addr,
2732 reason: "no their_index".into(),
2733 })?;
2734 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2735 node_addr: *node_addr,
2736 reason: "no transport_id".into(),
2737 })?;
2738 let remote_addr = peer
2739 .current_addr()
2740 .cloned()
2741 .ok_or_else(|| NodeError::SendFailed {
2742 node_addr: *node_addr,
2743 reason: "no current_addr".into(),
2744 })?;
2745 #[cfg(any(target_os = "linux", target_os = "macos"))]
2746 let connected_socket = peer.connected_udp();
2747
2748 let timestamp_ms = peer.session_elapsed_ms();
2750
2751 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2753 let mut flags = if sp_flag { FLAG_SP } else { 0 };
2754 if ce_flag {
2755 flags |= FLAG_CE;
2756 }
2757 if peer.current_k_bit() {
2758 flags |= FLAG_KEY_EPOCH;
2759 }
2760
2761 let session = peer
2762 .noise_session_mut()
2763 .ok_or_else(|| NodeError::SendFailed {
2764 node_addr: *node_addr,
2765 reason: "no noise session".into(),
2766 })?;
2767
2768 const INNER_TS_LEN: usize = 4;
2776 let counter = session.current_send_counter();
2777 let inner_len = INNER_TS_LEN + plaintext.len();
2778 let payload_len = inner_len as u16;
2779 let header = build_established_header(their_index, counter, flags, payload_len);
2780
2781 let transport_for_send = self
2800 .transports
2801 .get(&transport_id)
2802 .ok_or(NodeError::TransportNotFound(transport_id))?;
2803 match transport_for_send.connection_state(&remote_addr) {
2804 ConnectionState::Connected => {}
2805 other => {
2806 if matches!(other, ConnectionState::None) {
2807 let _ = transport_for_send.connect(&remote_addr).await;
2808 }
2809 return Err(NodeError::SendFailed {
2810 node_addr: *node_addr,
2811 reason: format!("transport connection not ready: {:?}", other),
2812 });
2813 }
2814 }
2815 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2816 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2817 && is_udp
2818 && let Some(cipher_clone) = session.send_cipher_clone()
2819 {
2820 {
2821 let reserved_counter =
2825 session
2826 .take_send_counter()
2827 .map_err(|e| NodeError::SendFailed {
2828 node_addr: *node_addr,
2829 reason: format!("counter reservation failed: {}", e),
2830 })?;
2831 debug_assert_eq!(reserved_counter, counter);
2832 let header =
2836 build_established_header(their_index, reserved_counter, flags, payload_len);
2837 let transport = transport_for_send;
2838 let send_target = {
2845 if let TransportHandle::Udp(udp) = transport {
2846 let socket_addr = {
2847 #[cfg(any(target_os = "linux", target_os = "macos"))]
2848 {
2849 match connected_socket.as_ref() {
2850 Some(socket) => Some(socket.peer_addr()),
2851 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2852 }
2853 }
2854 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2855 {
2856 udp.resolve_for_off_task(&remote_addr).await.ok()
2857 }
2858 };
2859 match (udp.async_socket(), socket_addr) {
2860 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
2861 _ => None,
2862 }
2863 } else {
2864 None
2865 }
2866 };
2867 if let Some((socket, socket_addr)) = send_target {
2868 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
2884 let mut wire_buf = Vec::with_capacity(wire_capacity);
2885 wire_buf.extend_from_slice(&header);
2886 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
2887 wire_buf.extend_from_slice(plaintext);
2888 let predicted_bytes = wire_capacity;
2889 if let Some(peer) = self.peers.get_mut(node_addr) {
2896 peer.link_stats_mut().record_sent(predicted_bytes);
2897 if let Some(mmp) = peer.mmp_mut() {
2898 mmp.sender
2899 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
2900 }
2901 }
2902 workers.dispatch(self::encrypt_worker::FmpSendJob {
2903 cipher: cipher_clone,
2904 counter: reserved_counter,
2905 wire_buf,
2906 fsp_seal: None,
2907 socket,
2908 dest_addr: socket_addr,
2909 #[cfg(any(target_os = "linux", target_os = "macos"))]
2910 connected_socket,
2911 drop_on_backpressure: plaintext
2912 .first()
2913 .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
2914 queued_at: crate::perf_profile::stamp(),
2915 });
2916 return Ok(());
2917 }
2918 }
2919 }
2920
2921 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
2926 let ciphertext = {
2928 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
2929 session
2930 .encrypt_with_aad(&inner_plaintext, &header)
2931 .map_err(|e| NodeError::SendFailed {
2932 node_addr: *node_addr,
2933 reason: format!("encryption failed: {}", e),
2934 })?
2935 };
2936
2937 let wire_packet = build_encrypted(&header, &ciphertext);
2938
2939 let send_result = {
2941 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
2942 let transport = self
2943 .transports
2944 .get(&transport_id)
2945 .ok_or(NodeError::TransportNotFound(transport_id))?;
2946 transport.send(&remote_addr, &wire_packet).await
2947 };
2948 self.note_local_send_outcome(&send_result);
2949 let bytes_sent = send_result.map_err(|e| match e {
2950 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
2951 node_addr: *node_addr,
2952 packet_size,
2953 mtu,
2954 },
2955 other => NodeError::SendFailed {
2956 node_addr: *node_addr,
2957 reason: format!("transport send: {}", other),
2958 },
2959 })?;
2960
2961 if let Some(peer) = self.peers.get_mut(node_addr) {
2963 peer.link_stats_mut().record_sent(bytes_sent);
2964 if let Some(mmp) = peer.mmp_mut() {
2966 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
2967 }
2968 }
2969
2970 Ok(())
2971 }
2972}
2973
2974impl fmt::Debug for Node {
2975 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2976 f.debug_struct("Node")
2977 .field("node_addr", self.node_addr())
2978 .field("state", &self.state)
2979 .field("is_leaf_only", &self.is_leaf_only)
2980 .field("connections", &self.connection_count())
2981 .field("peers", &self.peer_count())
2982 .field("links", &self.link_count())
2983 .field("transports", &self.transport_count())
2984 .finish()
2985 }
2986}