1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::RoutingMode;
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45#[cfg(feature = "webrtc-transport")]
46use crate::transport::webrtc::WebRtcTransport;
47use crate::transport::{
48 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
49 TransportHandle, TransportId,
50};
51use crate::tree::TreeState;
52use crate::upper::hosts::HostMap;
53use crate::upper::icmp_rate_limit::IcmpRateLimiter;
54use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
55use crate::utils::index::IndexAllocator;
56use crate::{
57 Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
58 SessionMessageType, encode_npub,
59};
60use rand::Rng;
61use std::collections::{HashMap, HashSet, VecDeque};
62use std::fmt;
63use std::sync::Arc;
64use std::thread::JoinHandle;
65use thiserror::Error;
66use tracing::{debug, warn};
67
/// Jitter bound, in seconds, associated with session rekey timing.
///
/// NOTE(review): the code that consumes this constant is outside this chunk;
/// presumably it is applied as a random offset to the rekey interval — confirm.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
75
/// Errors produced by node-level operations.
///
/// `ConfigError`, `IdentityError` and `TunError` convert into this type
/// automatically (via `#[from]`), so they can be propagated with `?`.
#[derive(Debug, Error)]
pub enum NodeError {
    /// An operation required a started node.
    #[error("node not started")]
    NotStarted,

    /// A start was requested on a node that is already started.
    #[error("node already started")]
    AlreadyStarted,

    /// A stop was requested on a node that is already stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given id.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No usable transport exists for the named transport type; the payload
    /// is a human-readable explanation (also used for address parse failures).
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link is registered under the given id.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No pending connection is registered for the given link.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// The peer address is not known to the node.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with this address is already registered.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection is already registered for this link.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A configured peer npub could not be parsed.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// A discovery subsystem failure, described by the message.
    #[error("discovery error: {0}")]
    Discovery(String),

    /// The operation was rejected by access control.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit (`max`) would be exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit (`max`) would be exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit (`max`) would be exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available for the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promoting the connection on `link_id` failed.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet being forwarded to `node_addr` is larger than the MTU.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Configuration error (converted from `ConfigError`).
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Identity error (converted from `IdentityError`).
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// TUN device error (converted from `TunError`).
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// A session index could not be allocated.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// The handshake failed, described by the message.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// A transport-level failure, carried as a message string.
    #[error("transport error: {0}")]
    TransportError(String),

    /// A bootstrap handoff failed, described by the message.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
167
/// A packet handed from the node to an external consumer, together with
/// the source it was attributed to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// Node address of the sender.
    pub source_node_addr: NodeAddr,
    /// The sender's npub, when one is available.
    pub source_npub: Option<String>,
    /// Destination address of the packet.
    pub destination: FipsAddress,
    /// Raw packet bytes.
    pub packet: Vec<u8>,
}
180
/// One cached identity: node address, public key, npub string and the
/// time it was last seen.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address of the cached identity.
    node_addr: NodeAddr,
    /// Full secp256k1 public key of the identity.
    pubkey: secp256k1::PublicKey,
    /// npub string for the identity.
    npub: String,
    /// Last-seen timestamp in milliseconds (clock source is set by callers).
    last_seen_ms: u64,
}

impl IdentityCacheEntry {
    /// Builds an entry from its four fields, stored as given.
    fn new(
        node_addr: NodeAddr,
        pubkey: secp256k1::PublicKey,
        npub: String,
        last_seen_ms: u64,
    ) -> Self {
        Self {
            node_addr,
            pubkey,
            npub,
            last_seen_ms,
        }
    }
}
204
/// Channel pair for exchanging packets with the node from outside.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Sender for outbound packets; it shares the TUN outbound channel type.
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receiver for packets the node delivers to the external consumer.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
213
/// Channel endpoints connecting an endpoint-facing API to the node.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Bounded sender for commands addressed to the node.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Unbounded receiver for events emitted towards the endpoint.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Sender half for the same event type.
    /// NOTE(review): presumably paired with `event_rx` so other producers can
    /// inject events — confirm at the construction site.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
243
/// Chooses the capacity of the endpoint data command queue.
///
/// If the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable is set to a
/// positive integer (surrounding whitespace ignored), that value is returned
/// unchanged. Otherwise the result is `requested`, raised to a floor of
/// 32 768 entries.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    const CAPACITY_FLOOR: usize = 32_768;

    std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        // A zero override is treated as "not set" and falls back to the floor.
        .filter(|&override_cap| override_cap > 0)
        .unwrap_or_else(|| requested.max(CAPACITY_FLOOR))
}
254
/// Commands sent to the node over the endpoint command channel.
///
/// Variants that carry a `response_tx` report their outcome through that
/// one-shot channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send `payload` to `remote` and report the result on `response_tx`.
    Send {
        remote: PeerIdentity,
        payload: Vec<u8>,
        /// Optional instant at which the command was queued.
        queued_at: Option<std::time::Instant>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Send `payload` to `remote`; this variant has no response channel.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        /// Optional instant at which the command was queued.
        queued_at: Option<std::time::Instant>,
    },
    /// Request a list of peers.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request a list of relay statuses.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Supply new advert and DM relay lists.
    UpdateRelays {
        advert_relays: Vec<String>,
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Supply a new peer configuration list; the reply summarises the change.
    UpdatePeers {
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
298
/// Per-category counts returned in reply to `NodeEndpointCommand::UpdatePeers`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Number of peers added.
    pub(crate) added: usize,
    /// Number of peers removed.
    pub(crate) removed: usize,
    /// Number of peers updated.
    pub(crate) updated: usize,
    /// Number of peers left unchanged.
    pub(crate) unchanged: usize,
}
307
/// Events emitted by the node towards the endpoint layer.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A data payload received from a remote node.
    Data {
        /// Node address of the sender.
        source_node_addr: NodeAddr,
        /// The sender's npub, when one is available.
        source_npub: Option<String>,
        /// Payload bytes.
        payload: Vec<u8>,
        /// Optional instant at which the event was queued.
        queued_at: Option<std::time::Instant>,
    },
}
318
/// Snapshot of one peer, as returned for `NodeEndpointCommand::PeerSnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// The peer's npub.
    pub(crate) npub: String,
    /// Transport address as a string, if available.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, if available.
    pub(crate) transport_type: Option<String>,
    /// Numeric id of the link to the peer.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, if measured.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent to the peer.
    pub(crate) packets_sent: u64,
    /// Packets received from the peer.
    pub(crate) packets_recv: u64,
    /// Bytes sent to the peer.
    pub(crate) bytes_sent: u64,
    /// Bytes received from the peer.
    pub(crate) bytes_recv: u64,
}
332
/// Status of one relay, as returned for `NodeEndpointCommand::RelaySnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Free-form status string; its vocabulary is defined by the producer.
    pub(crate) status: String,
}
339
/// Lifecycle phase of a node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed but never started.
    Created,
    /// Start-up in progress.
    Starting,
    /// Fully running.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// Shut down.
    Stopped,
}

impl NodeState {
    /// Returns `true` only while the node is `Running`.
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// Returns `true` when a start is permitted: from `Created` or `Stopped`.
    pub fn can_start(&self) -> bool {
        match self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// Returns `true` when a stop is permitted: only from `Running`.
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase name of the state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        })
    }
}
384
385#[derive(Clone, Debug)]
392pub(crate) struct RecentRequest {
393 pub(crate) from_peer: NodeAddr,
395 pub(crate) timestamp_ms: u64,
397 pub(crate) response_forwarded: bool,
401}
402
403impl RecentRequest {
404 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
405 Self {
406 from_peer,
407 timestamp_ms,
408 response_forwarded: false,
409 }
410 }
411
412 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
414 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
415 }
416}
417
/// Key identifying a remote endpoint: the transport it is reached through
/// plus its address on that transport. Used to key `Node::addr_to_link`.
type AddrKey = (TransportId, TransportAddr);
420
/// Per-transport drop-tracking state (stored in `Node::transport_drops`).
#[derive(Debug, Default)]
struct TransportDropState {
    /// Previously observed drop counter.
    /// NOTE(review): presumably compared with the current counter to detect
    /// new drops — confirm at the update site.
    prev_drops: u64,
    /// Whether the transport is currently considered to be dropping packets.
    dropping: bool,
}
432
/// An outstanding connect attempt (stored in `Node::pending_connects`).
struct PendingConnect {
    /// Link associated with the attempt.
    link_id: LinkId,
    /// Transport the attempt uses.
    transport_id: TransportId,
    /// Remote address being connected to.
    remote_addr: TransportAddr,
    /// Identity of the peer being connected to.
    peer_identity: PeerIdentity,
}
448
/// Central state for one mesh node: identity, configuration, routing state,
/// transports, links, peers, sessions, TUN/endpoint plumbing, rate limiters
/// and discovery bookkeeping.
///
/// Built with [`Node::new`], [`Node::with_identity`] or [`Node::leaf_only`];
/// every instance starts in [`NodeState::Created`].
pub struct Node {
    // --- Identity and lifecycle ---
    /// This node's cryptographic identity.
    identity: Identity,

    /// Eight random bytes generated at construction time.
    startup_epoch: [u8; 8],

    /// Instant at which this `Node` value was constructed.
    started_at: std::time::Instant,

    /// Configuration the node was built from.
    config: Config,

    /// Current lifecycle state.
    state: NodeState,

    /// Whether the node operates in leaf-only mode.
    is_leaf_only: bool,

    // --- Routing state ---
    /// Spanning-tree state, seeded with this node's address and tree settings.
    tree_state: TreeState,

    /// Bloom filter state, with the configured update debounce applied.
    bloom_state: BloomState,

    /// Coordinate cache, sized and aged per `config.node.cache`.
    coord_cache: CoordCache,
    /// Learned route table.
    learned_routes: LearnedRouteTable,
    /// Recently seen requests, keyed by a 64-bit id.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Shared (`Arc<RwLock<..>>`) map from address to a 16-bit path MTU.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // --- Transports and links ---
    /// Transport handles keyed by id.
    transports: HashMap<TransportId, TransportHandle>,
    /// Drop-tracking state per transport.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Links keyed by id.
    links: HashMap<LinkId, Link>,
    /// Lookup from (transport, remote address) to link id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    /// Packet channel sender; `None` at construction.
    packet_tx: Option<PacketTx>,
    /// Packet channel receiver; `None` at construction.
    packet_rx: Option<PacketRx>,

    // --- Connections, peers, sessions ---
    /// Connections keyed by link id.
    connections: HashMap<LinkId, PeerConnection>,

    /// Active peers keyed by node address.
    peers: HashMap<NodeAddr, ActivePeer>,

    /// Sessions keyed by remote node address.
    sessions: HashMap<NodeAddr, SessionEntry>,

    /// Identity cache keyed by a 15-byte value.
    /// NOTE(review): presumably an address-derived prefix — confirm at the insert site.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    /// TUN packets queued per destination.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Endpoint data payloads queued per destination.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Discovery lookups in flight, per target.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // --- Limits (copied from `config.node.limits`) ---
    /// Maximum number of connections.
    max_connections: usize,
    /// Maximum number of peers.
    max_peers: usize,
    /// Maximum number of links.
    max_links: usize,

    // --- Id counters (both start at 1) ---
    /// Next link id to allocate.
    next_link_id: u64,
    /// Next transport id to allocate.
    next_transport_id: u32,

    // --- Statistics ---
    /// Node statistics.
    stats: stats::NodeStats,

    /// Statistics history.
    stats_history: stats_history::StatsHistory,

    // --- TUN, external packet I/O and endpoint plumbing ---
    /// TUN state: `Configured` when `config.tun.enabled`, else `Disabled`.
    tun_state: TunState,
    /// TUN interface name; `None` at construction.
    tun_name: Option<String>,
    /// Sender towards the TUN side; `None` at construction.
    tun_tx: Option<TunTx>,
    /// Receiver for outbound TUN packets; `None` at construction.
    tun_outbound_rx: Option<TunOutboundRx>,
    /// Sender for packets delivered to an external consumer.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Receiver for endpoint commands.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Sender for endpoint events.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Encrypt worker pool; `None` at construction.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Decrypt worker pool; `None` at construction.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// (transport, index) pairs currently registered with the decrypt workers.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Receiver half of the decrypt fallback channel created at construction.
    /// NOTE(review): held in an `Option`, presumably so a run loop can take it — confirm.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sender half of the decrypt fallback channel.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// Join handle of the TUN reader thread, once spawned.
    tun_reader_handle: Option<JoinHandle<()>>,
    /// Join handle of the TUN writer thread, once spawned.
    tun_writer_handle: Option<JoinHandle<()>>,
    /// Raw file descriptor used for TUN shutdown (macOS builds only).
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // --- DNS ---
    /// Receiver for identities from the DNS component.
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    /// Handle of the DNS task, once spawned.
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // --- Session index bookkeeping ---
    /// Allocator for session indices.
    index_allocator: IndexAllocator,
    /// Lookup from (transport, index) to the owning peer.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Lookup from (transport, index) to the link of a pending outbound handshake.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // --- Rate limiting ---
    /// Handshake rate limiter, configured from `config.node.rate_limit` and
    /// `config.node.limits.max_pending_inbound`.
    msg1_rate_limiter: HandshakeRateLimiter,
    /// ICMP rate limiter.
    icmp_rate_limiter: IcmpRateLimiter,
    /// Routing-error rate limiter with its default interval.
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Coords-response rate limiter, with the interval from
    /// `config.node.session.coords_response_interval_ms`.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    /// Discovery backoff state.
    discovery_backoff: DiscoveryBackoff,
    /// Rate limiter for forwarded discovery traffic.
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // --- Connect and retry bookkeeping ---
    /// Connect attempts that have not yet completed.
    pending_connects: Vec<PendingConnect>,

    /// Retry state per peer.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    // --- Discovery ---
    /// Nostr discovery handle; `None` at construction.
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    /// LAN discovery handle; `None` at construction.
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    /// Local instance registry; `None` at construction.
    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
    /// Millisecond timestamp at which local-instance discovery started.
    local_instance_started_at_ms: Option<u64>,
    /// Millisecond timestamp of the last local-instance publish.
    last_local_instance_publish_ms: Option<u64>,
    /// Millisecond timestamp of the last local-instance scan.
    last_local_instance_scan_ms: Option<u64>,
    /// Millisecond timestamp at which Nostr discovery started.
    nostr_discovery_started_at_ms: Option<u64>,
    /// Whether the startup open-discovery sweep has run; `false` at construction.
    startup_open_discovery_sweep_done: bool,
    /// Transports marked as bootstrap transports (these are excluded by
    /// `find_transport_for_type`).
    bootstrap_transports: HashSet<TransportId>,
    /// npub recorded for each bootstrap transport.
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    /// Peers for which discovery-fallback transit is blocked.
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    // --- Timestamps used for periodic work and log throttling ---
    /// Time of the last parent re-evaluation (crate clock type).
    last_parent_reeval: Option<crate::time::Instant>,

    /// Time of the last congestion log line.
    last_congestion_log: Option<std::time::Instant>,

    /// Latest mesh-size estimate, if any.
    estimated_mesh_size: Option<u64>,
    /// Time of the last mesh-size log line.
    last_mesh_size_log: Option<std::time::Instant>,

    /// Time of the last self-related warning.
    last_self_warn: Option<std::time::Instant>,

    /// Time of the last local send failure.
    last_local_send_failure_at: Option<std::time::Instant>,

    // --- Naming, weighting and access control ---
    /// Display aliases per peer (consulted by `peer_display_name`).
    peer_aliases: HashMap<NodeAddr, String>,
    /// Send weights for peers listed in the configuration.
    configured_peer_send_weights: HashMap<NodeAddr, u8>,

    /// Peer ACL reloader.
    peer_acl: acl::PeerAclReloader,

    /// Shared host map used for hostname lookups.
    host_map: Arc<HostMap>,
}
750
751impl Node {
    /// Creates a node from `config`, deriving its identity from the config.
    ///
    /// The node starts in [`NodeState::Created`] with no transports, links,
    /// peers or sessions. Leaf-only mode is taken from `config.is_leaf_only()`.
    ///
    /// # Errors
    ///
    /// Returns an error if `config.validate()` or `config.create_identity()`
    /// fails.
    ///
    /// # Panics
    ///
    /// Panics if signing the node's own tree declaration fails.
    pub fn new(config: Config) -> Result<Self, NodeError> {
        config.validate()?;
        let identity = config.create_identity()?;
        let node_addr = *identity.node_addr();
        let is_leaf_only = config.is_leaf_only();

        // The fallback channel is created up front; the receiver is wrapped in
        // an Option to match the field type.
        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
        let decrypt_fallback_rx = Some(decrypt_fallback_rx);

        // Fresh random epoch for every constructed node.
        let mut startup_epoch = [0u8; 8];
        rand::rng().fill_bytes(&mut startup_epoch);

        let mut bloom_state = if is_leaf_only {
            BloomState::leaf_only(node_addr)
        } else {
            BloomState::new(node_addr)
        };
        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);

        let tun_state = if config.tun.enabled {
            TunState::Configured
        } else {
            TunState::Disabled
        };

        // Apply the tree tuning from the config, then sign our own declaration.
        let mut tree_state = TreeState::new(node_addr);
        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
        tree_state.set_hold_down(config.node.tree.hold_down_secs);
        tree_state.set_flap_dampening(
            config.node.tree.flap_threshold,
            config.node.tree.flap_window_secs,
            config.node.tree.flap_dampening_secs,
        );
        tree_state
            .sign_declaration(&identity)
            .expect("signing own declaration should never fail");

        // The TTL is configured in seconds and passed on in milliseconds.
        let coord_cache = CoordCache::new(
            config.node.cache.coord_size,
            config.node.cache.coord_ttl_secs * 1000,
        );
        let rl = &config.node.rate_limit;
        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
            config.node.limits.max_pending_inbound,
        );

        // Copy scalar settings out before `config` is moved into the struct.
        let max_connections = config.node.limits.max_connections;
        let max_peers = config.node.limits.max_peers;
        let max_links = config.node.limits.max_links;
        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
        let backoff_base_secs = config.node.discovery.backoff_base_secs;
        let backoff_max_secs = config.node.discovery.backoff_max_secs;
        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;

        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);

        Ok(Self {
            identity,
            startup_epoch,
            started_at: std::time::Instant::now(),
            config,
            state: NodeState::Created,
            is_leaf_only,
            tree_state,
            bloom_state,
            coord_cache,
            learned_routes: LearnedRouteTable::default(),
            recent_requests: HashMap::new(),
            transports: HashMap::new(),
            transport_drops: HashMap::new(),
            links: HashMap::new(),
            addr_to_link: HashMap::new(),
            packet_tx: None,
            packet_rx: None,
            connections: HashMap::new(),
            peers: HashMap::new(),
            sessions: HashMap::new(),
            identity_cache: HashMap::new(),
            pending_tun_packets: HashMap::new(),
            pending_endpoint_data: HashMap::new(),
            pending_lookups: HashMap::new(),
            max_connections,
            max_peers,
            max_links,
            next_link_id: 1,
            next_transport_id: 1,
            stats: stats::NodeStats::new(),
            stats_history: stats_history::StatsHistory::new(),
            tun_state,
            tun_name: None,
            tun_tx: None,
            tun_outbound_rx: None,
            external_packet_tx: None,
            endpoint_command_rx: None,
            endpoint_event_tx: None,
            encrypt_workers: None,
            decrypt_workers: None,
            decrypt_registered_sessions: std::collections::HashSet::new(),
            decrypt_fallback_tx,
            decrypt_fallback_rx,
            tun_reader_handle: None,
            tun_writer_handle: None,
            #[cfg(target_os = "macos")]
            tun_shutdown_fd: None,
            dns_identity_rx: None,
            dns_task: None,
            index_allocator: IndexAllocator::new(),
            peers_by_index: HashMap::new(),
            pending_outbound: HashMap::new(),
            msg1_rate_limiter,
            icmp_rate_limiter: IcmpRateLimiter::new(),
            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
                std::time::Duration::from_millis(coords_response_interval_ms),
            ),
            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
                std::time::Duration::from_secs(forward_min_interval_secs),
            ),
            pending_connects: Vec::new(),
            retry_pending: HashMap::new(),
            nostr_discovery: None,
            nostr_discovery_started_at_ms: None,
            lan_discovery: None,
            local_instance_registry: None,
            local_instance_started_at_ms: None,
            last_local_instance_publish_ms: None,
            last_local_instance_scan_ms: None,
            startup_open_discovery_sweep_done: false,
            bootstrap_transports: HashSet::new(),
            bootstrap_transport_npubs: HashMap::new(),
            discovery_fallback_transit_blocked_peers: HashSet::new(),
            last_parent_reeval: None,
            last_congestion_log: None,
            estimated_mesh_size: None,
            last_mesh_size_log: None,
            last_self_warn: None,
            last_local_send_failure_at: None,
            peer_aliases: HashMap::new(),
            configured_peer_send_weights,
            peer_acl,
            host_map,
            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
        })
    }
901
902 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
907 config.validate()?;
908 let node_addr = *identity.node_addr();
909
910 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
911 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
912
913 let mut startup_epoch = [0u8; 8];
914 rand::rng().fill_bytes(&mut startup_epoch);
915
916 let tun_state = if config.tun.enabled {
917 TunState::Configured
918 } else {
919 TunState::Disabled
920 };
921
922 let mut tree_state = TreeState::new(node_addr);
924 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
925 tree_state.set_hold_down(config.node.tree.hold_down_secs);
926 tree_state.set_flap_dampening(
927 config.node.tree.flap_threshold,
928 config.node.tree.flap_window_secs,
929 config.node.tree.flap_dampening_secs,
930 );
931 tree_state
932 .sign_declaration(&identity)
933 .expect("signing own declaration should never fail");
934
935 let mut bloom_state = BloomState::new(node_addr);
936 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
937
938 let coord_cache = CoordCache::new(
939 config.node.cache.coord_size,
940 config.node.cache.coord_ttl_secs * 1000,
941 );
942 let rl = &config.node.rate_limit;
943 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
944 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
945 config.node.limits.max_pending_inbound,
946 );
947
948 let max_connections = config.node.limits.max_connections;
949 let max_peers = config.node.limits.max_peers;
950 let max_links = config.node.limits.max_links;
951 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
952
953 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
954 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
955
956 Ok(Self {
957 identity,
958 startup_epoch,
959 started_at: std::time::Instant::now(),
960 config,
961 state: NodeState::Created,
962 is_leaf_only: false,
963 tree_state,
964 bloom_state,
965 coord_cache,
966 learned_routes: LearnedRouteTable::default(),
967 recent_requests: HashMap::new(),
968 transports: HashMap::new(),
969 transport_drops: HashMap::new(),
970 links: HashMap::new(),
971 addr_to_link: HashMap::new(),
972 packet_tx: None,
973 packet_rx: None,
974 connections: HashMap::new(),
975 peers: HashMap::new(),
976 sessions: HashMap::new(),
977 identity_cache: HashMap::new(),
978 pending_tun_packets: HashMap::new(),
979 pending_endpoint_data: HashMap::new(),
980 pending_lookups: HashMap::new(),
981 max_connections,
982 max_peers,
983 max_links,
984 next_link_id: 1,
985 next_transport_id: 1,
986 stats: stats::NodeStats::new(),
987 stats_history: stats_history::StatsHistory::new(),
988 tun_state,
989 tun_name: None,
990 tun_tx: None,
991 tun_outbound_rx: None,
992 external_packet_tx: None,
993 endpoint_command_rx: None,
994 endpoint_event_tx: None,
995 encrypt_workers: None,
996 decrypt_workers: None,
997 decrypt_registered_sessions: std::collections::HashSet::new(),
998 decrypt_fallback_tx,
999 decrypt_fallback_rx,
1000 tun_reader_handle: None,
1001 tun_writer_handle: None,
1002 #[cfg(target_os = "macos")]
1003 tun_shutdown_fd: None,
1004 dns_identity_rx: None,
1005 dns_task: None,
1006 index_allocator: IndexAllocator::new(),
1007 peers_by_index: HashMap::new(),
1008 pending_outbound: HashMap::new(),
1009 msg1_rate_limiter,
1010 icmp_rate_limiter: IcmpRateLimiter::new(),
1011 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1012 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1013 std::time::Duration::from_millis(coords_response_interval_ms),
1014 ),
1015 discovery_backoff: DiscoveryBackoff::new(),
1016 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1017 pending_connects: Vec::new(),
1018 retry_pending: HashMap::new(),
1019 nostr_discovery: None,
1020 nostr_discovery_started_at_ms: None,
1021 lan_discovery: None,
1022 local_instance_registry: None,
1023 local_instance_started_at_ms: None,
1024 last_local_instance_publish_ms: None,
1025 last_local_instance_scan_ms: None,
1026 startup_open_discovery_sweep_done: false,
1027 bootstrap_transports: HashSet::new(),
1028 bootstrap_transport_npubs: HashMap::new(),
1029 discovery_fallback_transit_blocked_peers: HashSet::new(),
1030 last_parent_reeval: None,
1031 last_congestion_log: None,
1032 estimated_mesh_size: None,
1033 last_mesh_size_log: None,
1034 last_self_warn: None,
1035 last_local_send_failure_at: None,
1036 peer_aliases: HashMap::new(),
1037 configured_peer_send_weights,
1038 peer_acl,
1039 host_map,
1040 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1041 })
1042 }
1043
1044 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1046 let mut node = Self::new(config)?;
1047 node.is_leaf_only = true;
1048 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1049 Ok(node)
1050 }
1051
1052 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1053 let base_host_map = HostMap::from_peer_configs(config.peers());
1054 if !config.node.system_files_enabled {
1055 return (
1056 Arc::new(base_host_map.clone()),
1057 acl::PeerAclReloader::memory_only(base_host_map),
1058 );
1059 }
1060
1061 let mut host_map = base_host_map.clone();
1062 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1063 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1064 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1065 ));
1066 host_map.merge(hosts_file);
1067 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1068 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1069 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1070 base_host_map,
1071 hosts_path,
1072 );
1073 (Arc::new(host_map), peer_acl)
1074 }
1075
1076 fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1077 config
1078 .peers()
1079 .iter()
1080 .filter_map(|peer| {
1081 PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1082 (
1083 *identity.node_addr(),
1084 encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1085 )
1086 })
1087 })
1088 .collect()
1089 }
1090
1091 fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1092 self.configured_peer_send_weights
1093 .get(peer_addr)
1094 .copied()
1095 .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1096 }
1097
    /// Instantiates a transport handle for every transport instance in the
    /// configuration and returns them; the handles are not stored in
    /// `self.transports` here.
    ///
    /// Transport ids come from `self.allocate_transport_id()` in this order:
    /// UDP, sim (feature `sim-transport`), Ethernet (Linux/macOS), TCP, Tor,
    /// WebRTC (feature `webrtc-transport`), BLE (cfg `bluer_available`).
    /// Each transport gets a clone of `packet_tx`.
    ///
    /// WebRTC and BLE construction failures are logged as warnings and the
    /// instance is skipped; the id allocated for it is not reused.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Each section collects owned (name, config) pairs first, so the
        // borrow of `self.config` ends before `allocate_transport_id` needs
        // `&mut self`.
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        // Sim transports exist only in builds with the `sim-transport` feature.
        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        // Ethernet transports are compiled only on Linux and macOS.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let mut eth_config = eth_config;
                // Fall back to the node-wide LAN discovery scope when the
                // instance does not set its own.
                if eth_config.discovery_scope.is_none() {
                    eth_config.discovery_scope = self.lan_discovery_scope();
                }
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // WebRTC instances are collected in every build, so that a build
        // without the feature can still warn about a configured transport.
        let webrtc_instances: Vec<_> = self
            .config
            .transports
            .webrtc
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        #[cfg(feature = "webrtc-transport")]
        {
            for (name, webrtc_config) in webrtc_instances {
                let transport_id = self.allocate_transport_id();
                match WebRtcTransport::new(
                    transport_id,
                    name,
                    webrtc_config,
                    packet_tx.clone(),
                    &self.identity,
                    &self.config.node.discovery.nostr,
                ) {
                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
                    Err(err) => {
                        warn!(
                            transport_id = %transport_id,
                            error = %err,
                            "failed to initialize WebRTC transport"
                        );
                    }
                }
            }
        }
        #[cfg(not(feature = "webrtc-transport"))]
        if !webrtc_instances.is_empty() {
            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
        }

        // BLE transports are compiled only when the `bluer_available` cfg is set.
        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Real adapters are opened only outside of test builds.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // NOTE(review): this branch sits inside `#[cfg(bluer_available)]`,
            // so it is compiled only in test builds, where the inner
            // `#[cfg(not(test))]` strips the warning. As written, the
            // "lacks BlueZ support" warning can never be emitted — confirm
            // whether the outer cfg was meant to be broader.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1275
1276 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1286 self.transports
1287 .iter()
1288 .filter(|(id, handle)| {
1289 handle.transport_type().name == transport_type
1290 && handle.is_operational()
1291 && !self.bootstrap_transports.contains(id)
1292 })
1293 .min_by_key(|(id, _)| id.as_u32())
1294 .map(|(id, _)| *id)
1295 }
1296
1297 #[allow(unused_variables)]
1303 fn resolve_ethernet_addr(
1304 &self,
1305 addr_str: &str,
1306 ) -> Result<(TransportId, TransportAddr), NodeError> {
1307 #[cfg(any(target_os = "linux", target_os = "macos"))]
1308 {
1309 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1310 NodeError::NoTransportForType(format!(
1311 "invalid Ethernet address format '{}': expected 'interface/mac'",
1312 addr_str
1313 ))
1314 })?;
1315
1316 let transport_id = self
1318 .transports
1319 .iter()
1320 .find(|(_, handle)| {
1321 handle.transport_type().name == "ethernet"
1322 && handle.is_operational()
1323 && handle.interface_name() == Some(iface)
1324 })
1325 .map(|(id, _)| *id)
1326 .ok_or_else(|| {
1327 NodeError::NoTransportForType(format!(
1328 "no operational Ethernet transport for interface '{}'",
1329 iface
1330 ))
1331 })?;
1332
1333 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1334 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1335 })?;
1336
1337 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1338 }
1339 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1340 {
1341 Err(NodeError::NoTransportForType(
1342 "Ethernet transport is not supported on this platform".to_string(),
1343 ))
1344 }
1345 }
1346
1347 #[cfg(bluer_available)]
1351 fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1352 let ta = TransportAddr::from_string(addr_str);
1353 let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1354 NodeError::NoTransportForType(format!(
1355 "invalid BLE address format '{}': expected 'adapter/mac'",
1356 addr_str
1357 ))
1358 })?;
1359
1360 let transport_id = self
1362 .transports
1363 .iter()
1364 .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1365 .map(|(id, _)| *id)
1366 .ok_or_else(|| {
1367 NodeError::NoTransportForType(format!(
1368 "no operational BLE transport for adapter '{}'",
1369 adapter
1370 ))
1371 })?;
1372
1373 crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1375 NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1376 })?;
1377
1378 Ok((transport_id, TransportAddr::from_string(addr_str)))
1379 }
1380
    /// Returns a reference to this node's identity.
    pub fn identity(&self) -> &Identity {
        &self.identity
    }
1387
    /// Returns the node address reported by this node's identity.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }
1392
    /// Returns this node's npub string as produced by its identity.
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
1397
1398 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1407 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1408 return hostname.to_string();
1409 }
1410 if let Some(name) = self.peer_aliases.get(addr) {
1411 return name.clone();
1412 }
1413 if let Some(peer) = self.peers.get(addr) {
1414 return peer.identity().short_npub();
1415 }
1416 if let Some(entry) = self.sessions.get(addr) {
1417 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1418 return PeerIdentity::from_pubkey(xonly).short_npub();
1419 }
1420 addr.short_hex()
1421 }
1422
1423 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1435 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1439 self.peers_by_index.remove(&cache_key);
1440 if self.decrypt_registered_sessions.remove(&cache_key)
1441 && let Some(workers) = self.decrypt_workers.as_ref()
1442 {
1443 workers.unregister_session(cache_key);
1444 }
1445 if let Some(peer_addr) = owning_peer {
1456 let peer_has_other_index = self
1457 .peers_by_index
1458 .values()
1459 .any(|other| *other == peer_addr);
1460 if !peer_has_other_index {
1461 self.clear_connected_udp_for_peer(&peer_addr);
1462 }
1463 }
1464 }
1465
1466 pub(in crate::node) fn ensure_current_session_index_registered(
1475 &mut self,
1476 node_addr: &NodeAddr,
1477 context: &'static str,
1478 ) -> bool {
1479 let Some(peer) = self.peers.get(node_addr) else {
1480 return false;
1481 };
1482 let Some(transport_id) = peer.transport_id() else {
1483 warn!(
1484 peer = %self.peer_display_name(node_addr),
1485 context,
1486 "Cannot register current session index without transport id"
1487 );
1488 return false;
1489 };
1490 let Some(our_index) = peer.our_index() else {
1491 warn!(
1492 peer = %self.peer_display_name(node_addr),
1493 context,
1494 "Cannot register current session index without local index"
1495 );
1496 return false;
1497 };
1498
1499 let cache_key = (transport_id, our_index.as_u32());
1500 match self.peers_by_index.get(&cache_key).copied() {
1501 Some(existing) if existing == *node_addr => true,
1502 Some(existing) => {
1503 warn!(
1504 peer = %self.peer_display_name(node_addr),
1505 previous_owner = %self.peer_display_name(&existing),
1506 transport_id = %transport_id,
1507 our_index = %our_index,
1508 context,
1509 "Repairing current session index with stale owner"
1510 );
1511 self.peers_by_index.insert(cache_key, *node_addr);
1512 true
1513 }
1514 None => {
1515 warn!(
1516 peer = %self.peer_display_name(node_addr),
1517 transport_id = %transport_id,
1518 our_index = %our_index,
1519 context,
1520 "Repairing missing current session index"
1521 );
1522 self.peers_by_index.insert(cache_key, *node_addr);
1523 true
1524 }
1525 }
1526 }
1527
    /// Returns the configuration this node holds.
    pub fn config(&self) -> &Config {
        &self.config
    }
1534
    /// Returns the effective IPv6 MTU that `upper::icmp::effective_ipv6_mtu`
    /// derives from [`Self::transport_mtu`].
    pub fn effective_ipv6_mtu(&self) -> u16 {
        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
    }
1543
1544 pub fn transport_mtu(&self) -> u16 {
1561 let min_operational = self
1562 .transports
1563 .values()
1564 .filter(|h| h.is_operational())
1565 .map(|h| h.mtu())
1566 .min();
1567 if let Some(mtu) = min_operational {
1568 return mtu;
1569 }
1570 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1572 return cfg.mtu();
1573 }
1574 1280
1575 }
1576
    /// Returns the node's current lifecycle state.
    pub fn state(&self) -> NodeState {
        self.state
    }
1583
    /// Returns the time elapsed since `started_at`.
    pub fn uptime(&self) -> std::time::Duration {
        self.started_at.elapsed()
    }
1588
    /// Returns whether the current node state reports itself as operational.
    pub fn is_running(&self) -> bool {
        self.state.is_operational()
    }
1593
    /// Returns the node's leaf-only flag.
    pub fn is_leaf_only(&self) -> bool {
        self.is_leaf_only
    }
1598
    /// Returns a shared reference to the spanning-tree state.
    pub fn tree_state(&self) -> &TreeState {
        &self.tree_state
    }
1605
    /// Returns a mutable reference to the spanning-tree state.
    pub fn tree_state_mut(&mut self) -> &mut TreeState {
        &mut self.tree_state
    }
1610
    /// Returns a shared reference to the bloom filter state.
    pub fn bloom_state(&self) -> &BloomState {
        &self.bloom_state
    }
1617
    /// Returns a mutable reference to the bloom filter state.
    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
        &mut self.bloom_state
    }
1622
    /// Returns the most recently stored mesh-size estimate, or `None` when no
    /// estimate is currently held.
    pub fn estimated_mesh_size(&self) -> Option<u64> {
        self.estimated_mesh_size
    }
1629
1630 pub(crate) fn compute_mesh_size(&mut self) {
1636 let my_addr = *self.tree_state.my_node_addr();
1637 let parent_id = *self.tree_state.my_declaration().parent_id();
1638 let is_root = self.tree_state.is_root();
1639
1640 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1641 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1643 let mut has_data = false;
1644
1645 if !is_root
1651 && let Some(parent) = self.peers.get(&parent_id)
1652 && let Some(filter) = parent.inbound_filter()
1653 {
1654 match filter.estimated_count(max_fpr) {
1655 Some(n) => {
1656 total += n;
1657 has_data = true;
1658 }
1659 None => {
1660 self.estimated_mesh_size = None;
1661 return;
1662 }
1663 }
1664 }
1665
1666 for (peer_addr, peer) in &self.peers {
1668 if peer_addr == &parent_id {
1669 continue;
1670 }
1671 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1672 && *decl.parent_id() == my_addr
1673 {
1674 child_count += 1;
1675 if let Some(filter) = peer.inbound_filter() {
1676 match filter.estimated_count(max_fpr) {
1677 Some(n) => {
1678 total += n;
1679 has_data = true;
1680 }
1681 None => {
1682 self.estimated_mesh_size = None;
1683 return;
1684 }
1685 }
1686 }
1687 }
1688 }
1689
1690 if !has_data {
1691 self.estimated_mesh_size = None;
1692 return;
1693 }
1694
1695 let size = total.round() as u64;
1696 self.estimated_mesh_size = Some(size);
1697
1698 let now = std::time::Instant::now();
1700 let should_log = match self.last_mesh_size_log {
1701 None => true,
1702 Some(last) => {
1703 now.duration_since(last)
1704 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1705 }
1706 };
1707 if should_log {
1708 tracing::debug!(
1709 estimated_mesh_size = size,
1710 peers = self.peers.len(),
1711 children = child_count,
1712 "Mesh size estimate"
1713 );
1714 self.last_mesh_size_log = Some(now);
1715 }
1716 }
1717
    /// Returns a shared reference to the coordinate cache.
    pub fn coord_cache(&self) -> &CoordCache {
        &self.coord_cache
    }
1724
    /// Returns a mutable reference to the coordinate cache.
    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
        &mut self.coord_cache
    }
1729
    /// Returns a shared reference to the node's statistics counters.
    pub fn stats(&self) -> &stats::NodeStats {
        &self.stats
    }
1736
    /// Returns a mutable reference to the node's statistics counters.
    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
        &mut self.stats
    }
1741
    /// Returns a shared reference to the recorded statistics history.
    pub fn stats_history(&self) -> &stats_history::StatsHistory {
        &self.stats_history
    }
1746
1747 pub(crate) fn record_stats_history(&mut self) {
1750 let fwd = &self.stats.forwarding;
1751 let peers_with_mmp: Vec<f64> = self
1752 .peers
1753 .values()
1754 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1755 .collect();
1756 let loss_rate = if peers_with_mmp.is_empty() {
1757 0.0
1758 } else {
1759 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1760 };
1761
1762 let snap = stats_history::Snapshot {
1763 mesh_size: self.estimated_mesh_size,
1764 tree_depth: self.tree_state.my_coords().depth() as u32,
1765 peer_count: self.peers.len() as u64,
1766 parent_switches_total: self.stats.tree.parent_switches,
1767 bytes_in_total: fwd.received_bytes,
1768 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1769 packets_in_total: fwd.received_packets,
1770 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1771 loss_rate,
1772 active_sessions: self.sessions.len() as u64,
1773 };
1774
1775 let now = std::time::Instant::now();
1776 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1777 .peers
1778 .values()
1779 .map(|p| {
1780 let stats = p.link_stats();
1781 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1782 Some(m) => (
1783 m.metrics.srtt_ms(),
1784 Some(m.metrics.loss_rate()),
1785 m.receiver.ecn_ce_count() as u64,
1786 ),
1787 None => (None, None, 0),
1788 };
1789 stats_history::PeerSnapshot {
1790 node_addr: *p.node_addr(),
1791 last_seen: now,
1792 srtt_ms,
1793 loss_rate,
1794 bytes_in_total: stats.bytes_recv,
1795 bytes_out_total: stats.bytes_sent,
1796 packets_in_total: stats.packets_recv,
1797 packets_out_total: stats.packets_sent,
1798 ecn_ce_total: ecn_ce,
1799 }
1800 })
1801 .collect();
1802
1803 self.stats_history.tick(now, &snap, &peer_snaps);
1804 }
1805
    /// Returns the current TUN device state.
    pub fn tun_state(&self) -> TunState {
        self.tun_state
    }
1812
    /// Returns the stored TUN device name, or `None` when no name is set.
    pub fn tun_name(&self) -> Option<&str> {
        self.tun_name.as_deref()
    }
1817
    /// Sets the connection limit enforced by `add_connection`, which treats 0
    /// as "no limit".
    pub fn set_max_connections(&mut self, max: usize) {
        self.max_connections = max;
    }
1824
    /// Sets the peer limit consulted by `outbound_admission_check`, which
    /// treats 0 as "no limit".
    pub fn set_max_peers(&mut self, max: usize) {
        self.max_peers = max;
    }
1829
    /// Returns `true` when another peer may be admitted: either `max_peers`
    /// is 0 (no limit) or the current peer count is below it.
    pub(crate) fn outbound_admission_check(&self) -> bool {
        self.max_peers == 0 || self.peers.len() < self.max_peers
    }
1835
    /// Sets the link limit enforced by `add_link`, which treats 0 as "no limit".
    pub fn set_max_links(&mut self, max: usize) {
        self.max_links = max;
    }
1840
    /// Returns the number of entries in the connection table.
    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }
1847
    /// Returns the number of entries in the peer table.
    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }
1852
    /// Returns the number of entries in the link table.
    pub fn link_count(&self) -> usize {
        self.links.len()
    }
1857
    /// Returns the number of registered transports.
    pub fn transport_count(&self) -> usize {
        self.transports.len()
    }
1862
1863 pub fn allocate_transport_id(&mut self) -> TransportId {
1867 let id = TransportId::new(self.next_transport_id);
1868 self.next_transport_id += 1;
1869 id
1870 }
1871
    /// Looks up a transport handle by id; `None` when the id is not registered.
    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
        self.transports.get(id)
    }
1876
    /// Looks up a transport handle by id for mutation; `None` when the id is
    /// not registered.
    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
        self.transports.get_mut(id)
    }
1881
    /// Iterates over the ids of all registered transports.
    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
        self.transports.keys()
    }
1886
    /// Returns mutable access to the packet receiver, or `None` when the node
    /// currently holds none.
    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
        self.packet_rx.as_mut()
    }
1891
1892 pub fn allocate_link_id(&mut self) -> LinkId {
1896 let id = LinkId::new(self.next_link_id);
1897 self.next_link_id += 1;
1898 id
1899 }
1900
1901 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1903 if self.max_links > 0 && self.links.len() >= self.max_links {
1904 return Err(NodeError::MaxLinksExceeded {
1905 max: self.max_links,
1906 });
1907 }
1908 let link_id = link.link_id();
1909 let transport_id = link.transport_id();
1910 let remote_addr = link.remote_addr().clone();
1911
1912 self.links.insert(link_id, link);
1913 self.addr_to_link
1914 .insert((transport_id, remote_addr), link_id);
1915 Ok(())
1916 }
1917
    /// Looks up a link by id; `None` when the id is unknown.
    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
        self.links.get(link_id)
    }
1922
    /// Looks up a link by id for mutation; `None` when the id is unknown.
    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
        self.links.get_mut(link_id)
    }
1927
    /// Returns the id of the link indexed under `(transport_id, addr)`, if any.
    pub fn find_link_by_addr(
        &self,
        transport_id: TransportId,
        addr: &TransportAddr,
    ) -> Option<LinkId> {
        // The index is keyed by an owned tuple, so the address is cloned for the lookup.
        self.addr_to_link
            .get(&(transport_id, addr.clone()))
            .copied()
    }
1938
1939 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
1945 if let Some(link) = self.links.remove(link_id) {
1946 let key = (link.transport_id(), link.remote_addr().clone());
1948 if self.addr_to_link.get(&key) == Some(link_id) {
1949 self.addr_to_link.remove(&key);
1950 }
1951 Some(link)
1952 } else {
1953 None
1954 }
1955 }
1956
1957 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
1958 if !self.bootstrap_transports.contains(&transport_id) {
1959 return;
1960 }
1961
1962 let transport_in_use = self
1963 .links
1964 .values()
1965 .any(|link| link.transport_id() == transport_id)
1966 || self
1967 .connections
1968 .values()
1969 .any(|conn| conn.transport_id() == Some(transport_id))
1970 || self
1971 .peers
1972 .values()
1973 .any(|peer| peer.transport_id() == Some(transport_id))
1974 || self
1975 .pending_connects
1976 .iter()
1977 .any(|pending| pending.transport_id == transport_id);
1978
1979 if transport_in_use {
1980 return;
1981 }
1982
1983 tracing::debug!(
1984 transport_id = %transport_id,
1985 "bootstrap transport has no remaining references; dropping"
1986 );
1987
1988 self.bootstrap_transports.remove(&transport_id);
1989 self.bootstrap_transport_npubs.remove(&transport_id);
1990 self.transport_drops.remove(&transport_id);
1991 self.transports.remove(&transport_id);
1992 }
1993
    /// Iterates over all links in the link table.
    pub fn links(&self) -> impl Iterator<Item = &Link> {
        self.links.values()
    }
1998
1999 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2003 let link_id = connection.link_id();
2004
2005 if self.connections.contains_key(&link_id) {
2006 return Err(NodeError::ConnectionAlreadyExists(link_id));
2007 }
2008
2009 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2010 return Err(NodeError::MaxConnectionsExceeded {
2011 max: self.max_connections,
2012 });
2013 }
2014
2015 self.connections.insert(link_id, connection);
2016 Ok(())
2017 }
2018
    /// Looks up the connection stored for `link_id`, if any.
    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
        self.connections.get(link_id)
    }
2023
    /// Looks up the connection stored for `link_id` for mutation, if any.
    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
        self.connections.get_mut(link_id)
    }
2028
    /// Removes and returns the connection stored for `link_id`, if any.
    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
        self.connections.remove(link_id)
    }
2033
    /// Iterates over all stored connections.
    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
        self.connections.values()
    }
2038
    /// Looks up the active peer with the given node address, if any.
    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
        self.peers.get(node_addr)
    }
2045
    /// Looks up the active peer with the given node address for mutation, if any.
    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
        self.peers.get_mut(node_addr)
    }
2050
    /// Removes and returns the active peer with the given node address, if any.
    ///
    /// Only the peer table is touched here; no other index is updated by this method.
    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
        self.peers.remove(node_addr)
    }
2055
    /// Iterates over all active peers.
    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values()
    }
2060
    /// Returns the Nostr discovery instance held by this node, or `None` when
    /// none is set.
    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
        self.nostr_discovery.as_deref()
    }
2067
    /// Iterates over the node addresses of all active peers.
    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
        self.peers.keys()
    }
2072
    /// Iterates over the active peers for which `can_send()` is true.
    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values().filter(|p| p.can_send())
    }
2077
2078 pub fn sendable_peer_count(&self) -> usize {
2080 self.peers.values().filter(|p| p.can_send()).count()
2081 }
2082
2083 pub(crate) fn set_discovery_fallback_transit_allowed(
2084 &mut self,
2085 peer_addr: NodeAddr,
2086 allowed: bool,
2087 ) {
2088 if allowed {
2089 self.discovery_fallback_transit_blocked_peers
2090 .remove(&peer_addr);
2091 } else {
2092 self.discovery_fallback_transit_blocked_peers
2093 .insert(peer_addr);
2094 }
2095 }
2096
2097 pub(crate) fn configured_discovery_fallback_transit(
2098 &self,
2099 peer_addr: &NodeAddr,
2100 ) -> Option<bool> {
2101 self.config.peers().iter().find_map(|peer| {
2102 PeerIdentity::from_npub(&peer.npub)
2103 .ok()
2104 .filter(|identity| identity.node_addr() == peer_addr)
2105 .map(|_| peer.discovery_fallback_transit)
2106 })
2107 }
2108
2109 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2110 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2111 return retry_state.peer_config.discovery_fallback_transit;
2112 }
2113
2114 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2115 return allowed;
2116 }
2117
2118 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2119 }
2120
    /// Test helper: sets the discovery forward limiter's interval to zero.
    #[cfg(test)]
    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
        self.discovery_forward_limiter
            .set_interval(std::time::Duration::ZERO);
    }
2130
    /// Test helper: looks up the session entry for `remote`, if any.
    #[cfg(test)]
    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
        self.sessions.get(remote)
    }
2135
    /// Test helper: looks up the session entry for `remote` for mutation, if any.
    #[cfg(test)]
    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
        self.sessions.get_mut(remote)
    }
2141
    /// Test helper: removes and returns the session entry for `remote`, if any.
    #[cfg(test)]
    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
        self.sessions.remove(remote)
    }
2147
2148 #[cfg(test)]
2150 pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2151 self.path_mtu_lookup
2152 .read()
2153 .ok()
2154 .and_then(|map| map.get(fips_addr).copied())
2155 }
2156
    /// Test helper: records `mtu` as the path MTU for `fips_addr`.
    ///
    /// The insert is silently skipped when the lock is poisoned.
    #[cfg(test)]
    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
        if let Ok(mut map) = self.path_mtu_lookup.write() {
            map.insert(fips_addr, mtu);
        }
    }
2164
    /// Returns the number of entries in the session table.
    pub fn session_count(&self) -> usize {
        self.sessions.len()
    }
2169
    /// Iterates over `(remote address, session entry)` pairs in the session table.
    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
        self.sessions.iter()
    }
2174
2175 pub(crate) fn register_identity(
2179 &mut self,
2180 node_addr: NodeAddr,
2181 pubkey: secp256k1::PublicKey,
2182 ) -> bool {
2183 let mut prefix = [0u8; 15];
2184 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2185 if let Some(entry) = self.identity_cache.get(&prefix)
2186 && entry.node_addr == node_addr
2187 && entry.pubkey == pubkey
2188 {
2189 return true;
2193 }
2194
2195 let (xonly, _) = pubkey.x_only_public_key();
2196 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2197 if derived_node_addr != node_addr {
2198 debug!(
2199 claimed_node_addr = %node_addr,
2200 derived_node_addr = %derived_node_addr,
2201 "Rejected identity cache entry with mismatched public key"
2202 );
2203 return false;
2204 }
2205
2206 let now_ms = Self::now_ms();
2207 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2208 && entry.node_addr == node_addr
2209 {
2210 entry.pubkey = pubkey;
2211 entry.last_seen_ms = now_ms;
2212 return true;
2213 }
2214
2215 let npub = encode_npub(&xonly);
2216 self.identity_cache.insert(
2217 prefix,
2218 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2219 );
2220 let max = self.config.node.cache.identity_size;
2222 if self.identity_cache.len() > max
2223 && let Some(oldest_key) = self
2224 .identity_cache
2225 .iter()
2226 .min_by_key(|(_, entry)| entry.last_seen_ms)
2227 .map(|(k, _)| *k)
2228 {
2229 self.identity_cache.remove(&oldest_key);
2230 }
2231 true
2232 }
2233
2234 pub(crate) fn lookup_by_fips_prefix(
2236 &mut self,
2237 prefix: &[u8; 15],
2238 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2239 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2240 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2242 } else {
2243 None
2244 }
2245 }
2246
    /// Returns whether the identity cache holds an entry under the first
    /// 15 bytes of `addr`.
    ///
    /// NOTE(review): only the 15-byte prefix is matched; unlike
    /// `pubkey_for_node_addr`, the stored `node_addr` is not compared against
    /// `addr` — confirm this is intended.
    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
        let mut prefix = [0u8; 15];
        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
        self.identity_cache.contains_key(&prefix)
    }
2253
    /// Returns the number of entries in the identity cache.
    pub fn identity_cache_len(&self) -> usize {
        self.identity_cache.len()
    }
2258
    /// Iterates over identity cache entries as
    /// `(node address, public key, last-seen timestamp in ms)`.
    pub fn identity_cache_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
        self.identity_cache
            .values()
            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
    }
2270
    /// Returns the configured identity cache size (`node.cache.identity_size`).
    pub fn identity_cache_max(&self) -> usize {
        self.config.node.cache.identity_size
    }
2275
    /// Returns the number of pending lookups.
    pub fn pending_lookup_count(&self) -> usize {
        self.pending_lookups.len()
    }
2280
    /// Iterates over pending lookups as `(node address, lookup state)` pairs.
    pub fn pending_lookups_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
        self.pending_lookups.iter()
    }
2287
    /// Returns the number of entries in `recent_requests`.
    pub fn recent_request_count(&self) -> usize {
        self.recent_requests.len()
    }
2292
    /// Returns the number of destinations that currently have a pending TUN
    /// packet queue.
    pub fn pending_tun_destinations(&self) -> usize {
        self.pending_tun_packets.len()
    }
2297
2298 pub fn pending_tun_total_packets(&self) -> usize {
2300 self.pending_tun_packets.values().map(|q| q.len()).sum()
2301 }
2302
    /// Iterates over pending retry entries as `(node address, retry state)` pairs.
    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
        self.retry_pending.iter()
    }
2307
2308 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2315 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2317 return true;
2318 }
2319 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2321 && decl.parent_id() == self.node_addr()
2322 {
2323 return true;
2324 }
2325 false
2326 }
2327
    /// Chooses the peer to which traffic for `dest_node_addr` should be forwarded.
    ///
    /// Stages are tried in order and the first match is returned:
    ///
    /// 1. `None` when the destination is this node.
    /// 2. The destination itself when it is a direct peer that `is_healthy()`.
    /// 3. In `RoutingMode::ReplyLearned`, unless a fallback exploration is due,
    ///    a learned next hop restricted to peers that can send.
    /// 4. Without cached coordinates for the destination: a learned next hop,
    ///    then the direct peer if it can send, otherwise `None`.
    /// 5. With coordinates: the best candidate among peers whose filters may
    ///    reach the destination, then the tree next hop if that peer can send.
    /// 6. When a fallback exploration is due, the learned next hop (or `None`)
    ///    is returned at this point without trying the direct peer.
    /// 7. Otherwise a learned next hop, then the direct peer if it can send.
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        if dest_node_addr == self.node_addr() {
            return None;
        }

        // Remember whether the direct peer is at least sendable; it is the
        // last-resort answer when it is not healthy enough to win outright.
        let direct_peer_can_send = self
            .peers
            .get(dest_node_addr)
            .is_some_and(|peer| peer.can_send());
        if let Some(peer) = self.peers.get(dest_node_addr)
            && peer.is_healthy()
        {
            return Some(peer);
        }

        let now_ms = Self::now_ms();

        // Only reply-learned mode consults learned routes; `None` disables
        // every learned-route stage below.
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(_, peer)| peer.can_send())
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // When the learned table asks for a fallback exploration, the learned
        // route is skipped here so coordinate/tree routing is tried first.
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // Coordinates are cloned so the cache borrow ends before routing continues.
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            // No coordinates: only learned routes or the direct peer remain.
            if let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            if direct_peer_can_send {
                return self.peers.get(dest_node_addr);
            }
            return None;
        };

        // Filter-based candidates, ranked by `select_best_candidate`. Only the
        // address leaves this scope; the peer is looked up again below.
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        if let Some(next_hop_addr) = coordinate_route_addr {
            return self.peers.get(&next_hop_addr);
        }

        // Tree routing, accepted only when the chosen neighbour can send.
        let tree_route_addr = self
            .tree_state
            .find_next_hop(&dest_coords)
            .filter(|next_hop_id| {
                self.peers
                    .get(next_hop_id)
                    .is_some_and(|peer| peer.can_send())
            });
        if let Some(next_hop_addr) = tree_route_addr {
            return self.peers.get(&next_hop_addr);
        }
        // Exploration found nothing better: fall back to the learned route,
        // returning `None` if that also yields nothing.
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        if direct_peer_can_send {
            return self.peers.get(dest_node_addr);
        }

        None
    }
2474
2475 pub(in crate::node) fn learn_reverse_route(
2476 &mut self,
2477 destination: NodeAddr,
2478 next_hop: NodeAddr,
2479 ) {
2480 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2481 || destination == *self.node_addr()
2482 {
2483 return;
2484 }
2485 let now_ms = Self::now_ms();
2486 self.learned_routes.learn(
2487 destination,
2488 next_hop,
2489 now_ms,
2490 self.config.node.routing.learned_ttl_secs,
2491 self.config.node.routing.max_learned_routes_per_dest,
2492 );
2493 }
2494
2495 pub(in crate::node) fn record_route_failure(
2496 &mut self,
2497 destination: NodeAddr,
2498 next_hop: NodeAddr,
2499 ) {
2500 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2501 return;
2502 }
2503 self.learned_routes.record_failure(&destination, &next_hop);
2504 }
2505
    /// Returns a snapshot of the learned route table taken with `now_ms` as
    /// the reference time.
    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
        self.learned_routes.snapshot(now_ms)
    }
2509
    /// Asks the learned route table to purge entries that are expired as of `now_ms`.
    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
        self.learned_routes.purge_expired(now_ms);
    }
2513
2514 fn select_best_candidate<'a>(
2523 &'a self,
2524 candidates: &[&'a ActivePeer],
2525 dest_coords: &crate::tree::TreeCoordinate,
2526 ) -> Option<&'a ActivePeer> {
2527 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2528
2529 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2530
2531 for &candidate in candidates {
2532 if !candidate.can_send() {
2533 continue;
2534 }
2535
2536 let cost = candidate.link_cost();
2537
2538 let dist = self
2539 .tree_state
2540 .peer_coords(candidate.node_addr())
2541 .map(|pc| pc.distance_to(dest_coords))
2542 .unwrap_or(usize::MAX);
2543
2544 if dist >= my_distance {
2547 continue;
2548 }
2549
2550 let dominated = match &best {
2551 None => true,
2552 Some((_, best_cost, best_dist)) => {
2553 cost < *best_cost
2554 || (cost == *best_cost && dist < *best_dist)
2555 || (cost == *best_cost
2556 && dist == *best_dist
2557 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2558 }
2559 };
2560
2561 if dominated {
2562 best = Some((candidate, cost, dist));
2563 }
2564 }
2565
2566 best.map(|(peer, _, _)| peer)
2567 }
2568
    /// Collects the active peers for which `may_reach(dest)` is true.
    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
        self.peers.values().filter(|p| p.may_reach(dest)).collect()
    }
2573
    /// Returns the TUN sender held by this node, or `None` when none is set.
    pub fn tun_tx(&self) -> Option<&TunTx> {
        self.tun_tx.as_ref()
    }
2580
2581 pub fn attach_external_packet_io(
2588 &mut self,
2589 capacity: usize,
2590 ) -> Result<ExternalPacketIo, NodeError> {
2591 if self.state != NodeState::Created {
2592 return Err(NodeError::Config(ConfigError::Validation(
2593 "external packet I/O must be attached before node start".to_string(),
2594 )));
2595 }
2596 if self.config.tun.enabled {
2597 return Err(NodeError::Config(ConfigError::Validation(
2598 "external packet I/O requires tun.enabled=false".to_string(),
2599 )));
2600 }
2601
2602 let capacity = capacity.max(1);
2603 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2604 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2605 self.tun_outbound_rx = Some(outbound_rx);
2606 self.external_packet_tx = Some(inbound_tx);
2607
2608 Ok(ExternalPacketIo {
2609 outbound_tx,
2610 inbound_rx,
2611 })
2612 }
2613
2614 pub(crate) fn attach_endpoint_data_io(
2619 &mut self,
2620 capacity: usize,
2621 ) -> Result<EndpointDataIo, NodeError> {
2622 if self.state != NodeState::Created {
2623 return Err(NodeError::Config(ConfigError::Validation(
2624 "endpoint data I/O must be attached before node start".to_string(),
2625 )));
2626 }
2627
2628 let command_capacity = endpoint_data_command_capacity(capacity);
2629 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2630 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2635 self.endpoint_command_rx = Some(command_rx);
2636 self.endpoint_event_tx = Some(event_tx.clone());
2637
2638 Ok(EndpointDataIo {
2639 command_tx,
2640 event_rx,
2641 event_tx,
2642 })
2643 }
2644
2645 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2646 let mut prefix = [0u8; 15];
2647 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2648 self.identity_cache
2649 .get(&prefix)
2650 .filter(|entry| &entry.node_addr == addr)
2651 .map(|entry| entry.pubkey)
2652 }
2653
2654 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2655 let mut prefix = [0u8; 15];
2656 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2657 self.identity_cache
2658 .get(&prefix)
2659 .filter(|entry| &entry.node_addr == addr)
2660 .map(|entry| entry.npub.clone())
2661 }
2662
2663 pub(in crate::node) fn deliver_external_ipv6_packet(
2664 &self,
2665 src_addr: &NodeAddr,
2666 packet: Vec<u8>,
2667 ) {
2668 let Some(external_packet_tx) = &self.external_packet_tx else {
2669 return;
2670 };
2671 if packet.len() < 40 {
2672 return;
2673 }
2674 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2675 return;
2676 };
2677 let delivered = NodeDeliveredPacket {
2678 source_node_addr: *src_addr,
2679 source_npub: self.npub_for_node_addr(src_addr),
2680 destination,
2681 packet,
2682 };
2683 if let Err(error) = external_packet_tx.try_send(delivered) {
2684 debug!(error = %error, "Failed to deliver packet to external app sink");
2685 }
2686 }
2687
    /// Sends `plaintext` to the peer `node_addr` as an encrypted link message
    /// with the CE flag cleared.
    ///
    /// Delegates to `send_encrypted_link_message_with_ce` with
    /// `ce_flag = false` and returns its result unchanged.
    pub(super) async fn send_encrypted_link_message(
        &mut self,
        node_addr: &NodeAddr,
        plaintext: &[u8],
    ) -> Result<(), NodeError> {
        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
            .await
    }
2709
2710 pub(in crate::node) fn note_local_send_outcome(
2716 &mut self,
2717 result: &Result<usize, TransportError>,
2718 ) {
2719 match result {
2720 Ok(_) => {
2721 if self.last_local_send_failure_at.is_some() {
2722 self.last_local_send_failure_at = None;
2723 }
2724 }
2725 Err(TransportError::Io(e))
2726 if matches!(
2727 e.kind(),
2728 std::io::ErrorKind::NetworkUnreachable
2729 | std::io::ErrorKind::HostUnreachable
2730 | std::io::ErrorKind::AddrNotAvailable
2731 ) =>
2732 {
2733 self.last_local_send_failure_at = Some(std::time::Instant::now());
2734 }
2735 Err(_) => {}
2736 }
2737 }
2738
    /// Returns the instant recorded by `note_local_send_outcome` for the most
    /// recent local send failure, or `None` when none is recorded.
    pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
        self.last_local_send_failure_at
    }
2744
2745 pub(super) async fn send_encrypted_link_message_with_ce(
2749 &mut self,
2750 node_addr: &NodeAddr,
2751 plaintext: &[u8],
2752 ce_flag: bool,
2753 ) -> Result<(), NodeError> {
2754 let peer = self
2755 .peers
2756 .get_mut(node_addr)
2757 .ok_or(NodeError::PeerNotFound(*node_addr))?;
2758
2759 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2760 node_addr: *node_addr,
2761 reason: "no their_index".into(),
2762 })?;
2763 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2764 node_addr: *node_addr,
2765 reason: "no transport_id".into(),
2766 })?;
2767 let remote_addr = peer
2768 .current_addr()
2769 .cloned()
2770 .ok_or_else(|| NodeError::SendFailed {
2771 node_addr: *node_addr,
2772 reason: "no current_addr".into(),
2773 })?;
2774 #[cfg(any(target_os = "linux", target_os = "macos"))]
2775 let connected_socket = peer.connected_udp();
2776
2777 let timestamp_ms = peer.session_elapsed_ms();
2779
2780 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2782 let mut flags = if sp_flag { FLAG_SP } else { 0 };
2783 if ce_flag {
2784 flags |= FLAG_CE;
2785 }
2786 if peer.current_k_bit() {
2787 flags |= FLAG_KEY_EPOCH;
2788 }
2789
2790 let session = peer
2791 .noise_session_mut()
2792 .ok_or_else(|| NodeError::SendFailed {
2793 node_addr: *node_addr,
2794 reason: "no noise session".into(),
2795 })?;
2796
2797 const INNER_TS_LEN: usize = 4;
2805 let counter = session.current_send_counter();
2806 let inner_len = INNER_TS_LEN + plaintext.len();
2807 let payload_len = inner_len as u16;
2808 let header = build_established_header(their_index, counter, flags, payload_len);
2809
2810 let transport_for_send = self
2829 .transports
2830 .get(&transport_id)
2831 .ok_or(NodeError::TransportNotFound(transport_id))?;
2832 match transport_for_send.connection_state(&remote_addr) {
2833 ConnectionState::Connected => {}
2834 other => {
2835 if matches!(other, ConnectionState::None) {
2836 let _ = transport_for_send.connect(&remote_addr).await;
2837 }
2838 return Err(NodeError::SendFailed {
2839 node_addr: *node_addr,
2840 reason: format!("transport connection not ready: {:?}", other),
2841 });
2842 }
2843 }
2844 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2845 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2846 && is_udp
2847 && let Some(cipher_clone) = session.send_cipher_clone()
2848 {
2849 {
2850 let reserved_counter =
2854 session
2855 .take_send_counter()
2856 .map_err(|e| NodeError::SendFailed {
2857 node_addr: *node_addr,
2858 reason: format!("counter reservation failed: {}", e),
2859 })?;
2860 debug_assert_eq!(reserved_counter, counter);
2861 let header =
2865 build_established_header(their_index, reserved_counter, flags, payload_len);
2866 let transport = transport_for_send;
2867 let send_target = {
2874 if let TransportHandle::Udp(udp) = transport {
2875 let socket_addr = {
2876 #[cfg(any(target_os = "linux", target_os = "macos"))]
2877 {
2878 match connected_socket.as_ref() {
2879 Some(socket) => Some(socket.peer_addr()),
2880 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2881 }
2882 }
2883 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2884 {
2885 udp.resolve_for_off_task(&remote_addr).await.ok()
2886 }
2887 };
2888 match (udp.async_socket(), socket_addr) {
2889 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
2890 _ => None,
2891 }
2892 } else {
2893 None
2894 }
2895 };
2896 if let Some((socket, socket_addr)) = send_target {
2897 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
2913 let mut wire_buf = Vec::with_capacity(wire_capacity);
2914 wire_buf.extend_from_slice(&header);
2915 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
2916 wire_buf.extend_from_slice(plaintext);
2917 let predicted_bytes = wire_capacity;
2918 if let Some(peer) = self.peers.get_mut(node_addr) {
2925 peer.link_stats_mut().record_sent(predicted_bytes);
2926 if let Some(mmp) = peer.mmp_mut() {
2927 mmp.sender
2928 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
2929 }
2930 }
2931 let scheduling_weight = self.send_weight_for_peer(node_addr);
2932 workers.dispatch(self::encrypt_worker::FmpSendJob {
2933 cipher: cipher_clone,
2934 counter: reserved_counter,
2935 wire_buf,
2936 fsp_seal: None,
2937 socket,
2938 dest_addr: socket_addr,
2939 #[cfg(any(target_os = "linux", target_os = "macos"))]
2940 connected_socket,
2941 drop_on_backpressure: plaintext
2942 .first()
2943 .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
2944 scheduling_weight,
2945 queued_at: crate::perf_profile::stamp(),
2946 });
2947 return Ok(());
2948 }
2949 }
2950 }
2951
2952 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
2957 let ciphertext = {
2959 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
2960 session
2961 .encrypt_with_aad(&inner_plaintext, &header)
2962 .map_err(|e| NodeError::SendFailed {
2963 node_addr: *node_addr,
2964 reason: format!("encryption failed: {}", e),
2965 })?
2966 };
2967
2968 let wire_packet = build_encrypted(&header, &ciphertext);
2969
2970 let send_result = {
2972 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
2973 let transport = self
2974 .transports
2975 .get(&transport_id)
2976 .ok_or(NodeError::TransportNotFound(transport_id))?;
2977 transport.send(&remote_addr, &wire_packet).await
2978 };
2979 self.note_local_send_outcome(&send_result);
2980 let bytes_sent = send_result.map_err(|e| match e {
2981 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
2982 node_addr: *node_addr,
2983 packet_size,
2984 mtu,
2985 },
2986 other => NodeError::SendFailed {
2987 node_addr: *node_addr,
2988 reason: format!("transport send: {}", other),
2989 },
2990 })?;
2991
2992 if let Some(peer) = self.peers.get_mut(node_addr) {
2994 peer.link_stats_mut().record_sent(bytes_sent);
2995 if let Some(mmp) = peer.mmp_mut() {
2997 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
2998 }
2999 }
3000
3001 Ok(())
3002 }
3003}
3004
3005impl fmt::Debug for Node {
3006 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3007 f.debug_struct("Node")
3008 .field("node_addr", self.node_addr())
3009 .field("state", &self.state)
3010 .field("is_leaf_only", &self.is_leaf_only)
3011 .field("connections", &self.connection_count())
3012 .field("peers", &self.peer_count())
3013 .field("links", &self.link_count())
3014 .field("transports", &self.transport_count())
3015 .finish()
3016 }
3017}