1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31#[cfg(unix)]
32use self::wire::ESTABLISHED_HEADER_SIZE;
33use self::wire::{
34 FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted, build_established_header,
35 prepend_inner_header,
36};
37use crate::bloom::{BloomFilter, BloomState};
38use crate::cache::CoordCache;
39use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
40use crate::node::session::SessionEntry;
41use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
42use crate::peer::{ActivePeer, PeerConnection};
43#[cfg(any(target_os = "linux", target_os = "macos"))]
44use crate::transport::ethernet::EthernetTransport;
45use crate::transport::tcp::TcpTransport;
46use crate::transport::tor::TorTransport;
47use crate::transport::udp::UdpTransport;
48#[cfg(feature = "webrtc-transport")]
49use crate::transport::webrtc::WebRtcTransport;
50use crate::transport::{
51 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
52 TransportHandle, TransportId,
53};
54use crate::tree::TreeState;
55use crate::upper::hosts::HostMap;
56use crate::upper::icmp_rate_limit::IcmpRateLimiter;
57use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
58use crate::utils::index::IndexAllocator;
59use crate::{
60 Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
61 PeerIdentity, encode_npub,
62};
63use rand::Rng;
64use std::collections::{HashMap, HashSet, VecDeque};
65use std::fmt;
66use std::sync::Arc;
67use std::thread::JoinHandle;
68use thiserror::Error;
69use tracing::{debug, warn};
70
// Module-level tunables. NOTE(review): the code that consumes these constants
// is outside this view; the intent described on each one is inferred from its
// name and should be confirmed against the call sites.

/// Three-second window tied to local send failures.
/// Presumably the span within which a local send failure lets a peer be
/// declared dead quickly — TODO confirm.
const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
/// 20 000 ms hold time. Presumably how long a direct session path stays marked
/// as degraded once flagged — TODO confirm.
const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20_000;
/// Minimum sample count (16). Presumably the number of observations required
/// before a loss ratio is trusted — TODO confirm.
const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 16;
/// Loss ratio of 0.08 (8 %). Presumably the level at or above which a direct
/// session path is considered degraded — TODO confirm.
const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.08;
/// Loss ratio of 0.02 (2 %). Presumably the level at or below which a degraded
/// direct path is considered recovered — TODO confirm.
const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.02;
/// Cost advantage of 0.25. Presumably the minimum relative improvement a
/// fallback route must offer before it is preferred — TODO confirm.
const ROUTING_FALLBACK_MIN_COST_ADVANTAGE: f64 = 0.25;
77
/// Scheduling class of an FMP plaintext frame, as produced by
/// `classify_fmp_plaintext_traffic`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct FmpPlaintextTrafficClass {
    /// `true` when the frame was recognised as a bulk session datagram.
    bulk_endpoint_data: bool,
    /// Whether the frame may be dropped when queues push back. The classifier
    /// in this file always sets this to `false`.
    drop_on_backpressure: bool,
}
83
/// Scheduling class of an endpoint (IP) payload, as produced by
/// `classify_endpoint_payload`. The `Default` value (both flags `false`) is the
/// latency-sensitive, never-dropped class.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct EndpointPayloadTrafficClass {
    /// `true` when the payload is treated as bulk rather than latency-sensitive.
    bulk_endpoint_data: bool,
    /// `true` when the payload may be dropped under backpressure.
    drop_on_backpressure: bool,
}
89
/// Queue lane chosen for an endpoint command; see
/// `endpoint_command_lane_for_payload` for how a payload maps to a lane.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum EndpointCommandLane {
    /// Lane for latency-sensitive payloads.
    Priority,
    /// Lane for everything else.
    Bulk,
}
95
96fn classify_fmp_plaintext_traffic(plaintext: &[u8]) -> FmpPlaintextTrafficClass {
97 let bulk_endpoint_data = fmp_plaintext_is_bulk_session_datagram(plaintext);
98 FmpPlaintextTrafficClass {
103 bulk_endpoint_data,
104 drop_on_backpressure: false,
105 }
106}
107
108fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
109 if plaintext
110 .first()
111 .is_none_or(|ty| *ty != LinkMessageType::SessionDatagram.to_byte())
112 {
113 return false;
114 }
115 let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
116 return false;
117 };
118 FspCommonPrefix::parse(fsp_payload).is_some_and(|prefix| {
119 prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted() && !prefix.has_coords()
120 })
121}
122
123fn classify_endpoint_payload(payload: &[u8]) -> EndpointPayloadTrafficClass {
124 const IPPROTO_TCP: u8 = 6;
125 const IPPROTO_ICMPV6: u8 = 58;
126
127 match parse_endpoint_payload_ip_proto(payload) {
128 Some((IPPROTO_ICMPV6, _)) => EndpointPayloadTrafficClass::default(),
129 Some((IPPROTO_TCP, offset)) => {
130 let latency_sensitive = endpoint_tcp_payload_is_latency_sensitive(payload, offset);
131 EndpointPayloadTrafficClass {
132 bulk_endpoint_data: !latency_sensitive,
133 drop_on_backpressure: false,
134 }
135 }
136 _ => EndpointPayloadTrafficClass {
137 bulk_endpoint_data: true,
138 drop_on_backpressure: true,
139 },
140 }
141}
142
143pub(crate) fn endpoint_payload_is_latency_sensitive(payload: &[u8]) -> bool {
144 !classify_endpoint_payload(payload).bulk_endpoint_data
145}
146
147pub(crate) fn endpoint_command_lane_for_payload(payload: &[u8]) -> EndpointCommandLane {
148 if endpoint_payload_is_latency_sensitive(payload) {
149 EndpointCommandLane::Priority
150 } else {
151 EndpointCommandLane::Bulk
152 }
153}
154
/// Decides whether the TCP segment starting at `tcp_offset` inside the IP
/// packet `payload` should be treated as latency-sensitive.
///
/// A segment is latency-sensitive when any of the following holds:
/// * the TCP header is truncated or its data offset is invalid (the packet is
///   not demoted to bulk on the strength of a header we cannot read);
/// * FIN, SYN or RST is set;
/// * it carries at most 256 bytes of TCP data.
///
/// The data length is taken from the IP header's length field when it is
/// usable, so trailing bytes in the buffer beyond the datagram are ignored.
/// Bytes between the end of the base IP header and `tcp_offset` (IPv6
/// extension headers) are subtracted, because the IPv6 Payload Length field
/// counts them too. If the length field is unusable, the length of the buffer
/// after the TCP header is used instead.
fn endpoint_tcp_payload_is_latency_sensitive(payload: &[u8], tcp_offset: usize) -> bool {
    const TCP_MIN_HEADER_LEN: usize = 20;
    const TCP_FLAG_FIN: u8 = 0x01;
    const TCP_FLAG_SYN: u8 = 0x02;
    const TCP_FLAG_RST: u8 = 0x04;
    const INTERACTIVE_TCP_PAYLOAD_MAX: usize = 256;

    // Slicing once avoids `tcp_offset + n` arithmetic on every access.
    let Some(tcp) = payload.get(tcp_offset..) else {
        return true;
    };
    if tcp.len() < TCP_MIN_HEADER_LEN {
        return true;
    }

    // Data offset lives in the high nibble of byte 12, in 32-bit words.
    let tcp_header_len = usize::from(tcp[12] >> 4) * 4;
    if tcp_header_len < TCP_MIN_HEADER_LEN || tcp.len() < tcp_header_len {
        return true;
    }

    let flags = tcp[13];
    if flags & (TCP_FLAG_FIN | TCP_FLAG_SYN | TCP_FLAG_RST) != 0 {
        return true;
    }

    let buffered_data_len = tcp.len() - tcp_header_len;
    let data_len = endpoint_ip_payload_len(payload)
        .and_then(|ip_payload_len| {
            // Extension headers sit between the base IP header and the TCP
            // header and are included in the IP payload length.
            let base_header_len = endpoint_ip_base_header_len(payload)?;
            let extension_len = tcp_offset.checked_sub(base_header_len)?;
            ip_payload_len
                .checked_sub(extension_len)?
                .checked_sub(tcp_header_len)
        })
        .unwrap_or(buffered_data_len);
    data_len <= INTERACTIVE_TCP_PAYLOAD_MAX
}

/// Length in bytes of the base IP header of `payload`: `IHL * 4` for IPv4 and
/// the fixed 40 bytes for IPv6. Returns `None` for an empty buffer or an
/// unknown IP version. No bounds validation is performed here.
fn endpoint_ip_base_header_len(payload: &[u8]) -> Option<usize> {
    const IPV6_HEADER_LEN: usize = 40;

    let version_ihl = payload.first().copied()?;
    match version_ihl >> 4 {
        4 => Some(usize::from(version_ihl & 0x0f) * 4),
        6 => Some(IPV6_HEADER_LEN),
        _ => None,
    }
}

/// Number of bytes that follow the base IP header according to the packet's
/// own length field.
///
/// * IPv4: `total length - header length`; `None` if the header is truncated,
///   the IHL is below 20 bytes, or the total length is smaller than the header.
/// * IPv6: the Payload Length field (which includes extension headers);
///   `None` if the fixed 40-byte header is truncated.
/// * Any other version, or an empty buffer: `None`.
fn endpoint_ip_payload_len(payload: &[u8]) -> Option<usize> {
    const IPV4_MIN_HEADER_LEN: usize = 20;
    const IPV6_HEADER_LEN: usize = 40;

    let version_ihl = payload.first().copied()?;
    match version_ihl >> 4 {
        4 => {
            if payload.len() < IPV4_MIN_HEADER_LEN {
                return None;
            }
            let header_len = usize::from(version_ihl & 0x0f) * 4;
            if header_len < IPV4_MIN_HEADER_LEN || payload.len() < header_len {
                return None;
            }
            let total_len = usize::from(u16::from_be_bytes([payload[2], payload[3]]));
            total_len.checked_sub(header_len)
        }
        6 => {
            if payload.len() < IPV6_HEADER_LEN {
                return None;
            }
            Some(usize::from(u16::from_be_bytes([payload[4], payload[5]])))
        }
        _ => None,
    }
}
208
209fn parse_endpoint_payload_ip_proto(payload: &[u8]) -> Option<(u8, usize)> {
210 const IPV4_MIN_HEADER_LEN: usize = 20;
211
212 let version_ihl = payload.first().copied()?;
213
214 match version_ihl >> 4 {
215 4 => {
216 if payload.len() < IPV4_MIN_HEADER_LEN {
217 return None;
218 }
219 let header_len = usize::from(version_ihl & 0x0f) * 4;
220 if header_len >= IPV4_MIN_HEADER_LEN && payload.len() >= header_len {
221 Some((payload[9], header_len))
222 } else {
223 None
224 }
225 }
226 6 => ipv6_payload_next_header(payload),
227 _ => None,
228 }
229}
230
/// Test-only helper: `true` when `payload` parses as an IP packet whose
/// upper-layer protocol is TCP.
#[cfg(test)]
fn endpoint_payload_is_tcp(payload: &[u8]) -> bool {
    const IPPROTO_TCP: u8 = 6;
    matches!(
        parse_endpoint_payload_ip_proto(payload),
        Some((IPPROTO_TCP, _))
    )
}
236
237fn ipv6_payload_next_header(payload: &[u8]) -> Option<(u8, usize)> {
238 const IPV6_HEADER_LEN: usize = 40;
239 const IPV6_FRAGMENT_HEADER_LEN: usize = 8;
240
241 if payload.len() < IPV6_HEADER_LEN || payload[0] >> 4 != 6 {
242 return None;
243 }
244
245 let mut next_header = payload[6];
246 let mut offset = IPV6_HEADER_LEN;
247 let mut extension_count = 0usize;
248 while ipv6_extension_header_is_skippable(next_header) {
249 if next_header == 44 {
250 if payload.len() < offset + IPV6_FRAGMENT_HEADER_LEN {
251 return None;
252 }
253 next_header = payload[offset];
254 offset += IPV6_FRAGMENT_HEADER_LEN;
255 } else if next_header == 51 {
256 if payload.len() < offset + 2 {
257 return None;
258 }
259 let header_len = (usize::from(payload[offset + 1]) + 2) * 4;
260 if payload.len() < offset + header_len {
261 return None;
262 }
263 next_header = payload[offset];
264 offset += header_len;
265 } else {
266 if payload.len() < offset + 2 {
267 return None;
268 }
269 let header_len = (usize::from(payload[offset + 1]) + 1) * 8;
270 if payload.len() < offset + header_len {
271 return None;
272 }
273 next_header = payload[offset];
274 offset += header_len;
275 }
276 extension_count += 1;
277 if extension_count > 8 {
278 return None;
279 }
280 }
281
282 Some((next_header, offset))
283}
284
/// Returns `true` for the IPv6 next-header values that the extension-header
/// walker steps over: hop-by-hop options (0), routing (43), fragment (44),
/// authentication (51), destination options (60) and mobility (135).
fn ipv6_extension_header_is_skippable(next_header: u8) -> bool {
    const HOP_BY_HOP_OPTIONS: u8 = 0;
    const ROUTING: u8 = 43;
    const FRAGMENT: u8 = 44;
    const AUTHENTICATION: u8 = 51;
    const DESTINATION_OPTIONS: u8 = 60;
    const MOBILITY: u8 = 135;

    [
        HOP_BY_HOP_OPTIONS,
        ROUTING,
        FRAGMENT,
        AUTHENTICATION,
        DESTINATION_OPTIONS,
        MOBILITY,
    ]
    .contains(&next_header)
}
288
/// Jitter bound of 15 seconds, as a signed value.
/// NOTE(review): presumably the maximum random offset applied to rekey timers
/// (signed so it can be added or subtracted) — confirm against the rekey code.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
296
/// Errors returned by node operations.
///
/// `Config`, `Identity` and `Tun` convert automatically from their source
/// error types via `#[from]`; the remaining variants carry either an
/// identifier or a human-readable reason string.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The node has not been started.
    #[error("node not started")]
    NotStarted,

    /// The node has already been started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node has already been stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given id.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No transport is available for the named transport type.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link is registered under the given id.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection is registered for the given link id.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// No peer is known under the given node address.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with the given node address already exists.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection already exists for the given link.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A peer npub could not be accepted; `reason` explains why.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// Discovery failed; the string carries the details.
    #[error("discovery error: {0}")]
    Discovery(String),

    /// The operation was denied; the string carries the details.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` would be exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` would be exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` would be exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available for the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion failed for `link_id`; `reason` explains why.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed; `reason` explains why.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet of `packet_size` bytes exceeds the `mtu` while forwarding to
    /// `node_addr`.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Wraps a [`ConfigError`].
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Wraps an [`IdentityError`].
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// Wraps a [`TunError`].
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Index allocation failed; the string carries the details.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// A handshake failed; the string carries the details.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// A transport error, rendered to a string (see
    /// `NodeError::from_transport_error`).
    #[error("transport error: {0}")]
    TransportError(String),

    /// A transport error for which the transport reported that the local route
    /// is unavailable (see `NodeError::from_transport_error`).
    #[error("local route unavailable: {0}")]
    LocalRouteUnavailable(String),

    /// A bootstrap handoff failed; the string carries the details.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
391
392impl NodeError {
393 pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
394 if error.is_local_route_unavailable() {
395 Self::LocalRouteUnavailable(error.to_string())
396 } else {
397 Self::TransportError(error.to_string())
398 }
399 }
400
401 pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
402 matches!(self, Self::LocalRouteUnavailable(_))
403 }
404}
405
/// A packet handed out of the node, together with where it came from and where
/// it is addressed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// Node address of the sender.
    pub source_node_addr: NodeAddr,
    /// The sender's npub string, when one is available.
    pub source_npub: Option<String>,
    /// Destination address of the packet.
    pub destination: FipsAddress,
    /// Raw packet bytes.
    pub packet: Vec<u8>,
}
418
/// One entry of the node's identity cache: a node address together with the
/// public key and npub string that belong to it.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address of the cached identity.
    node_addr: NodeAddr,
    /// secp256k1 public key of the cached identity.
    pubkey: secp256k1::PublicKey,
    /// npub string of the cached identity.
    npub: String,
    /// Timestamp in milliseconds supplied by the caller.
    /// NOTE(review): the clock/epoch is not visible here — confirm.
    last_seen_ms: u64,
}

impl IdentityCacheEntry {
    /// Builds an entry from its four fields; no validation is performed.
    fn new(
        node_addr: NodeAddr,
        pubkey: secp256k1::PublicKey,
        npub: String,
        last_seen_ms: u64,
    ) -> Self {
        Self {
            node_addr,
            pubkey,
            npub,
            last_seen_ms,
        }
    }
}
442
/// Channel pair for exchanging packets with the node from outside.
/// NOTE(review): which side owns which half is inferred from the field names —
/// confirm against the code that constructs this struct.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Sender for packets travelling outbound (uses the TUN outbound channel type).
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receiver of [`NodeDeliveredPacket`]s travelling inbound.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
451
/// Channel endpoints for endpoint data: two bounded command senders and both
/// halves of an unbounded event channel.
/// NOTE(review): presumably `priority_command_tx` carries commands for
/// [`EndpointCommandLane::Priority`] and `command_tx` those for
/// [`EndpointCommandLane::Bulk`] — confirm against the constructor.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Bounded sender of [`NodeEndpointCommand`]s on the priority queue.
    pub(crate) priority_command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Bounded sender of [`NodeEndpointCommand`]s on the regular queue.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Receiving half of the unbounded [`NodeEndpointEvent`] channel.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Sending half of the unbounded [`NodeEndpointEvent`] channel.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
484
/// Resolves the capacity of the endpoint data command queue.
///
/// If the environment variable `FIPS_ENDPOINT_DATA_QUEUE_CAP` is set to a
/// positive integer (surrounding whitespace is ignored), that value wins.
/// Otherwise the result is `requested`, raised to a floor of 32 768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    const CAPACITY_FLOOR: usize = 32_768;

    let env_override = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&value| value > 0);

    match env_override {
        Some(capacity) => capacity,
        // The floor is above 1, so it also covers a zero request.
        None => requested.max(CAPACITY_FLOOR),
    }
}
495
/// Commands sent to the node over the endpoint command channels. Variants that
/// expect an answer carry a oneshot `response_tx` for it.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send `payload` to `remote` and report the outcome on `response_tx`.
    Send {
        /// Identity of the destination peer.
        remote: PeerIdentity,
        /// Bytes to send.
        payload: Vec<u8>,
        /// Optional instant recorded by the sender.
        /// NOTE(review): presumably when the command was enqueued, for latency
        /// accounting — confirm.
        queued_at: Option<std::time::Instant>,
        /// Receives `Ok(())` or the [`NodeError`] for this send.
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Send `payload` to `remote` without any response channel.
    SendOneway {
        /// Identity of the destination peer.
        remote: PeerIdentity,
        /// Bytes to send.
        payload: Vec<u8>,
        /// Optional instant recorded by the sender (see `Send::queued_at`).
        queued_at: Option<std::time::Instant>,
    },
    /// Request a list of [`NodeEndpointPeer`] entries.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request a list of [`NodeEndpointRelayStatus`] entries.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Replace the relay lists with the given advert and DM relays.
    /// NOTE(review): replace-vs-merge semantics are not visible here — confirm.
    UpdateRelays {
        advert_relays: Vec<String>,
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Apply a new set of peer configurations; the response reports an
    /// [`UpdatePeersOutcome`] on success.
    UpdatePeers {
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
539
/// Counters returned in response to `NodeEndpointCommand::UpdatePeers`.
/// `Default` yields all zeros.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Number of peers added.
    pub(crate) added: usize,
    /// Number of peers removed.
    pub(crate) removed: usize,
    /// Number of peers updated.
    pub(crate) updated: usize,
    /// Number of peers left unchanged.
    pub(crate) unchanged: usize,
}
548
/// Events emitted by the node on the endpoint event channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A data payload received from another node.
    Data {
        /// Node address of the sender.
        source_node_addr: NodeAddr,
        /// The sender's npub string, when one is available.
        source_npub: Option<String>,
        /// Received bytes.
        payload: Vec<u8>,
        /// Optional instant recorded by the producer.
        /// NOTE(review): presumably when the event was enqueued — confirm.
        queued_at: Option<std::time::Instant>,
    },
}
559
/// Per-peer entry returned in response to `NodeEndpointCommand::PeerSnapshot`.
/// NOTE(review): the code that fills these fields is outside this view; the
/// descriptions follow the field names and types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// The peer's npub string.
    pub(crate) npub: String,
    /// Whether the peer is currently connected.
    pub(crate) connected: bool,
    /// Transport address rendered as a string, if any.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, if any.
    pub(crate) transport_type: Option<String>,
    /// Numeric link identifier.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, if measured.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent to the peer.
    pub(crate) packets_sent: u64,
    /// Packets received from the peer.
    pub(crate) packets_recv: u64,
    /// Bytes sent to the peer.
    pub(crate) bytes_sent: u64,
    /// Bytes received from the peer.
    pub(crate) bytes_recv: u64,
    /// Whether a direct probe is pending for this peer.
    pub(crate) direct_probe_pending: bool,
    /// Millisecond value associated with the next direct probe, if any.
    /// NOTE(review): absolute timestamp vs. remaining delay is not visible here.
    pub(crate) direct_probe_after_ms: Option<u64>,
}
576
/// Per-relay entry returned in response to `NodeEndpointCommand::RelaySnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Relay status as free-form text.
    pub(crate) status: String,
}
583
/// Lifecycle state of a node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed, never started.
    Created,
    /// Start in progress.
    Starting,
    /// Fully started.
    Running,
    /// Stop in progress.
    Stopping,
    /// Stopped.
    Stopped,
}

impl NodeState {
    /// `true` only while the node is [`NodeState::Running`].
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// `true` when a start is allowed: from [`NodeState::Created`] or
    /// [`NodeState::Stopped`].
    pub fn can_start(&self) -> bool {
        match self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// `true` when a stop is allowed: only from [`NodeState::Running`].
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase state name (`created`, `starting`, `running`,
    /// `stopping`, `stopped`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        })
    }
}
628
629#[derive(Clone, Debug)]
636pub(crate) struct RecentRequest {
637 pub(crate) from_peer: NodeAddr,
639 pub(crate) timestamp_ms: u64,
641 pub(crate) response_forwarded: bool,
645}
646
647impl RecentRequest {
648 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
649 Self {
650 from_peer,
651 timestamp_ms,
652 response_forwarded: false,
653 }
654 }
655
656 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
658 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
659 }
660}
661
/// Lookup key pairing a transport with a remote address on that transport;
/// used as the key of `Node::addr_to_link`.
type AddrKey = (TransportId, TransportAddr);
664
/// Per-transport drop tracking state; `Default` is zero drops, not dropping.
/// NOTE(review): the update logic is outside this view — field meanings are
/// inferred from their names.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Drop counter value seen previously (presumably at the last sample).
    prev_drops: u64,
    /// Whether the transport is currently considered to be dropping.
    dropping: bool,
}
676
/// An outbound connect that has been recorded but not yet completed; entries
/// are kept in `Node::pending_connects`.
/// NOTE(review): the completion logic is outside this view.
struct PendingConnect {
    /// Link allocated for the connect.
    link_id: LinkId,
    /// Transport the connect goes through.
    transport_id: TransportId,
    /// Remote address being connected to.
    remote_addr: TransportAddr,
    /// Identity of the peer being connected to.
    peer_identity: PeerIdentity,
}
692
/// Central node state: identity, configuration, tree/bloom/routing state,
/// transports, links, peers, sessions, and the channel endpoints and worker
/// handles attached to them.
///
/// Built by `Node::new`, `Node::with_identity` or `Node::leaf_only`; each
/// starts in [`NodeState::Created`] with empty tables and no channels or
/// workers attached.
///
/// NOTE(review): most readers and writers of these fields live outside this
/// view. Field comments restate what the names and types show; anything beyond
/// that is marked as an assumption.
pub struct Node {
    // --- Identity, configuration, lifecycle ---
    /// This node's identity; its node address seeds the tree and bloom state.
    identity: Identity,

    /// Eight random bytes drawn once when the node is constructed.
    startup_epoch: [u8; 8],

    /// Instant at which the node value was constructed.
    started_at: std::time::Instant,

    /// Configuration the node was built from.
    config: Config,

    /// Current lifecycle state.
    state: NodeState,

    /// Whether the node runs in leaf-only mode.
    is_leaf_only: bool,

    // --- Tree, bloom and routing state ---
    /// Tree state for this node, configured from `config.node.tree`.
    tree_state: TreeState,

    /// Bloom filter state for this node.
    bloom_state: BloomState,

    /// Coordinate cache, sized and aged from `config.node.cache`.
    coord_cache: CoordCache,
    /// Learned route table; starts empty.
    learned_routes: LearnedRouteTable,
    /// Per-peer millisecond deadline (presumably until which the direct
    /// session path is treated as degraded — TODO confirm).
    session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
    /// Recently seen requests keyed by a 64-bit id.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Shared, lock-protected map from address to a 16-bit MTU value.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // --- Transports and links ---
    /// Transport handles keyed by transport id.
    transports: HashMap<TransportId, TransportHandle>,
    /// Drop-tracking state per transport.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Links keyed by link id.
    links: HashMap<LinkId, Link>,
    /// Reverse index from (transport, remote address) to link id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    /// Sending half of the packet channel; `None` until attached.
    packet_tx: Option<PacketTx>,
    /// Receiving half of the packet channel; `None` until attached.
    packet_rx: Option<PacketRx>,

    // --- Connections, peers, sessions ---
    /// Peer connections keyed by link id.
    connections: HashMap<LinkId, PeerConnection>,

    /// Active peers keyed by node address.
    peers: HashMap<NodeAddr, ActivePeer>,

    /// Session entries keyed by node address.
    sessions: HashMap<NodeAddr, SessionEntry>,

    /// Cached identities keyed by a 15-byte value.
    /// NOTE(review): presumably an address prefix — confirm.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    // --- Queued work awaiting a destination ---
    /// Queued TUN packets per destination node.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Queued endpoint data payloads per destination node.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Outstanding discovery lookups per target node.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // --- Limits and id allocation ---
    /// Copied from `config.node.limits.max_connections`.
    max_connections: usize,
    /// Copied from `config.node.limits.max_peers`.
    max_peers: usize,
    /// Copied from `config.node.limits.max_links`.
    max_links: usize,

    /// Next link id to hand out; starts at 1.
    next_link_id: u64,
    /// Next transport id to hand out; starts at 1.
    next_transport_id: u32,

    // --- Statistics ---
    /// Node statistics.
    stats: stats::NodeStats,

    /// History of node statistics.
    stats_history: stats_history::StatsHistory,

    // --- TUN, external and endpoint I/O ---
    /// TUN state; `Configured` when `config.tun.enabled`, otherwise `Disabled`
    /// at construction.
    tun_state: TunState,
    /// Name of the TUN device, once known.
    tun_name: Option<String>,
    /// Sender towards the TUN writer, once attached.
    tun_tx: Option<TunTx>,
    /// Receiver of outbound TUN packets, once attached.
    tun_outbound_rx: Option<TunOutboundRx>,
    /// Sender of [`NodeDeliveredPacket`]s to an external consumer, once attached.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Receiver of priority endpoint commands, once attached.
    endpoint_priority_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Receiver of regular endpoint commands, once attached.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Sender of endpoint events, once attached.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Encrypt worker pool, once started.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Decrypt worker pool, once started.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// Set of (transport, index) pairs for sessions registered with the
    /// decrypt workers.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Receiving half of the decrypt fallback channel created at construction;
    /// wrapped in `Option` so it can be taken.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sending half of the decrypt fallback channel created at construction.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// Join handle of the TUN reader thread, once spawned.
    tun_reader_handle: Option<JoinHandle<()>>,
    /// Join handle of the TUN writer thread, once spawned.
    tun_writer_handle: Option<JoinHandle<()>>,
    /// Raw file descriptor associated with TUN shutdown (macOS only).
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // --- DNS ---
    /// Receiver of identities from the DNS component, once attached.
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    /// Handle of the DNS task, once spawned.
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // --- Session index bookkeeping ---
    /// Allocator for session indices.
    index_allocator: IndexAllocator,
    /// Reverse index from (transport, index) to peer node address.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Outbound handshakes in flight, from (transport, index) to link id.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // --- Rate limiting and backoff ---
    /// Handshake rate limiter, configured from `config.node.rate_limit` and
    /// `config.node.limits.max_pending_inbound`.
    msg1_rate_limiter: HandshakeRateLimiter,
    /// ICMP rate limiter.
    icmp_rate_limiter: IcmpRateLimiter,
    /// Rate limiter for routing errors (default parameters).
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Rate limiter for coordinate responses, with the interval taken from
    /// `config.node.session.coords_response_interval_ms`.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    /// Discovery backoff state.
    discovery_backoff: DiscoveryBackoff,
    /// Rate limiter for forwarded discovery traffic.
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // --- Connection attempts ---
    /// Connects that have been started but not completed.
    pending_connects: Vec<PendingConnect>,

    /// Retry state per peer.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    // --- Discovery ---
    /// Nostr discovery service, when running.
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    /// LAN discovery service, when running.
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    /// Local instance registry, when in use.
    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
    /// Millisecond timestamp at which local-instance discovery started, if it has.
    local_instance_started_at_ms: Option<u64>,
    /// Millisecond timestamp of the last local-instance publish, if any.
    last_local_instance_publish_ms: Option<u64>,
    /// Millisecond timestamp of the last local-instance scan, if any.
    last_local_instance_scan_ms: Option<u64>,
    /// Millisecond timestamp at which Nostr discovery started, if it has.
    nostr_discovery_started_at_ms: Option<u64>,
    /// Whether the startup open-discovery sweep has been done; starts `false`.
    startup_open_discovery_sweep_done: bool,
    /// Transports flagged as bootstrap transports.
    bootstrap_transports: HashSet<TransportId>,
    /// npub string associated with each bootstrap transport.
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    /// Peers for which discovery-fallback transit is blocked.
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    // --- Periodic maintenance and log throttling ---
    /// Instant of the last parent re-evaluation, if any.
    last_parent_reeval: Option<crate::time::Instant>,

    /// Instant of the last congestion log line, if any.
    last_congestion_log: Option<std::time::Instant>,

    /// Latest mesh size estimate, if one exists.
    estimated_mesh_size: Option<u64>,
    /// Instant of the last mesh-size log line, if any.
    last_mesh_size_log: Option<std::time::Instant>,

    /// Instant of the last "self" warning, if any.
    last_self_warn: Option<std::time::Instant>,

    /// Instant of a local send failure per peer.
    local_send_failure_at_by_peer: HashMap<NodeAddr, std::time::Instant>,
    /// Instant of the last rx-loop maintenance timeout, if any.
    last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,

    // --- Peer naming, weights, access control ---
    /// Human-readable alias per peer.
    peer_aliases: HashMap<NodeAddr, String>,
    /// Send weight per configured peer, built by `configured_peer_send_weights`.
    configured_peer_send_weights: HashMap<NodeAddr, u8>,

    /// Peer ACL reloader, built by `host_map_and_peer_acl`.
    peer_acl: acl::PeerAclReloader,

    /// Shared host map, built by `host_map_and_peer_acl`.
    host_map: Arc<HostMap>,
}
1003
1004impl Node {
1005 pub fn new(config: Config) -> Result<Self, NodeError> {
1007 config.validate()?;
1008 let identity = config.create_identity()?;
1009 let node_addr = *identity.node_addr();
1010 let is_leaf_only = config.is_leaf_only();
1011
1012 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
1013 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
1014
1015 let mut startup_epoch = [0u8; 8];
1016 rand::rng().fill_bytes(&mut startup_epoch);
1017
1018 let mut bloom_state = if is_leaf_only {
1019 BloomState::leaf_only(node_addr)
1020 } else {
1021 BloomState::new(node_addr)
1022 };
1023 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
1024
1025 let tun_state = if config.tun.enabled {
1026 TunState::Configured
1027 } else {
1028 TunState::Disabled
1029 };
1030
1031 let mut tree_state = TreeState::new(node_addr);
1033 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
1034 tree_state.set_hold_down(config.node.tree.hold_down_secs);
1035 tree_state.set_flap_dampening(
1036 config.node.tree.flap_threshold,
1037 config.node.tree.flap_window_secs,
1038 config.node.tree.flap_dampening_secs,
1039 );
1040 tree_state
1041 .sign_declaration(&identity)
1042 .expect("signing own declaration should never fail");
1043
1044 let coord_cache = CoordCache::new(
1045 config.node.cache.coord_size,
1046 config.node.cache.coord_ttl_secs * 1000,
1047 );
1048 let rl = &config.node.rate_limit;
1049 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
1050 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
1051 config.node.limits.max_pending_inbound,
1052 );
1053
1054 let max_connections = config.node.limits.max_connections;
1055 let max_peers = config.node.limits.max_peers;
1056 let max_links = config.node.limits.max_links;
1057 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1058 let backoff_base_secs = config.node.discovery.backoff_base_secs;
1059 let backoff_max_secs = config.node.discovery.backoff_max_secs;
1060 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
1061
1062 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1063 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1064
1065 Ok(Self {
1066 identity,
1067 startup_epoch,
1068 started_at: std::time::Instant::now(),
1069 config,
1070 state: NodeState::Created,
1071 is_leaf_only,
1072 tree_state,
1073 bloom_state,
1074 coord_cache,
1075 learned_routes: LearnedRouteTable::default(),
1076 session_direct_degraded_until_ms: HashMap::new(),
1077 recent_requests: HashMap::new(),
1078 transports: HashMap::new(),
1079 transport_drops: HashMap::new(),
1080 links: HashMap::new(),
1081 addr_to_link: HashMap::new(),
1082 packet_tx: None,
1083 packet_rx: None,
1084 connections: HashMap::new(),
1085 peers: HashMap::new(),
1086 sessions: HashMap::new(),
1087 identity_cache: HashMap::new(),
1088 pending_tun_packets: HashMap::new(),
1089 pending_endpoint_data: HashMap::new(),
1090 pending_lookups: HashMap::new(),
1091 max_connections,
1092 max_peers,
1093 max_links,
1094 next_link_id: 1,
1095 next_transport_id: 1,
1096 stats: stats::NodeStats::new(),
1097 stats_history: stats_history::StatsHistory::new(),
1098 tun_state,
1099 tun_name: None,
1100 tun_tx: None,
1101 tun_outbound_rx: None,
1102 external_packet_tx: None,
1103 endpoint_priority_command_rx: None,
1104 endpoint_command_rx: None,
1105 endpoint_event_tx: None,
1106 encrypt_workers: None,
1107 decrypt_workers: None,
1108 decrypt_registered_sessions: std::collections::HashSet::new(),
1109 decrypt_fallback_tx,
1110 decrypt_fallback_rx,
1111 tun_reader_handle: None,
1112 tun_writer_handle: None,
1113 #[cfg(target_os = "macos")]
1114 tun_shutdown_fd: None,
1115 dns_identity_rx: None,
1116 dns_task: None,
1117 index_allocator: IndexAllocator::new(),
1118 peers_by_index: HashMap::new(),
1119 pending_outbound: HashMap::new(),
1120 msg1_rate_limiter,
1121 icmp_rate_limiter: IcmpRateLimiter::new(),
1122 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1123 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1124 std::time::Duration::from_millis(coords_response_interval_ms),
1125 ),
1126 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
1127 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
1128 std::time::Duration::from_secs(forward_min_interval_secs),
1129 ),
1130 pending_connects: Vec::new(),
1131 retry_pending: HashMap::new(),
1132 nostr_discovery: None,
1133 nostr_discovery_started_at_ms: None,
1134 lan_discovery: None,
1135 local_instance_registry: None,
1136 local_instance_started_at_ms: None,
1137 last_local_instance_publish_ms: None,
1138 last_local_instance_scan_ms: None,
1139 startup_open_discovery_sweep_done: false,
1140 bootstrap_transports: HashSet::new(),
1141 bootstrap_transport_npubs: HashMap::new(),
1142 discovery_fallback_transit_blocked_peers: HashSet::new(),
1143 last_parent_reeval: None,
1144 last_congestion_log: None,
1145 estimated_mesh_size: None,
1146 last_mesh_size_log: None,
1147 last_self_warn: None,
1148 local_send_failure_at_by_peer: HashMap::new(),
1149 last_rx_loop_maintenance_timeout_at: None,
1150 peer_aliases: HashMap::new(),
1151 configured_peer_send_weights,
1152 peer_acl,
1153 host_map,
1154 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1155 })
1156 }
1157
1158 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
1163 config.validate()?;
1164 let node_addr = *identity.node_addr();
1165
1166 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
1167 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
1168
1169 let mut startup_epoch = [0u8; 8];
1170 rand::rng().fill_bytes(&mut startup_epoch);
1171
1172 let tun_state = if config.tun.enabled {
1173 TunState::Configured
1174 } else {
1175 TunState::Disabled
1176 };
1177
1178 let mut tree_state = TreeState::new(node_addr);
1180 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
1181 tree_state.set_hold_down(config.node.tree.hold_down_secs);
1182 tree_state.set_flap_dampening(
1183 config.node.tree.flap_threshold,
1184 config.node.tree.flap_window_secs,
1185 config.node.tree.flap_dampening_secs,
1186 );
1187 tree_state
1188 .sign_declaration(&identity)
1189 .expect("signing own declaration should never fail");
1190
1191 let mut bloom_state = BloomState::new(node_addr);
1192 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
1193
1194 let coord_cache = CoordCache::new(
1195 config.node.cache.coord_size,
1196 config.node.cache.coord_ttl_secs * 1000,
1197 );
1198 let rl = &config.node.rate_limit;
1199 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
1200 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
1201 config.node.limits.max_pending_inbound,
1202 );
1203
1204 let max_connections = config.node.limits.max_connections;
1205 let max_peers = config.node.limits.max_peers;
1206 let max_links = config.node.limits.max_links;
1207 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1208
1209 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1210 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1211
1212 Ok(Self {
1213 identity,
1214 startup_epoch,
1215 started_at: std::time::Instant::now(),
1216 config,
1217 state: NodeState::Created,
1218 is_leaf_only: false,
1219 tree_state,
1220 bloom_state,
1221 coord_cache,
1222 learned_routes: LearnedRouteTable::default(),
1223 session_direct_degraded_until_ms: HashMap::new(),
1224 recent_requests: HashMap::new(),
1225 transports: HashMap::new(),
1226 transport_drops: HashMap::new(),
1227 links: HashMap::new(),
1228 addr_to_link: HashMap::new(),
1229 packet_tx: None,
1230 packet_rx: None,
1231 connections: HashMap::new(),
1232 peers: HashMap::new(),
1233 sessions: HashMap::new(),
1234 identity_cache: HashMap::new(),
1235 pending_tun_packets: HashMap::new(),
1236 pending_endpoint_data: HashMap::new(),
1237 pending_lookups: HashMap::new(),
1238 max_connections,
1239 max_peers,
1240 max_links,
1241 next_link_id: 1,
1242 next_transport_id: 1,
1243 stats: stats::NodeStats::new(),
1244 stats_history: stats_history::StatsHistory::new(),
1245 tun_state,
1246 tun_name: None,
1247 tun_tx: None,
1248 tun_outbound_rx: None,
1249 external_packet_tx: None,
1250 endpoint_priority_command_rx: None,
1251 endpoint_command_rx: None,
1252 endpoint_event_tx: None,
1253 encrypt_workers: None,
1254 decrypt_workers: None,
1255 decrypt_registered_sessions: std::collections::HashSet::new(),
1256 decrypt_fallback_tx,
1257 decrypt_fallback_rx,
1258 tun_reader_handle: None,
1259 tun_writer_handle: None,
1260 #[cfg(target_os = "macos")]
1261 tun_shutdown_fd: None,
1262 dns_identity_rx: None,
1263 dns_task: None,
1264 index_allocator: IndexAllocator::new(),
1265 peers_by_index: HashMap::new(),
1266 pending_outbound: HashMap::new(),
1267 msg1_rate_limiter,
1268 icmp_rate_limiter: IcmpRateLimiter::new(),
1269 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1270 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1271 std::time::Duration::from_millis(coords_response_interval_ms),
1272 ),
1273 discovery_backoff: DiscoveryBackoff::new(),
1274 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1275 pending_connects: Vec::new(),
1276 retry_pending: HashMap::new(),
1277 nostr_discovery: None,
1278 nostr_discovery_started_at_ms: None,
1279 lan_discovery: None,
1280 local_instance_registry: None,
1281 local_instance_started_at_ms: None,
1282 last_local_instance_publish_ms: None,
1283 last_local_instance_scan_ms: None,
1284 startup_open_discovery_sweep_done: false,
1285 bootstrap_transports: HashSet::new(),
1286 bootstrap_transport_npubs: HashMap::new(),
1287 discovery_fallback_transit_blocked_peers: HashSet::new(),
1288 last_parent_reeval: None,
1289 last_congestion_log: None,
1290 estimated_mesh_size: None,
1291 last_mesh_size_log: None,
1292 last_self_warn: None,
1293 local_send_failure_at_by_peer: HashMap::new(),
1294 last_rx_loop_maintenance_timeout_at: None,
1295 peer_aliases: HashMap::new(),
1296 configured_peer_send_weights,
1297 peer_acl,
1298 host_map,
1299 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1300 })
1301 }
1302
1303 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1305 let mut node = Self::new(config)?;
1306 node.is_leaf_only = true;
1307 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1308 Ok(node)
1309 }
1310
1311 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1312 let base_host_map = HostMap::from_peer_configs(config.peers());
1313 if !config.node.system_files_enabled {
1314 return (
1315 Arc::new(base_host_map.clone()),
1316 acl::PeerAclReloader::memory_only(base_host_map),
1317 );
1318 }
1319
1320 let mut host_map = base_host_map.clone();
1321 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1322 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1323 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1324 ));
1325 host_map.merge(hosts_file);
1326 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1327 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1328 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1329 base_host_map,
1330 hosts_path,
1331 );
1332 (Arc::new(host_map), peer_acl)
1333 }
1334
1335 fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1336 config
1337 .peers()
1338 .iter()
1339 .filter_map(|peer| {
1340 PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1341 (
1342 *identity.node_addr(),
1343 encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1344 )
1345 })
1346 })
1347 .collect()
1348 }
1349
1350 #[cfg(unix)]
1351 fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1352 self.configured_peer_send_weights
1353 .get(peer_addr)
1354 .copied()
1355 .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1356 }
1357
/// Instantiates a transport handle for every transport instance in the config.
///
/// Instances are created in this order: UDP, simulated (feature
/// `sim-transport`), Ethernet (Linux/macOS), TCP, Tor, WebRTC (feature
/// `webrtc-transport`) and BLE (`cfg(bluer_available)`). Each instance gets a
/// fresh id from [`Self::allocate_transport_id`] and a clone of `packet_tx`.
///
/// WebRTC and BLE construction can fail; failures are logged as warnings and
/// the instance is left out of the returned list.
async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
    let mut transports = Vec::new();

    // Each per-kind config list is copied into an owned Vec before the loop,
    // because allocating transport ids below needs `&mut self`.
    let udp_instances: Vec<_> = self
        .config
        .transports
        .udp
        .iter()
        .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
        .collect();

    for (name, udp_config) in udp_instances {
        let transport_id = self.allocate_transport_id();
        let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
        transports.push(TransportHandle::Udp(udp));
    }

    #[cfg(feature = "sim-transport")]
    {
        let sim_instances: Vec<_> = self
            .config
            .transports
            .sim
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, sim_config) in sim_instances {
            let transport_id = self.allocate_transport_id();
            let sim = crate::transport::sim::SimTransport::new(
                transport_id,
                name,
                sim_config,
                packet_tx.clone(),
            );
            transports.push(TransportHandle::Sim(sim));
        }
    }

    #[cfg(any(target_os = "linux", target_os = "macos"))]
    {
        let eth_instances: Vec<_> = self
            .config
            .transports
            .ethernet
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();
        let xonly = self.identity.pubkey();
        for (name, eth_config) in eth_instances {
            let mut eth_config = eth_config;
            // An instance without its own discovery scope takes the value of
            // `lan_discovery_scope()`.
            if eth_config.discovery_scope.is_none() {
                eth_config.discovery_scope = self.lan_discovery_scope();
            }
            let transport_id = self.allocate_transport_id();
            let mut eth =
                EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
            eth.set_local_pubkey(xonly);
            transports.push(TransportHandle::Ethernet(eth));
        }
    }

    let tcp_instances: Vec<_> = self
        .config
        .transports
        .tcp
        .iter()
        .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
        .collect();

    for (name, tcp_config) in tcp_instances {
        let transport_id = self.allocate_transport_id();
        let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
        transports.push(TransportHandle::Tcp(tcp));
    }

    let tor_instances: Vec<_> = self
        .config
        .transports
        .tor
        .iter()
        .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
        .collect();

    for (name, tor_config) in tor_instances {
        let transport_id = self.allocate_transport_id();
        let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
        transports.push(TransportHandle::Tor(tor));
    }

    // Collected regardless of the feature flag so that builds without WebRTC
    // support can still warn about configured instances.
    let webrtc_instances: Vec<_> = self
        .config
        .transports
        .webrtc
        .iter()
        .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
        .collect();

    #[cfg(feature = "webrtc-transport")]
    {
        for (name, webrtc_config) in webrtc_instances {
            // The id is allocated before construction, so a failed WebRTC
            // instance still consumes a transport id.
            let transport_id = self.allocate_transport_id();
            match WebRtcTransport::new(
                transport_id,
                name,
                webrtc_config,
                packet_tx.clone(),
                &self.identity,
                &self.config.node.discovery.nostr,
            ) {
                Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
                Err(err) => {
                    warn!(
                        transport_id = %transport_id,
                        error = %err,
                        "failed to initialize WebRTC transport"
                    );
                }
            }
        }
    }
    #[cfg(not(feature = "webrtc-transport"))]
    if !webrtc_instances.is_empty() {
        warn!("WebRTC transport configured but this build lacks WebRTC transport support");
    }

    #[cfg(bluer_available)]
    {
        let ble_instances: Vec<_> = self
            .config
            .transports
            .ble
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        // Real BLE adapters are only opened outside of test builds.
        #[cfg(all(bluer_available, not(test)))]
        for (name, ble_config) in ble_instances {
            let transport_id = self.allocate_transport_id();
            let adapter = ble_config.adapter().to_string();
            let mtu = ble_config.mtu();
            match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                Ok(io) => {
                    let mut ble = crate::transport::ble::BleTransport::new(
                        transport_id,
                        name,
                        ble_config,
                        io,
                        packet_tx.clone(),
                    );
                    ble.set_local_pubkey(self.identity.pubkey().serialize());
                    transports.push(TransportHandle::Ble(ble));
                }
                Err(e) => {
                    tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                }
            }
        }

        // Inside the enclosing `cfg(bluer_available)` block this branch is
        // only compiled for test builds, where the warning itself is also
        // compiled out; it then merely reads `ble_instances`.
        #[cfg(any(not(bluer_available), test))]
        if !ble_instances.is_empty() {
            #[cfg(not(test))]
            tracing::warn!("BLE transport configured but this build lacks BlueZ support");
        }
    }

    transports
}
1535
1536 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1546 self.transports
1547 .iter()
1548 .filter(|(id, handle)| {
1549 handle.transport_type().name == transport_type
1550 && handle.is_operational()
1551 && !self.bootstrap_transports.contains(id)
1552 })
1553 .min_by_key(|(id, _)| id.as_u32())
1554 .map(|(id, _)| *id)
1555 }
1556
1557 #[allow(unused_variables)]
1563 fn resolve_ethernet_addr(
1564 &self,
1565 addr_str: &str,
1566 ) -> Result<(TransportId, TransportAddr), NodeError> {
1567 #[cfg(any(target_os = "linux", target_os = "macos"))]
1568 {
1569 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1570 NodeError::NoTransportForType(format!(
1571 "invalid Ethernet address format '{}': expected 'interface/mac'",
1572 addr_str
1573 ))
1574 })?;
1575
1576 let transport_id = self
1578 .transports
1579 .iter()
1580 .find(|(_, handle)| {
1581 handle.transport_type().name == "ethernet"
1582 && handle.is_operational()
1583 && handle.interface_name() == Some(iface)
1584 })
1585 .map(|(id, _)| *id)
1586 .ok_or_else(|| {
1587 NodeError::NoTransportForType(format!(
1588 "no operational Ethernet transport for interface '{}'",
1589 iface
1590 ))
1591 })?;
1592
1593 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1594 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1595 })?;
1596
1597 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1598 }
1599 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1600 {
1601 Err(NodeError::NoTransportForType(
1602 "Ethernet transport is not supported on this platform".to_string(),
1603 ))
1604 }
1605 }
1606
/// Resolves an `adapter/mac` string to a BLE transport and address.
///
/// The adapter part is extracted first, then the first operational BLE
/// transport is selected, and finally the whole string is validated with
/// `BleAddr::parse`.
///
/// NOTE(review): the transport lookup does not compare the adapter name; the
/// adapter is only used in the error message. Confirm this is intended.
///
/// # Errors
/// Returns [`NodeError::NoTransportForType`] for a malformed address, a
/// missing operational BLE transport, or an address `BleAddr::parse` rejects.
#[cfg(bluer_available)]
fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
    let parsed_addr = TransportAddr::from_string(addr_str);
    let Some(adapter) = crate::transport::ble::addr::adapter_from_addr(&parsed_addr) else {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address format '{}': expected 'adapter/mac'",
            addr_str
        )));
    };

    let ble_transport = self.transports.iter().find_map(|(id, handle)| {
        let usable = handle.transport_type().name == "ble" && handle.is_operational();
        usable.then_some(*id)
    });
    let Some(transport_id) = ble_transport else {
        return Err(NodeError::NoTransportForType(format!(
            "no operational BLE transport for adapter '{}'",
            adapter
        )));
    };

    // Validation only: the parsed value is discarded.
    if let Err(e) = crate::transport::ble::addr::BleAddr::parse(addr_str) {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address '{}': {}",
            addr_str, e
        )));
    }

    Ok((transport_id, TransportAddr::from_string(addr_str)))
}
1640
/// Returns a reference to this node's [`Identity`].
pub fn identity(&self) -> &Identity {
    &self.identity
}
1647
/// Returns this node's address, as reported by its identity.
pub fn node_addr(&self) -> &NodeAddr {
    self.identity.node_addr()
}
1652
/// Returns this node's `npub` string, as produced by its identity.
pub fn npub(&self) -> String {
    self.identity.npub()
}
1657
1658 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1667 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1668 return hostname.to_string();
1669 }
1670 if let Some(name) = self.peer_aliases.get(addr) {
1671 return name.clone();
1672 }
1673 if let Some(peer) = self.peers.get(addr) {
1674 return peer.identity().short_npub();
1675 }
1676 if let Some(entry) = self.sessions.get(addr) {
1677 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1678 return PeerIdentity::from_pubkey(xonly).short_npub();
1679 }
1680 addr.short_hex()
1681 }
1682
1683 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1695 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1699 self.peers_by_index.remove(&cache_key);
1700 if self.decrypt_registered_sessions.remove(&cache_key)
1701 && let Some(workers) = self.decrypt_workers.as_ref()
1702 {
1703 workers.unregister_session(cache_key);
1704 }
1705 if let Some(peer_addr) = owning_peer {
1716 let peer_has_other_index = self
1717 .peers_by_index
1718 .values()
1719 .any(|other| *other == peer_addr);
1720 if !peer_has_other_index {
1721 self.clear_connected_udp_for_peer(&peer_addr);
1722 }
1723 }
1724 }
1725
1726 pub(in crate::node) fn ensure_current_session_index_registered(
1735 &mut self,
1736 node_addr: &NodeAddr,
1737 context: &'static str,
1738 ) -> bool {
1739 let Some(peer) = self.peers.get(node_addr) else {
1740 return false;
1741 };
1742 let Some(transport_id) = peer.transport_id() else {
1743 warn!(
1744 peer = %self.peer_display_name(node_addr),
1745 context,
1746 "Cannot register current session index without transport id"
1747 );
1748 return false;
1749 };
1750 let Some(our_index) = peer.our_index() else {
1751 warn!(
1752 peer = %self.peer_display_name(node_addr),
1753 context,
1754 "Cannot register current session index without local index"
1755 );
1756 return false;
1757 };
1758
1759 let cache_key = (transport_id, our_index.as_u32());
1760 match self.peers_by_index.get(&cache_key).copied() {
1761 Some(existing) if existing == *node_addr => true,
1762 Some(existing) => {
1763 warn!(
1764 peer = %self.peer_display_name(node_addr),
1765 previous_owner = %self.peer_display_name(&existing),
1766 transport_id = %transport_id,
1767 our_index = %our_index,
1768 context,
1769 "Repairing current session index with stale owner"
1770 );
1771 self.peers_by_index.insert(cache_key, *node_addr);
1772 true
1773 }
1774 None => {
1775 warn!(
1776 peer = %self.peer_display_name(node_addr),
1777 transport_id = %transport_id,
1778 our_index = %our_index,
1779 context,
1780 "Repairing missing current session index"
1781 );
1782 self.peers_by_index.insert(cache_key, *node_addr);
1783 true
1784 }
1785 }
1786 }
1787
/// Returns a reference to the node's [`Config`].
pub fn config(&self) -> &Config {
    &self.config
}
1794
/// Returns the IPv6 MTU that `crate::upper::icmp::effective_ipv6_mtu`
/// derives from [`Self::transport_mtu`].
pub fn effective_ipv6_mtu(&self) -> u16 {
    crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
}
1803
1804 pub fn transport_mtu(&self) -> u16 {
1821 let min_operational = self
1822 .transports
1823 .values()
1824 .filter(|h| h.is_operational())
1825 .map(|h| h.mtu())
1826 .min();
1827 if let Some(mtu) = min_operational {
1828 return mtu;
1829 }
1830 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1832 return cfg.mtu();
1833 }
1834 1280
1835 }
1836
/// Returns the node's current [`NodeState`].
pub fn state(&self) -> NodeState {
    self.state
}
1843
/// Returns the time elapsed since `started_at`.
pub fn uptime(&self) -> std::time::Duration {
    self.started_at.elapsed()
}
1848
/// Returns whether the current node state reports itself as operational.
pub fn is_running(&self) -> bool {
    self.state.is_operational()
}
1853
/// Returns the value of the `is_leaf_only` flag.
pub fn is_leaf_only(&self) -> bool {
    self.is_leaf_only
}
1858
/// Returns a shared reference to the node's [`TreeState`].
pub fn tree_state(&self) -> &TreeState {
    &self.tree_state
}
1865
/// Returns a mutable reference to the node's [`TreeState`].
pub fn tree_state_mut(&mut self) -> &mut TreeState {
    &mut self.tree_state
}
1870
/// Returns a shared reference to the node's [`BloomState`].
pub fn bloom_state(&self) -> &BloomState {
    &self.bloom_state
}
1877
/// Returns a mutable reference to the node's [`BloomState`].
pub fn bloom_state_mut(&mut self) -> &mut BloomState {
    &mut self.bloom_state
}
1882
/// Returns the stored mesh size estimate, or `None` when no estimate is
/// currently held (see [`Self::compute_mesh_size`]).
pub fn estimated_mesh_size(&self) -> Option<u64> {
    self.estimated_mesh_size
}
1889
1890 pub(crate) fn compute_mesh_size(&mut self) {
1897 let my_addr = *self.tree_state.my_node_addr();
1898 let parent_id = *self.tree_state.my_declaration().parent_id();
1899 let is_root = self.tree_state.is_root();
1900
1901 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1902 let mut child_count: u32 = 0;
1903 let mut union: Option<BloomFilter> = None;
1904
1905 let add_to_union = |union: &mut Option<BloomFilter>, filter: &BloomFilter| match union {
1906 None => *union = Some(filter.clone()),
1907 Some(existing) => {
1908 let _ = existing.merge(filter);
1910 }
1911 };
1912
1913 if !is_root
1915 && let Some(parent) = self.peers.get(&parent_id)
1916 && let Some(filter) = parent.inbound_filter()
1917 {
1918 add_to_union(&mut union, filter);
1919 }
1920
1921 for (peer_addr, peer) in &self.peers {
1924 if peer_addr == &parent_id {
1925 continue;
1926 }
1927 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1928 && *decl.parent_id() == my_addr
1929 {
1930 child_count += 1;
1931 if let Some(filter) = peer.inbound_filter() {
1932 add_to_union(&mut union, filter);
1933 }
1934 }
1935 }
1936
1937 let Some(mut union) = union else {
1938 self.estimated_mesh_size = None;
1939 return;
1940 };
1941 union.insert(&my_addr);
1942
1943 let Some(union_estimate) = union.estimated_count(max_fpr) else {
1946 self.estimated_mesh_size = None;
1947 return;
1948 };
1949
1950 let size = union_estimate.round() as u64;
1951 self.estimated_mesh_size = Some(size);
1952
1953 let now = std::time::Instant::now();
1955 let should_log = match self.last_mesh_size_log {
1956 None => true,
1957 Some(last) => {
1958 now.duration_since(last)
1959 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1960 }
1961 };
1962 if should_log {
1963 tracing::debug!(
1964 estimated_mesh_size = size,
1965 peers = self.peers.len(),
1966 children = child_count,
1967 "Mesh size estimate"
1968 );
1969 self.last_mesh_size_log = Some(now);
1970 }
1971 }
1972
/// Returns a shared reference to the node's [`CoordCache`].
pub fn coord_cache(&self) -> &CoordCache {
    &self.coord_cache
}
1979
/// Returns a mutable reference to the node's [`CoordCache`].
pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
    &mut self.coord_cache
}
1984
/// Returns a shared reference to the node's statistics.
pub fn stats(&self) -> &stats::NodeStats {
    &self.stats
}
1991
/// Returns a mutable reference to the node's statistics.
pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
    &mut self.stats
}
1996
/// Returns a shared reference to the recorded statistics history.
pub fn stats_history(&self) -> &stats_history::StatsHistory {
    &self.stats_history
}
2001
2002 pub(crate) fn record_stats_history(&mut self) {
2005 let fwd = &self.stats.forwarding;
2006 let peers_with_mmp: Vec<f64> = self
2007 .peers
2008 .values()
2009 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
2010 .collect();
2011 let loss_rate = if peers_with_mmp.is_empty() {
2012 0.0
2013 } else {
2014 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
2015 };
2016
2017 let snap = stats_history::Snapshot {
2018 mesh_size: self.estimated_mesh_size,
2019 tree_depth: self.tree_state.my_coords().depth() as u32,
2020 peer_count: self.peers.len() as u64,
2021 parent_switches_total: self.stats.tree.parent_switches,
2022 bytes_in_total: fwd.received_bytes,
2023 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
2024 packets_in_total: fwd.received_packets,
2025 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
2026 loss_rate,
2027 active_sessions: self.sessions.len() as u64,
2028 };
2029
2030 let now = std::time::Instant::now();
2031 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
2032 .peers
2033 .values()
2034 .map(|p| {
2035 let stats = p.link_stats();
2036 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
2037 Some(m) => (
2038 m.metrics.srtt_ms(),
2039 Some(m.metrics.loss_rate()),
2040 m.receiver.ecn_ce_count() as u64,
2041 ),
2042 None => (None, None, 0),
2043 };
2044 stats_history::PeerSnapshot {
2045 node_addr: *p.node_addr(),
2046 last_seen: now,
2047 srtt_ms,
2048 loss_rate,
2049 bytes_in_total: stats.bytes_recv,
2050 bytes_out_total: stats.bytes_sent,
2051 packets_in_total: stats.packets_recv,
2052 packets_out_total: stats.packets_sent,
2053 ecn_ce_total: ecn_ce,
2054 }
2055 })
2056 .collect();
2057
2058 self.stats_history.tick(now, &snap, &peer_snaps);
2059 }
2060
/// Returns the current [`TunState`].
pub fn tun_state(&self) -> TunState {
    self.tun_state
}
2067
/// Returns the stored TUN device name, if one is set.
pub fn tun_name(&self) -> Option<&str> {
    self.tun_name.as_deref()
}
2072
/// Sets the connection limit. The admission checks in this impl treat `0`
/// as "no limit".
pub fn set_max_connections(&mut self, max: usize) {
    self.max_connections = max;
}
2079
/// Sets the peer limit. [`Self::outbound_admission_check`] treats `0` as
/// "no limit".
pub fn set_max_peers(&mut self, max: usize) {
    self.max_peers = max;
}
2084
2085 pub(crate) fn outbound_admission_check(&self) -> bool {
2088 let connection_used = self
2089 .connections
2090 .len()
2091 .saturating_add(self.pending_connects.len());
2092 let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
2093 let connection_allowed =
2094 self.max_connections == 0 || connection_used < self.max_connections;
2095 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
2096 peer_allowed && connection_allowed && link_allowed
2097 }
2098
2099 pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
2103 if !self.outbound_admission_check() {
2104 return false;
2105 }
2106
2107 let nostr = &self.config.node.discovery.nostr;
2108 if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
2109 return true;
2110 }
2111
2112 let configured_npubs = self
2113 .config
2114 .peers()
2115 .iter()
2116 .map(|peer| peer.npub.clone())
2117 .collect::<HashSet<_>>();
2118 self.open_discovery_enqueue_budget(&configured_npubs) > 0
2119 }
2120
2121 pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
2125 let connection_used = self
2126 .connections
2127 .len()
2128 .saturating_add(self.pending_connects.len());
2129 let connection_allowed =
2130 self.max_connections == 0 || connection_used < self.max_connections;
2131 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
2132 connection_allowed && link_allowed
2133 }
2134
/// Sets the link limit. [`Self::add_link`] and the admission checks treat
/// `0` as "no limit".
pub fn set_max_links(&mut self, max: usize) {
    self.max_links = max;
}
2139
/// Returns the number of entries in the connection table.
pub fn connection_count(&self) -> usize {
    self.connections.len()
}
2146
/// Returns the number of entries in the peer table.
pub fn peer_count(&self) -> usize {
    self.peers.len()
}
2151
/// Returns the number of entries in the link table.
pub fn link_count(&self) -> usize {
    self.links.len()
}
2156
/// Returns the number of entries in the transport table.
pub fn transport_count(&self) -> usize {
    self.transports.len()
}
2161
2162 pub fn allocate_transport_id(&mut self) -> TransportId {
2166 let id = TransportId::new(self.next_transport_id);
2167 self.next_transport_id += 1;
2168 id
2169 }
2170
/// Looks up a transport handle by id.
pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
    self.transports.get(id)
}
2175
/// Looks up a transport handle by id for mutation.
pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
    self.transports.get_mut(id)
}
2180
/// Iterates over the ids of all entries in the transport table.
pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
    self.transports.keys()
}
2185
/// Returns mutable access to the packet receiver, if the node holds one.
pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
    self.packet_rx.as_mut()
}
2190
2191 pub fn allocate_link_id(&mut self) -> LinkId {
2195 let id = LinkId::new(self.next_link_id);
2196 self.next_link_id += 1;
2197 id
2198 }
2199
2200 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2202 if self.max_links > 0 && self.links.len() >= self.max_links {
2203 return Err(NodeError::MaxLinksExceeded {
2204 max: self.max_links,
2205 });
2206 }
2207 let link_id = link.link_id();
2208 let transport_id = link.transport_id();
2209 let remote_addr = link.remote_addr().clone();
2210
2211 self.links.insert(link_id, link);
2212 self.addr_to_link
2213 .insert((transport_id, remote_addr), link_id);
2214 Ok(())
2215 }
2216
/// Looks up a link by id.
pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
    self.links.get(link_id)
}
2221
/// Looks up a link by id for mutation.
pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
    self.links.get_mut(link_id)
}
2226
/// Returns the id of the link indexed under `(transport_id, addr)`, if any.
pub fn find_link_by_addr(
    &self,
    transport_id: TransportId,
    addr: &TransportAddr,
) -> Option<LinkId> {
    // The index key owns its address, so the lookup key is built from a clone.
    self.addr_to_link
        .get(&(transport_id, addr.clone()))
        .copied()
}
2237
2238 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2244 if let Some(link) = self.links.remove(link_id) {
2245 let key = (link.transport_id(), link.remote_addr().clone());
2247 if self.addr_to_link.get(&key) == Some(link_id) {
2248 self.addr_to_link.remove(&key);
2249 }
2250 Some(link)
2251 } else {
2252 None
2253 }
2254 }
2255
2256 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2257 if !self.bootstrap_transports.contains(&transport_id) {
2258 return;
2259 }
2260
2261 let transport_in_use = self
2262 .links
2263 .values()
2264 .any(|link| link.transport_id() == transport_id)
2265 || self
2266 .connections
2267 .values()
2268 .any(|conn| conn.transport_id() == Some(transport_id))
2269 || self
2270 .peers
2271 .values()
2272 .any(|peer| peer.transport_id() == Some(transport_id))
2273 || self
2274 .pending_connects
2275 .iter()
2276 .any(|pending| pending.transport_id == transport_id);
2277
2278 if transport_in_use {
2279 return;
2280 }
2281
2282 tracing::debug!(
2283 transport_id = %transport_id,
2284 "bootstrap transport has no remaining references; dropping"
2285 );
2286
2287 self.bootstrap_transports.remove(&transport_id);
2288 self.bootstrap_transport_npubs.remove(&transport_id);
2289 self.transport_drops.remove(&transport_id);
2290 self.transports.remove(&transport_id);
2291 }
2292
/// Iterates over all stored links.
pub fn links(&self) -> impl Iterator<Item = &Link> {
    self.links.values()
}
2297
2298 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2302 let link_id = connection.link_id();
2303
2304 if self.connections.contains_key(&link_id) {
2305 return Err(NodeError::ConnectionAlreadyExists(link_id));
2306 }
2307
2308 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2309 return Err(NodeError::MaxConnectionsExceeded {
2310 max: self.max_connections,
2311 });
2312 }
2313
2314 self.connections.insert(link_id, connection);
2315 Ok(())
2316 }
2317
/// Looks up a connection by link id.
pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
    self.connections.get(link_id)
}
2322
/// Looks up a connection by link id for mutation.
pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
    self.connections.get_mut(link_id)
}
2327
/// Removes and returns the connection stored for `link_id`, if any.
pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
    self.connections.remove(link_id)
}
2332
/// Iterates over all stored connections.
pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
    self.connections.values()
}
2337
/// Looks up an active peer by node address.
pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
    self.peers.get(node_addr)
}
2344
/// Looks up an active peer by node address for mutation.
pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
    self.peers.get_mut(node_addr)
}
2349
/// Removes and returns the peer stored for `node_addr`, if any.
///
/// Only the peer table is touched here; no other bookkeeping is updated by
/// this method.
pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
    self.peers.remove(node_addr)
}
2354
/// Iterates over all active peers.
pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values()
}
2359
/// Borrows the Nostr discovery handle, if one is currently set.
pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
    self.nostr_discovery.as_deref()
}
2366
/// Iterates over the node addresses of all active peers.
pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
    self.peers.keys()
}
2371
/// Iterates over the active peers for which `can_send()` is true.
pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values().filter(|p| p.can_send())
}
2376
2377 pub fn sendable_peer_count(&self) -> usize {
2379 self.peers.values().filter(|p| p.can_send()).count()
2380 }
2381
2382 pub(crate) fn set_discovery_fallback_transit_allowed(
2383 &mut self,
2384 peer_addr: NodeAddr,
2385 allowed: bool,
2386 ) {
2387 if allowed {
2388 self.discovery_fallback_transit_blocked_peers
2389 .remove(&peer_addr);
2390 } else {
2391 self.discovery_fallback_transit_blocked_peers
2392 .insert(peer_addr);
2393 }
2394 }
2395
2396 pub(crate) fn configured_discovery_fallback_transit(
2397 &self,
2398 peer_addr: &NodeAddr,
2399 ) -> Option<bool> {
2400 self.configured_peer(peer_addr)
2401 .map(|peer| peer.discovery_fallback_transit)
2402 }
2403
2404 pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2405 self.config.peers().iter().find(|peer| {
2406 PeerIdentity::from_npub(&peer.npub)
2407 .ok()
2408 .is_some_and(|identity| identity.node_addr() == peer_addr)
2409 })
2410 }
2411
2412 pub(in crate::node) fn active_peer_uses_configured_static_udp_path(
2413 &self,
2414 peer_addr: &NodeAddr,
2415 ) -> bool {
2416 let Some(peer_config) = self.configured_peer(peer_addr) else {
2417 return false;
2418 };
2419
2420 peer_config.addresses.iter().any(|candidate| {
2421 candidate.seen_at_ms.is_none()
2422 && candidate.transport.eq_ignore_ascii_case("udp")
2423 && self.active_peer_matches_candidate(peer_addr, candidate)
2424 })
2425 }
2426
2427 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2428 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2429 return retry_state.peer_config.discovery_fallback_transit;
2430 }
2431
2432 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2433 return allowed;
2434 }
2435
2436 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2437 }
2438
/// Test helper: sets the discovery forward limiter's interval to zero.
#[cfg(test)]
pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
    self.discovery_forward_limiter
        .set_interval(std::time::Duration::ZERO);
}
2448
/// Test helper: looks up the session entry for `remote`.
#[cfg(test)]
pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
    self.sessions.get(remote)
}
2453
/// Test helper: looks up the session entry for `remote` for mutation.
#[cfg(test)]
pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
    self.sessions.get_mut(remote)
}
2459
/// Test helper: removes and returns the session entry for `remote`, if any.
#[cfg(test)]
pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
    self.sessions.remove(remote)
}
2465
/// Test helper: reads the path MTU stored for `fips_addr`.
///
/// Returns `None` when the lock is poisoned or no entry exists.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
    let table = self.path_mtu_lookup.read().ok()?;
    table.get(fips_addr).copied()
}
2474
/// Test helper: stores `mtu` as the path MTU for `fips_addr`.
///
/// The write is silently skipped when the lock is poisoned.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
    if let Ok(mut map) = self.path_mtu_lookup.write() {
        map.insert(fips_addr, mtu);
    }
}
2482
/// Returns the number of entries in the session table.
pub fn session_count(&self) -> usize {
    self.sessions.len()
}
2487
/// Iterates over `(remote address, session entry)` pairs of the session table.
pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
    self.sessions.iter()
}
2492
/// Records the `(node_addr, pubkey)` pair in the identity cache.
///
/// The cache is keyed by the first 15 bytes of the node address. Returns
/// `true` when the pair is (or already was) cached, and `false` when the
/// address derived from `pubkey` does not equal `node_addr`; in that case
/// the cache is left unchanged.
///
/// After inserting a new entry, if the cache holds more than
/// `config.node.cache.identity_size` entries, the entry with the smallest
/// `last_seen_ms` is removed.
pub(crate) fn register_identity(
    &mut self,
    node_addr: NodeAddr,
    pubkey: secp256k1::PublicKey,
) -> bool {
    let mut prefix = [0u8; 15];
    prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
    // Fast path: an identical entry is already cached. Note that this path
    // returns without refreshing `last_seen_ms` and without re-deriving the
    // address from the key.
    if let Some(entry) = self.identity_cache.get(&prefix)
        && entry.node_addr == node_addr
        && entry.pubkey == pubkey
    {
        return true;
    }

    // Reject pairs where the claimed address is not the one derived from the
    // x-only form of the public key.
    let (xonly, _) = pubkey.x_only_public_key();
    let derived_node_addr = NodeAddr::from_pubkey(&xonly);
    if derived_node_addr != node_addr {
        debug!(
            claimed_node_addr = %node_addr,
            derived_node_addr = %derived_node_addr,
            "Rejected identity cache entry with mismatched public key"
        );
        return false;
    }

    let now_ms = Self::now_ms();
    // Same address but a different stored key: update the entry in place.
    if let Some(entry) = self.identity_cache.get_mut(&prefix)
        && entry.node_addr == node_addr
    {
        entry.pubkey = pubkey;
        entry.last_seen_ms = now_ms;
        return true;
    }

    // No entry for this address yet. An existing entry under the same prefix
    // but with a different address is replaced by this insert.
    let npub = encode_npub(&xonly);
    self.identity_cache.insert(
        prefix,
        IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
    );
    let max = self.config.node.cache.identity_size;
    // At most one entry is evicted per call.
    if self.identity_cache.len() > max
        && let Some(oldest_key) = self
            .identity_cache
            .iter()
            .min_by_key(|(_, entry)| entry.last_seen_ms)
            .map(|(k, _)| *k)
    {
        self.identity_cache.remove(&oldest_key);
    }
    true
}
2551
2552 pub(crate) fn lookup_by_fips_prefix(
2554 &mut self,
2555 prefix: &[u8; 15],
2556 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2557 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2558 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2560 } else {
2561 None
2562 }
2563 }
2564
/// Returns whether the identity cache has an entry under the first 15 bytes
/// of `addr`.
///
/// NOTE(review): only the 15-byte prefix is compared; the stored entry's
/// full `node_addr` is not checked against `addr` here.
pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
    let mut prefix = [0u8; 15];
    prefix.copy_from_slice(&addr.as_bytes()[0..15]);
    self.identity_cache.contains_key(&prefix)
}
2571
/// Returns the number of entries in the identity cache.
pub fn identity_cache_len(&self) -> usize {
    self.identity_cache.len()
}
2576
/// Iterates over the identity cache, yielding each entry's node address,
/// public key and `last_seen_ms` value.
pub fn identity_cache_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
    self.identity_cache
        .values()
        .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
}
2588
/// Returns the configured identity cache size (`config.node.cache.identity_size`).
pub fn identity_cache_max(&self) -> usize {
    self.config.node.cache.identity_size
}
2593
/// Returns the number of entries in `pending_lookups`.
pub fn pending_lookup_count(&self) -> usize {
    self.pending_lookups.len()
}
2598
/// Iterates over `(node address, pending lookup)` pairs in `pending_lookups`.
pub fn pending_lookups_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
    self.pending_lookups.iter()
}
2605
/// Returns the number of entries in `recent_requests`.
pub fn recent_request_count(&self) -> usize {
    self.recent_requests.len()
}
2610
/// Returns the number of keys in `pending_tun_packets`, i.e. how many
/// destinations currently have a pending-packet queue.
pub fn pending_tun_destinations(&self) -> usize {
    self.pending_tun_packets.len()
}
2615
/// Returns the total length of all queues in `pending_tun_packets`.
pub fn pending_tun_total_packets(&self) -> usize {
    self.pending_tun_packets.values().map(|q| q.len()).sum()
}
2620
/// Iterates over `(node address, retry state)` pairs in `retry_pending`.
pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
    self.retry_pending.iter()
}
2625
2626 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2633 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2635 return true;
2636 }
2637 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2639 && decl.parent_id() == self.node_addr()
2640 {
2641 return true;
2642 }
2643 false
2644 }
2645
/// Chooses the peer that should carry a payload addressed to
/// `dest_node_addr`.
///
/// Returns `None` when the destination is this node itself or when no
/// candidate survives the checks below. Sources are consulted in this order:
///
/// 1. The destination itself, if it is a healthy, non-blocked direct peer
///    whose link cost is at most `1.0 + ROUTING_FALLBACK_MIN_COST_ADVANTAGE`.
/// 2. In `RoutingMode::ReplyLearned`, when not exploring: a learned next hop
///    restricted to sendable peers that beat the direct route.
/// 3. If destination coordinates are cached: the coordinate-based candidate
///    from `select_best_candidate`, then the tree candidate from
///    `select_tree_payload_candidate`; each is used only if it beats the
///    direct route.
/// 4. When exploring: whatever learned next hop is available.
/// 5. The healthy direct route, then any sendable learned next hop.
///
/// If no coordinates are cached, steps 3–5 are replaced by a shorter
/// fallback (learned route, then direct route).
///
/// Takes `&mut self` because `session_direct_path_blocks_direct_payload`
/// requires it (it can drop an expired degraded marker); `get_and_touch`
/// presumably also updates cache recency — TODO confirm.
pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
    // A node never routes a payload to itself.
    if dest_node_addr == self.node_addr() {
        return None;
    }
    let now_ms = Self::now_ms();
    // True when the direct session path is marked degraded and the peer is
    // not exempted by `active_peer_uses_configured_static_udp_path`.
    let direct_session_degraded =
        self.session_direct_path_blocks_direct_payload(dest_node_addr, now_ms);

    // `Some(dest)` only when the destination is a current peer that is
    // healthy and whose direct path is not blocked.
    let healthy_direct_route = self
        .peers
        .get(dest_node_addr)
        .filter(|peer| peer.is_healthy() && !direct_session_degraded)
        .map(|_| *dest_node_addr);
    // Fast path: a direct link this cheap cannot be beaten by the required
    // cost advantage, so skip every fallback source.
    if let Some(direct_addr) = healthy_direct_route
        && self
            .peers
            .get(&direct_addr)
            .is_some_and(|peer| peer.link_cost() <= 1.0 + ROUTING_FALLBACK_MIN_COST_ADVANTAGE)
    {
        return self.peers.get(&direct_addr);
    }
    let direct_payload_eligible = healthy_direct_route.is_some();
    // The destination peer counts as sendable only if the direct route is
    // eligible; every other peer just needs to be healthy.
    let payload_candidate_can_send = |addr: &NodeAddr, peer: &ActivePeer| {
        if addr == dest_node_addr {
            direct_payload_eligible
        } else {
            peer.is_healthy()
        }
    };

    // Takes the node explicitly so the closure does not hold a borrow of
    // `self` across the mutable accesses below.
    let fallback_beats_direct = |node: &Self, fallback_addr: NodeAddr| {
        node.route_candidate_beats_direct(healthy_direct_route, fallback_addr)
    };

    // Learned routes are only consulted in `ReplyLearned` mode; in any other
    // mode this stays `None` and every learned-route branch below is skipped.
    let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
        Some(
            self.peers
                .iter()
                .filter(|(addr, peer)| payload_candidate_can_send(addr, peer))
                .map(|(addr, _)| *addr)
                .collect::<HashSet<_>>(),
        )
    } else {
        None
    };

    // Whether the learned-route table asks for a fallback exploration for
    // this destination; always false outside `ReplyLearned` mode.
    let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
        self.learned_routes.should_explore_fallback(
            dest_node_addr,
            now_ms,
            self.config.node.routing.learned_fallback_explore_interval,
            |addr| sendable.contains(addr),
        )
    });
    // Normal (non-exploring) learned-route selection, limited to sendable
    // peers that beat the direct route.
    if let Some(sendable) = &sendable_learned_peers
        && !explore_fallback
    {
        let eligible = sendable
            .iter()
            .copied()
            .filter(|addr| fallback_beats_direct(self, *addr))
            .collect::<HashSet<_>>();
        if !eligible.is_empty()
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| eligible.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }
    }

    // Coordinate and tree routing both need the destination's coordinates.
    // Without them, fall back to a learned route (only when there is no
    // healthy direct route or exploration was requested), then to the direct
    // route, then give up.
    let Some(dest_coords) = self
        .coord_cache
        .get_and_touch(dest_node_addr, now_ms)
        .cloned()
    else {
        if (healthy_direct_route.is_none() || explore_fallback)
            && let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }
        if let Some(direct_addr) = healthy_direct_route {
            return self.peers.get(&direct_addr);
        }
        return None;
    };

    // Coordinate-based candidate: sendable peers for which `may_reach`
    // reports the destination, ranked by `select_best_candidate`. Only the
    // address is kept so the borrow of `self.peers` ends with this block.
    let coordinate_route_addr = {
        let candidates: Vec<&ActivePeer> = self
            .peers
            .iter()
            .filter(|(addr, peer)| {
                payload_candidate_can_send(addr, peer) && peer.may_reach(dest_node_addr)
            })
            .map(|(_, peer)| peer)
            .collect();
        if !candidates.is_empty() {
            self.select_best_candidate(&candidates, &dest_coords)
                .map(|peer| *peer.node_addr())
        } else {
            None
        }
    };
    if let Some(next_hop_addr) = coordinate_route_addr
        && fallback_beats_direct(self, next_hop_addr)
    {
        return self.peers.get(&next_hop_addr);
    }

    // Tree-distance candidate, again used only if it beats the direct route.
    let tree_route_addr = self.select_tree_payload_candidate(
        &dest_coords,
        dest_node_addr,
        direct_payload_eligible,
    );
    if let Some(next_hop_addr) = tree_route_addr
        && fallback_beats_direct(self, next_hop_addr)
    {
        return self.peers.get(&next_hop_addr);
    }

    // Exploration: take any sendable learned next hop without requiring it
    // to beat the direct route.
    // NOTE(review): this returns `None` rather than falling through to the
    // direct route when `select_next_hop` yields nothing — confirm intended.
    if explore_fallback {
        return sendable_learned_peers.as_ref().and_then(|sendable| {
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
        });
    }

    // Nothing beat the direct route: use it if it exists.
    if let Some(direct_addr) = healthy_direct_route {
        return self.peers.get(&direct_addr);
    }

    // Last resort: any sendable learned next hop, with no cost comparison
    // (there is no direct route to compare against at this point).
    if let Some(sendable) = &sendable_learned_peers
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    None
}
2829
2830 pub(in crate::node) fn find_transit_next_hop(
2831 &mut self,
2832 dest_node_addr: &NodeAddr,
2833 previous_hop: &NodeAddr,
2834 ) -> Option<NodeAddr> {
2835 if dest_node_addr == self.node_addr() {
2836 return None;
2837 }
2838
2839 if dest_node_addr != previous_hop
2840 && self
2841 .peers
2842 .get(dest_node_addr)
2843 .is_some_and(|peer| peer.is_healthy())
2844 {
2845 return Some(*dest_node_addr);
2846 }
2847
2848 let next_hop_addr = *self.find_next_hop(dest_node_addr)?.node_addr();
2849 if &next_hop_addr == previous_hop {
2850 self.record_route_failure(*dest_node_addr, next_hop_addr);
2851 return None;
2852 }
2853 Some(next_hop_addr)
2854 }
2855
2856 fn route_candidate_beats_direct(
2857 &self,
2858 healthy_direct_route: Option<NodeAddr>,
2859 candidate_addr: NodeAddr,
2860 ) -> bool {
2861 let Some(direct_addr) = healthy_direct_route else {
2862 return true;
2863 };
2864 if candidate_addr == direct_addr {
2865 return false;
2866 }
2867
2868 let Some(direct) = self.peers.get(&direct_addr) else {
2869 return true;
2870 };
2871 let Some(candidate) = self.peers.get(&candidate_addr) else {
2872 return false;
2873 };
2874 if !candidate.is_healthy() {
2875 return false;
2876 }
2877
2878 let direct_cost = direct.link_cost();
2879 let candidate_cost = candidate.link_cost();
2880 candidate_cost + ROUTING_FALLBACK_MIN_COST_ADVANTAGE < direct_cost
2881 }
2882
2883 fn select_tree_payload_candidate(
2884 &self,
2885 dest_coords: &crate::tree::TreeCoordinate,
2886 direct_dest: &NodeAddr,
2887 direct_payload_eligible: bool,
2888 ) -> Option<NodeAddr> {
2889 if self.tree_state.my_coords().root_id() != dest_coords.root_id() {
2890 return None;
2891 }
2892
2893 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2894 let mut best: Option<(NodeAddr, usize)> = None;
2895
2896 for (peer_addr, peer) in &self.peers {
2897 if peer_addr == direct_dest {
2898 if !direct_payload_eligible {
2899 continue;
2900 }
2901 } else if !peer.is_healthy() {
2902 continue;
2903 }
2904
2905 let Some(peer_coords) = self.tree_state.peer_coords(peer_addr) else {
2906 continue;
2907 };
2908 let distance = peer_coords.distance_to(dest_coords);
2909 if distance >= my_distance {
2910 continue;
2911 }
2912
2913 let dominated = match &best {
2914 None => true,
2915 Some((best_id, best_dist)) => {
2916 distance < *best_dist || (distance == *best_dist && peer_addr < best_id)
2917 }
2918 };
2919 if dominated {
2920 best = Some((*peer_addr, distance));
2921 }
2922 }
2923
2924 best.map(|(peer_addr, _)| peer_addr)
2925 }
2926
2927 pub(in crate::node) fn session_direct_path_is_degraded(
2928 &mut self,
2929 dest: &NodeAddr,
2930 now_ms: u64,
2931 ) -> bool {
2932 match self.session_direct_degraded_until_ms.get(dest).copied() {
2933 Some(until_ms) if until_ms > now_ms => true,
2934 Some(_) => {
2935 self.session_direct_degraded_until_ms.remove(dest);
2936 false
2937 }
2938 None => false,
2939 }
2940 }
2941
2942 pub(in crate::node) fn session_direct_path_blocks_direct_payload(
2943 &mut self,
2944 dest: &NodeAddr,
2945 now_ms: u64,
2946 ) -> bool {
2947 self.session_direct_path_is_degraded(dest, now_ms)
2948 && !self.active_peer_uses_configured_static_udp_path(dest)
2949 }
2950
2951 pub(in crate::node) fn mark_session_direct_path_degraded(
2952 &mut self,
2953 dest: NodeAddr,
2954 now_ms: u64,
2955 ) -> bool {
2956 let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2957 let entry = self
2958 .session_direct_degraded_until_ms
2959 .entry(dest)
2960 .or_insert(0);
2961 let was_degraded = *entry > now_ms;
2962 *entry = (*entry).max(until_ms);
2963 !was_degraded
2964 }
2965
2966 pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
2967 self.session_direct_degraded_until_ms.remove(dest).is_some()
2968 }
2969
2970 pub(in crate::node) fn learn_reverse_route(
2971 &mut self,
2972 destination: NodeAddr,
2973 next_hop: NodeAddr,
2974 ) {
2975 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2976 || destination == *self.node_addr()
2977 {
2978 return;
2979 }
2980 let now_ms = Self::now_ms();
2981 self.learned_routes.learn(
2982 destination,
2983 next_hop,
2984 now_ms,
2985 self.config.node.routing.learned_ttl_secs,
2986 self.config.node.routing.max_learned_routes_per_dest,
2987 );
2988 }
2989
2990 pub(in crate::node) fn record_route_failure(
2991 &mut self,
2992 destination: NodeAddr,
2993 next_hop: NodeAddr,
2994 ) {
2995 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2996 return;
2997 }
2998 self.learned_routes.record_failure(&destination, &next_hop);
2999 }
3000
3001 pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
3002 self.learned_routes.snapshot(now_ms)
3003 }
3004
3005 pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
3006 self.learned_routes.purge_expired(now_ms);
3007 }
3008
3009 fn select_best_candidate<'a>(
3018 &'a self,
3019 candidates: &[&'a ActivePeer],
3020 dest_coords: &crate::tree::TreeCoordinate,
3021 ) -> Option<&'a ActivePeer> {
3022 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
3023
3024 let mut best: Option<(&ActivePeer, f64, usize)> = None;
3025
3026 for &candidate in candidates {
3027 if !candidate.can_send() {
3028 continue;
3029 }
3030
3031 let cost = candidate.link_cost();
3032
3033 let dist = self
3034 .tree_state
3035 .peer_coords(candidate.node_addr())
3036 .map(|pc| pc.distance_to(dest_coords))
3037 .unwrap_or(usize::MAX);
3038
3039 if dist >= my_distance {
3042 continue;
3043 }
3044
3045 let dominated = match &best {
3046 None => true,
3047 Some((_, best_cost, best_dist)) => {
3048 cost < *best_cost
3049 || (cost == *best_cost && dist < *best_dist)
3050 || (cost == *best_cost
3051 && dist == *best_dist
3052 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
3053 }
3054 };
3055
3056 if dominated {
3057 best = Some((candidate, cost, dist));
3058 }
3059 }
3060
3061 best.map(|(peer, _, _)| peer)
3062 }
3063
3064 pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
3066 self.peers.values().filter(|p| p.may_reach(dest)).collect()
3067 }
3068
3069 pub fn tun_tx(&self) -> Option<&TunTx> {
3073 self.tun_tx.as_ref()
3074 }
3075
3076 pub fn attach_external_packet_io(
3083 &mut self,
3084 capacity: usize,
3085 ) -> Result<ExternalPacketIo, NodeError> {
3086 if self.state != NodeState::Created {
3087 return Err(NodeError::Config(ConfigError::Validation(
3088 "external packet I/O must be attached before node start".to_string(),
3089 )));
3090 }
3091 if self.config.tun.enabled {
3092 return Err(NodeError::Config(ConfigError::Validation(
3093 "external packet I/O requires tun.enabled=false".to_string(),
3094 )));
3095 }
3096
3097 let capacity = capacity.max(1);
3098 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
3099 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
3100 self.tun_outbound_rx = Some(outbound_rx);
3101 self.external_packet_tx = Some(inbound_tx);
3102
3103 Ok(ExternalPacketIo {
3104 outbound_tx,
3105 inbound_rx,
3106 })
3107 }
3108
3109 pub(crate) fn attach_endpoint_data_io(
3114 &mut self,
3115 capacity: usize,
3116 ) -> Result<EndpointDataIo, NodeError> {
3117 if self.state != NodeState::Created {
3118 return Err(NodeError::Config(ConfigError::Validation(
3119 "endpoint data I/O must be attached before node start".to_string(),
3120 )));
3121 }
3122
3123 let command_capacity = endpoint_data_command_capacity(capacity);
3124 let (priority_command_tx, priority_command_rx) =
3125 tokio::sync::mpsc::channel(command_capacity);
3126 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
3127 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
3132 self.endpoint_priority_command_rx = Some(priority_command_rx);
3133 self.endpoint_command_rx = Some(command_rx);
3134 self.endpoint_event_tx = Some(event_tx.clone());
3135
3136 Ok(EndpointDataIo {
3137 priority_command_tx,
3138 command_tx,
3139 event_rx,
3140 event_tx,
3141 })
3142 }
3143
3144 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
3145 let mut prefix = [0u8; 15];
3146 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3147 self.identity_cache
3148 .get(&prefix)
3149 .filter(|entry| &entry.node_addr == addr)
3150 .map(|entry| entry.pubkey)
3151 }
3152
3153 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
3154 let mut prefix = [0u8; 15];
3155 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3156 self.identity_cache
3157 .get(&prefix)
3158 .filter(|entry| &entry.node_addr == addr)
3159 .map(|entry| entry.npub.clone())
3160 }
3161
3162 pub(in crate::node) fn deliver_external_ipv6_packet(
3163 &self,
3164 src_addr: &NodeAddr,
3165 packet: Vec<u8>,
3166 ) {
3167 let Some(external_packet_tx) = &self.external_packet_tx else {
3168 return;
3169 };
3170 if packet.len() < 40 {
3171 return;
3172 }
3173 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
3174 return;
3175 };
3176 let delivered = NodeDeliveredPacket {
3177 source_node_addr: *src_addr,
3178 source_npub: self.npub_for_node_addr(src_addr),
3179 destination,
3180 packet,
3181 };
3182 if let Err(error) = external_packet_tx.try_send(delivered) {
3183 debug!(error = %error, "Failed to deliver packet to external app sink");
3184 }
3185 }
3186
3187 pub(super) async fn send_encrypted_link_message(
3201 &mut self,
3202 node_addr: &NodeAddr,
3203 plaintext: &[u8],
3204 ) -> Result<(), NodeError> {
3205 self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
3206 .await
3207 }
3208
3209 pub(in crate::node) fn note_local_send_outcome(
3215 &mut self,
3216 node_addr: &NodeAddr,
3217 result: &Result<usize, TransportError>,
3218 ) {
3219 match result {
3220 Ok(_) => {
3221 self.local_send_failure_at_by_peer.remove(node_addr);
3222 }
3223 Err(error) if error.is_local_route_unavailable() => {
3224 self.local_send_failure_at_by_peer
3225 .insert(*node_addr, std::time::Instant::now());
3226 }
3227 Err(_) => {}
3228 }
3229 }
3230
3231 pub(in crate::node) fn local_send_failure_dead_timeout_for_peer(
3237 &self,
3238 node_addr: &NodeAddr,
3239 now: std::time::Instant,
3240 dead_timeout: std::time::Duration,
3241 fast_dead_timeout: std::time::Duration,
3242 ) -> std::time::Duration {
3243 match self.local_send_failure_at_by_peer.get(node_addr).copied() {
3244 Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
3245 fast_dead_timeout.min(dead_timeout)
3246 }
3247 None => dead_timeout,
3248 Some(_) => dead_timeout,
3249 }
3250 }
3251
3252 pub(in crate::node) fn purge_expired_local_send_failures(&mut self, now: std::time::Instant) {
3253 self.local_send_failure_at_by_peer
3254 .retain(|_, at| now.duration_since(*at) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW);
3255 }
3256
3257 pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
3258 self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
3259 }
3260
3261 pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
3262 let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
3263 return false;
3264 };
3265 let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
3266 std::time::Instant::now().duration_since(t) <= grace
3267 }
3268
3269 pub(super) async fn send_encrypted_link_message_with_ce(
3273 &mut self,
3274 node_addr: &NodeAddr,
3275 plaintext: &[u8],
3276 ce_flag: bool,
3277 ) -> Result<(), NodeError> {
3278 let peer = self
3279 .peers
3280 .get_mut(node_addr)
3281 .ok_or(NodeError::PeerNotFound(*node_addr))?;
3282
3283 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
3284 node_addr: *node_addr,
3285 reason: "no their_index".into(),
3286 })?;
3287 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
3288 node_addr: *node_addr,
3289 reason: "no transport_id".into(),
3290 })?;
3291 let remote_addr = peer
3292 .current_addr()
3293 .cloned()
3294 .ok_or_else(|| NodeError::SendFailed {
3295 node_addr: *node_addr,
3296 reason: "no current_addr".into(),
3297 })?;
3298 #[cfg(any(target_os = "linux", target_os = "macos"))]
3299 let connected_socket = peer.connected_udp();
3300
3301 let timestamp_ms = peer.session_elapsed_ms();
3303
3304 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
3306 let mut flags = if sp_flag { FLAG_SP } else { 0 };
3307 if ce_flag {
3308 flags |= FLAG_CE;
3309 }
3310 if peer.current_k_bit() {
3311 flags |= FLAG_KEY_EPOCH;
3312 }
3313
3314 let session = peer
3315 .noise_session_mut()
3316 .ok_or_else(|| NodeError::SendFailed {
3317 node_addr: *node_addr,
3318 reason: "no noise session".into(),
3319 })?;
3320
3321 const INNER_TS_LEN: usize = 4;
3329 let counter = session.current_send_counter();
3330 let inner_len = INNER_TS_LEN + plaintext.len();
3331 let payload_len = inner_len as u16;
3332 let header = build_established_header(their_index, counter, flags, payload_len);
3333
3334 let transport_for_send = self
3356 .transports
3357 .get(&transport_id)
3358 .ok_or(NodeError::TransportNotFound(transport_id))?;
3359 match transport_for_send.connection_state(&remote_addr) {
3360 ConnectionState::Connected => {}
3361 other => {
3362 if matches!(other, ConnectionState::None) {
3363 let _ = transport_for_send.connect(&remote_addr).await;
3364 }
3365 return Err(NodeError::SendFailed {
3366 node_addr: *node_addr,
3367 reason: format!("transport connection not ready: {:?}", other),
3368 });
3369 }
3370 }
3371 #[cfg(unix)]
3372 {
3373 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3374 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3375 && is_udp
3376 && let Some(cipher_clone) = session.send_cipher_clone()
3377 {
3378 let reserved_counter =
3382 session
3383 .take_send_counter()
3384 .map_err(|e| NodeError::SendFailed {
3385 node_addr: *node_addr,
3386 reason: format!("counter reservation failed: {}", e),
3387 })?;
3388 debug_assert_eq!(reserved_counter, counter);
3389 let header =
3393 build_established_header(their_index, reserved_counter, flags, payload_len);
3394 let transport = transport_for_send;
3395 let send_target = {
3402 if let TransportHandle::Udp(udp) = transport {
3403 let socket_addr = {
3404 #[cfg(any(target_os = "linux", target_os = "macos"))]
3405 {
3406 match connected_socket.as_ref() {
3407 Some(socket) => Some(socket.peer_addr()),
3408 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3409 }
3410 }
3411 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3412 {
3413 udp.resolve_for_off_task(&remote_addr).await.ok()
3414 }
3415 };
3416 match (udp.async_socket(), socket_addr) {
3417 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3418 _ => None,
3419 }
3420 } else {
3421 None
3422 }
3423 };
3424 if let Some((socket, socket_addr)) = send_target {
3425 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3441 let mut wire_buf = Vec::with_capacity(wire_capacity);
3442 wire_buf.extend_from_slice(&header);
3443 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
3444 wire_buf.extend_from_slice(plaintext);
3445 let predicted_bytes = wire_capacity;
3446 if let Some(peer) = self.peers.get_mut(node_addr) {
3453 peer.link_stats_mut().record_sent(predicted_bytes);
3454 if let Some(mmp) = peer.mmp_mut() {
3455 mmp.sender
3456 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3457 }
3458 }
3459 let scheduling_weight = self.send_weight_for_peer(node_addr);
3460 let traffic_class = classify_fmp_plaintext_traffic(plaintext);
3461 workers.dispatch(self::encrypt_worker::FmpSendJob {
3462 cipher: cipher_clone,
3463 counter: reserved_counter,
3464 wire_buf,
3465 fsp_seal: None,
3466 socket,
3467 dest_addr: socket_addr,
3468 #[cfg(any(target_os = "linux", target_os = "macos"))]
3469 connected_socket,
3470 bulk_endpoint_data: traffic_class.bulk_endpoint_data,
3471 drop_on_backpressure: traffic_class.drop_on_backpressure,
3472 scheduling_weight,
3473 queued_at: crate::perf_profile::stamp(),
3474 });
3475 return Ok(());
3476 }
3477 }
3478 }
3479
3480 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3485 let ciphertext = {
3487 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3488 session
3489 .encrypt_with_aad(&inner_plaintext, &header)
3490 .map_err(|e| NodeError::SendFailed {
3491 node_addr: *node_addr,
3492 reason: format!("encryption failed: {}", e),
3493 })?
3494 };
3495
3496 let wire_packet = build_encrypted(&header, &ciphertext);
3497
3498 let send_result = {
3500 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3501 let transport = self
3502 .transports
3503 .get(&transport_id)
3504 .ok_or(NodeError::TransportNotFound(transport_id))?;
3505 transport.send(&remote_addr, &wire_packet).await
3506 };
3507 self.note_local_send_outcome(node_addr, &send_result);
3508 let bytes_sent = send_result.map_err(|e| match e {
3509 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3510 node_addr: *node_addr,
3511 packet_size,
3512 mtu,
3513 },
3514 other => NodeError::SendFailed {
3515 node_addr: *node_addr,
3516 reason: format!("transport send: {}", other),
3517 },
3518 })?;
3519
3520 if let Some(peer) = self.peers.get_mut(node_addr) {
3522 peer.link_stats_mut().record_sent(bytes_sent);
3523 if let Some(mmp) = peer.mmp_mut() {
3525 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3526 }
3527 }
3528
3529 Ok(())
3530 }
3531}
3532
3533impl fmt::Debug for Node {
3534 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3535 f.debug_struct("Node")
3536 .field("node_addr", self.node_addr())
3537 .field("state", &self.state)
3538 .field("is_leaf_only", &self.is_leaf_only)
3539 .field("connections", &self.connection_count())
3540 .field("peers", &self.peer_count())
3541 .field("links", &self.link_count())
3542 .field("transports", &self.transport_count())
3543 .finish()
3544 }
3545}