1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31#[cfg(unix)]
32use self::wire::ESTABLISHED_HEADER_SIZE;
33use self::wire::{
34 FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted, build_established_header,
35 prepend_inner_header,
36};
37use crate::bloom::BloomState;
38use crate::cache::CoordCache;
39use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
40use crate::node::session::SessionEntry;
41use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
42use crate::peer::{ActivePeer, PeerConnection};
43#[cfg(any(target_os = "linux", target_os = "macos"))]
44use crate::transport::ethernet::EthernetTransport;
45use crate::transport::tcp::TcpTransport;
46use crate::transport::tor::TorTransport;
47use crate::transport::udp::UdpTransport;
48#[cfg(feature = "webrtc-transport")]
49use crate::transport::webrtc::WebRtcTransport;
50use crate::transport::{
51 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
52 TransportHandle, TransportId,
53};
54use crate::tree::TreeState;
55use crate::upper::hosts::HostMap;
56use crate::upper::icmp_rate_limit::IcmpRateLimiter;
57use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
58use crate::utils::index::IndexAllocator;
59use crate::{
60 Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
61 PeerIdentity, encode_npub,
62};
63use rand::Rng;
64use std::collections::{HashMap, HashSet, VecDeque};
65use std::fmt;
66use std::sync::Arc;
67use std::thread::JoinHandle;
68use thiserror::Error;
69use tracing::{debug, warn};
70
// Tuning constants for local send-failure handling, direct-session
// degradation tracking and routing fallback.
// NOTE(review): the code that consumes these values is outside this chunk;
// the per-constant notes are inferred from names and units — confirm.

/// Three-second window associated with local send failures.
const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
/// Hold time in milliseconds (20 s) for the "direct session degraded" state — presumably.
const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20_000;
/// Minimum sample count before a degradation decision is made — presumably.
const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 16;
/// Loss fraction (8%) at or above which a direct session counts as degraded — presumably.
const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.08;
/// Loss fraction (2%) at or below which a direct session counts as recovered — presumably.
const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.02;
/// Minimum relative cost advantage (25%) required to prefer a fallback route — presumably.
const ROUTING_FALLBACK_MIN_COST_ADVANTAGE: f64 = 0.25;
77
/// Traffic classification of an FMP plaintext frame, as produced by
/// `classify_fmp_plaintext_traffic`.
///
/// The derived `Default` is "not bulk, not droppable".
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct FmpPlaintextTrafficClass {
    /// True when the frame was recognised as a bulk session datagram.
    bulk_endpoint_data: bool,
    /// Whether the frame may be dropped under backpressure.
    /// NOTE(review): semantics inferred from the name — confirm with consumers.
    drop_on_backpressure: bool,
}
83
/// Traffic classification of an endpoint (IP) payload, as produced by
/// `classify_endpoint_payload`.
///
/// The derived `Default` is "not bulk, not droppable".
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct EndpointPayloadTrafficClass {
    /// True when the payload is treated as bulk data rather than
    /// latency-sensitive traffic.
    bulk_endpoint_data: bool,
    /// Whether the payload may be dropped under backpressure.
    /// NOTE(review): semantics inferred from the name — confirm with consumers.
    drop_on_backpressure: bool,
}
89
90fn classify_fmp_plaintext_traffic(plaintext: &[u8]) -> FmpPlaintextTrafficClass {
91 let bulk_endpoint_data = fmp_plaintext_is_bulk_session_datagram(plaintext);
92 FmpPlaintextTrafficClass {
97 bulk_endpoint_data,
98 drop_on_backpressure: false,
99 }
100}
101
102fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
103 if plaintext
104 .first()
105 .is_none_or(|ty| *ty != LinkMessageType::SessionDatagram.to_byte())
106 {
107 return false;
108 }
109 let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
110 return false;
111 };
112 FspCommonPrefix::parse(fsp_payload).is_some_and(|prefix| {
113 prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted() && !prefix.has_coords()
114 })
115}
116
117fn classify_endpoint_payload(payload: &[u8]) -> EndpointPayloadTrafficClass {
118 const IPPROTO_TCP: u8 = 6;
119 const IPPROTO_ICMPV6: u8 = 58;
120
121 match parse_endpoint_payload_ip_proto(payload) {
122 Some((IPPROTO_ICMPV6, _)) => EndpointPayloadTrafficClass::default(),
123 Some((IPPROTO_TCP, offset)) => {
124 let latency_sensitive = endpoint_tcp_payload_is_latency_sensitive(payload, offset);
125 EndpointPayloadTrafficClass {
126 bulk_endpoint_data: !latency_sensitive,
127 drop_on_backpressure: false,
128 }
129 }
130 _ => EndpointPayloadTrafficClass {
131 bulk_endpoint_data: true,
132 drop_on_backpressure: true,
133 },
134 }
135}
136
/// Decides whether the TCP segment at `tcp_offset` inside the IP packet
/// `payload` should be treated as latency sensitive.
///
/// A segment is latency sensitive when:
/// * the TCP header is truncated or its data offset is invalid (the packet is
///   not classified as bulk when it cannot be inspected),
/// * any of FIN, SYN or RST is set, or
/// * the TCP payload is at most 256 bytes.
///
/// The TCP payload length is derived from the IP header's declared length
/// when available. For IPv6 the Payload Length field also counts extension
/// headers, so the bytes between the fixed 40-byte header and `tcp_offset`
/// are subtracted before the TCP header is. If the declared length is
/// missing or inconsistent, the length of the buffer itself is used instead.
fn endpoint_tcp_payload_is_latency_sensitive(payload: &[u8], tcp_offset: usize) -> bool {
    const TCP_MIN_HEADER_LEN: usize = 20;
    const TCP_FLAG_FIN: u8 = 0x01;
    const TCP_FLAG_SYN: u8 = 0x02;
    const TCP_FLAG_RST: u8 = 0x04;
    const INTERACTIVE_TCP_PAYLOAD_MAX: usize = 256;
    const IPV6_HEADER_LEN: usize = 40;

    if payload.len() < tcp_offset + TCP_MIN_HEADER_LEN {
        return true;
    }

    // Data offset lives in the upper nibble of byte 12, in 32-bit words.
    let tcp_header_len = usize::from(payload[tcp_offset + 12] >> 4) * 4;
    if tcp_header_len < TCP_MIN_HEADER_LEN || payload.len() < tcp_offset + tcp_header_len {
        return true;
    }

    let flags = payload[tcp_offset + 13];
    if flags & (TCP_FLAG_FIN | TCP_FLAG_SYN | TCP_FLAG_RST) != 0 {
        return true;
    }

    // IPv6 counts extension headers as part of the payload length; IPv4's
    // header length already covers its options, so nothing extra to remove.
    let extension_len = match payload.first() {
        Some(first) if first >> 4 == 6 => tcp_offset.saturating_sub(IPV6_HEADER_LEN),
        _ => 0,
    };

    let payload_len = endpoint_ip_payload_len(payload)
        .and_then(|ip_payload_len| ip_payload_len.checked_sub(extension_len))
        .and_then(|segment_len| segment_len.checked_sub(tcp_header_len))
        .unwrap_or_else(|| payload.len().saturating_sub(tcp_offset + tcp_header_len));
    payload_len <= INTERACTIVE_TCP_PAYLOAD_MAX
}

/// Returns the number of bytes the IP header declares to follow the base IP
/// header.
///
/// * IPv4: Total Length minus the header length (IHL × 4). `None` when the
///   header is truncated, the IHL is below 20 bytes, or Total Length is
///   smaller than the header length.
/// * IPv6: the Payload Length field, which includes any extension headers.
///   `None` when fewer than 40 bytes are available.
/// * Any other version nibble, or an empty buffer: `None`.
fn endpoint_ip_payload_len(payload: &[u8]) -> Option<usize> {
    const IPV4_MIN_HEADER_LEN: usize = 20;
    const IPV6_HEADER_LEN: usize = 40;

    let version_ihl = payload.first().copied()?;
    match version_ihl >> 4 {
        4 => {
            if payload.len() < IPV4_MIN_HEADER_LEN {
                return None;
            }
            let header_len = usize::from(version_ihl & 0x0f) * 4;
            if header_len < IPV4_MIN_HEADER_LEN || payload.len() < header_len {
                return None;
            }
            let total_len = usize::from(u16::from_be_bytes([payload[2], payload[3]]));
            total_len.checked_sub(header_len)
        }
        6 => {
            if payload.len() < IPV6_HEADER_LEN {
                return None;
            }
            Some(usize::from(u16::from_be_bytes([payload[4], payload[5]])))
        }
        _ => None,
    }
}
190
191fn parse_endpoint_payload_ip_proto(payload: &[u8]) -> Option<(u8, usize)> {
192 const IPV4_MIN_HEADER_LEN: usize = 20;
193
194 let version_ihl = payload.first().copied()?;
195
196 match version_ihl >> 4 {
197 4 => {
198 if payload.len() < IPV4_MIN_HEADER_LEN {
199 return None;
200 }
201 let header_len = usize::from(version_ihl & 0x0f) * 4;
202 if header_len >= IPV4_MIN_HEADER_LEN && payload.len() >= header_len {
203 Some((payload[9], header_len))
204 } else {
205 None
206 }
207 }
208 6 => ipv6_payload_next_header(payload),
209 _ => None,
210 }
211}
212
/// Test helper: `true` when the IP packet's transport protocol is TCP.
#[cfg(test)]
fn endpoint_payload_is_tcp(payload: &[u8]) -> bool {
    const IPPROTO_TCP: u8 = 6;
    matches!(
        parse_endpoint_payload_ip_proto(payload),
        Some((IPPROTO_TCP, _))
    )
}
218
/// Walks the IPv6 extension header chain and returns the first
/// non-skippable next-header value together with the offset at which that
/// header starts.
///
/// Returns `None` when the buffer is shorter than the fixed IPv6 header, the
/// version nibble is not 6, an extension header is truncated, or more than
/// eight extension headers are chained.
fn ipv6_payload_next_header(payload: &[u8]) -> Option<(u8, usize)> {
    const IPV6_HEADER_LEN: usize = 40;
    const IPV6_FRAGMENT_HEADER_LEN: usize = 8;
    const NEXT_HEADER_FRAGMENT: u8 = 44;
    const NEXT_HEADER_AUTH: u8 = 51;
    const MAX_EXTENSION_HEADERS: usize = 8;

    if payload.len() < IPV6_HEADER_LEN || payload[0] >> 4 != 6 {
        return None;
    }

    let mut protocol = payload[6];
    let mut cursor = IPV6_HEADER_LEN;
    let mut walked = 0usize;

    loop {
        if !ipv6_extension_header_is_skippable(protocol) {
            return Some((protocol, cursor));
        }

        let extension_len = match protocol {
            // The fragment header has a fixed size and no length field.
            NEXT_HEADER_FRAGMENT => IPV6_FRAGMENT_HEADER_LEN,
            other => {
                let len_field = usize::from(*payload.get(cursor + 1)?);
                if other == NEXT_HEADER_AUTH {
                    // AH length is in 32-bit words, minus two.
                    (len_field + 2) * 4
                } else {
                    // Generic extension length is in 8-byte units, minus one.
                    (len_field + 1) * 8
                }
            }
        };
        if payload.len() < cursor + extension_len {
            return None;
        }

        protocol = payload[cursor];
        cursor += extension_len;
        walked += 1;
        if walked > MAX_EXTENSION_HEADERS {
            return None;
        }
    }
}

/// Returns `true` for the next-header values this parser skips over:
/// 0, 43, 44, 51, 60 and 135.
fn ipv6_extension_header_is_skippable(next_header: u8) -> bool {
    const SKIPPABLE: [u8; 6] = [0, 43, 44, 51, 60, 135];
    SKIPPABLE.contains(&next_header)
}
270
/// Rekey jitter, in seconds.
/// NOTE(review): presumably the bound of a random offset applied to rekey
/// timing; the consumer is outside this chunk — confirm.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
278
/// Errors reported by node operations.
///
/// `Config`, `Identity` and `Tun` convert automatically from their source
/// error types via `#[from]`; transport errors are mapped through
/// `NodeError::from_transport_error`.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The node has not been started.
    #[error("node not started")]
    NotStarted,

    /// The node has already been started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node has already been stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given id.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No transport is available for the named transport type.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link exists with the given id.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection exists for the given link id.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// No peer is known under the given node address.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with the given node address already exists.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection already exists for the given link.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A peer npub could not be used; `reason` explains why.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// A discovery operation failed.
    #[error("discovery error: {0}")]
    Discovery(String),

    /// The operation was denied.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` was exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` was exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` was exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available for the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion failed for the given link.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet being forwarded to `node_addr` exceeded the MTU.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Configuration error (converted via `From<ConfigError>`).
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Identity error (converted via `From<IdentityError>`).
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// TUN device error (converted via `From<TunError>`).
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Index allocation failed.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// A handshake failed.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// A transport error, carried as its display text.
    #[error("transport error: {0}")]
    TransportError(String),

    /// A transport error that reported a local route as unavailable; see
    /// `NodeError::from_transport_error`.
    #[error("local route unavailable: {0}")]
    LocalRouteUnavailable(String),

    /// A bootstrap handoff failed.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
373
374impl NodeError {
375 pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
376 if error.is_local_route_unavailable() {
377 Self::LocalRouteUnavailable(error.to_string())
378 } else {
379 Self::TransportError(error.to_string())
380 }
381 }
382
383 pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
384 matches!(self, Self::LocalRouteUnavailable(_))
385 }
386}
387
/// A packet delivered by the node, together with its source and destination.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// Node address of the packet's source.
    pub source_node_addr: NodeAddr,
    /// npub of the source, when available.
    pub source_npub: Option<String>,
    /// FIPS address the packet is destined for.
    pub destination: FipsAddress,
    /// Raw packet bytes.
    pub packet: Vec<u8>,
}
400
/// Cached identity information for a node: its address, public key, npub
/// string and a last-seen timestamp.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address of the cached identity.
    node_addr: NodeAddr,
    /// secp256k1 public key of the identity.
    pubkey: secp256k1::PublicKey,
    /// npub string of the identity.
    npub: String,
    /// Timestamp, in milliseconds, at which the identity was last seen.
    /// NOTE(review): the time base is not visible here — confirm.
    last_seen_ms: u64,
}

impl IdentityCacheEntry {
    /// Builds an entry from its four fields without any validation.
    fn new(
        node_addr: NodeAddr,
        pubkey: secp256k1::PublicKey,
        npub: String,
        last_seen_ms: u64,
    ) -> Self {
        Self {
            node_addr,
            pubkey,
            npub,
            last_seen_ms,
        }
    }
}
424
/// Channel endpoints for exchanging packets with the node from outside.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Sender half for outbound packets (same type the TUN outbound path uses).
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receiver for packets delivered by the node.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
433
/// Channel endpoints for the endpoint data API.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Bounded sender for commands addressed to the node.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Unbounded receiver for events emitted by the node.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Sender half of the event channel.
    /// NOTE(review): why the sender is exposed alongside the receiver is not
    /// visible in this chunk — confirm intended use.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
463
/// Chooses the capacity of the endpoint data command queue.
///
/// A positive integer in the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment
/// variable (surrounding whitespace ignored) takes precedence. Otherwise the
/// requested capacity is used, raised to at least 32 768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    const CAPACITY_FLOOR: usize = 32_768;

    let env_override = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&value| value > 0);

    match env_override {
        Some(capacity) => capacity,
        None => requested.max(1).max(CAPACITY_FLOOR),
    }
}
474
/// Commands sent to the node over the endpoint command channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send `payload` to `remote`; the result is reported on `response_tx`.
    Send {
        remote: PeerIdentity,
        payload: Vec<u8>,
        /// Optional instant at which the command was queued.
        queued_at: Option<std::time::Instant>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Send `payload` to `remote` without a response channel.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        /// Optional instant at which the command was queued.
        queued_at: Option<std::time::Instant>,
    },
    /// Request a snapshot of endpoint peers.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request a snapshot of relay statuses.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Replace the advert and DM relay lists; the result is reported on
    /// `response_tx`.
    UpdateRelays {
        advert_relays: Vec<String>,
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Apply a new peer configuration list; the outcome counts are reported
    /// on `response_tx`.
    UpdatePeers {
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
518
/// Counts describing the result of a `NodeEndpointCommand::UpdatePeers`
/// request. All counters default to zero.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Number of peers added.
    pub(crate) added: usize,
    /// Number of peers removed.
    pub(crate) removed: usize,
    /// Number of peers updated.
    pub(crate) updated: usize,
    /// Number of peers left unchanged.
    pub(crate) unchanged: usize,
}
527
/// Events emitted by the node over the endpoint event channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// Data received from a remote node.
    Data {
        /// Node address of the sender.
        source_node_addr: NodeAddr,
        /// npub of the sender, when available.
        source_npub: Option<String>,
        /// Received payload bytes.
        payload: Vec<u8>,
        /// Optional instant at which the event was queued.
        queued_at: Option<std::time::Instant>,
    },
}
538
/// Snapshot of one peer, as returned for
/// `NodeEndpointCommand::PeerSnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// npub of the peer.
    pub(crate) npub: String,
    /// Whether the peer is currently connected.
    pub(crate) connected: bool,
    /// Transport address of the peer, as text, when known.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, when known.
    pub(crate) transport_type: Option<String>,
    /// Numeric link id.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, when measured.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent.
    pub(crate) packets_sent: u64,
    /// Packets received.
    pub(crate) packets_recv: u64,
    /// Bytes sent.
    pub(crate) bytes_sent: u64,
    /// Bytes received.
    pub(crate) bytes_recv: u64,
    /// Whether a direct probe is pending for this peer.
    pub(crate) direct_probe_pending: bool,
    /// Millisecond value associated with the next direct probe.
    /// NOTE(review): absolute timestamp vs. remaining delay is not visible
    /// here — confirm.
    pub(crate) direct_probe_after_ms: Option<u64>,
}
555
/// Status of one relay, as returned for
/// `NodeEndpointCommand::RelaySnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Free-form status text.
    pub(crate) status: String,
}
562
/// Lifecycle state of a node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed but never started.
    Created,
    /// Start-up in progress.
    Starting,
    /// Fully running.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// Stopped; may be started again.
    Stopped,
}

impl NodeState {
    /// Returns `true` only in the `Running` state.
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// Returns `true` when the node may be started: `Created` or `Stopped`.
    pub fn can_start(&self) -> bool {
        match self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// Returns `true` when the node may be stopped: only while `Running`.
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase state name (`created`, `starting`, `running`,
    /// `stopping`, `stopped`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        })
    }
}
607
608#[derive(Clone, Debug)]
615pub(crate) struct RecentRequest {
616 pub(crate) from_peer: NodeAddr,
618 pub(crate) timestamp_ms: u64,
620 pub(crate) response_forwarded: bool,
624}
625
626impl RecentRequest {
627 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
628 Self {
629 from_peer,
630 timestamp_ms,
631 response_forwarded: false,
632 }
633 }
634
635 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
637 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
638 }
639}
640
/// Lookup key combining a transport id with an address on that transport.
type AddrKey = (TransportId, TransportAddr);
643
/// Per-transport drop tracking state.
/// NOTE(review): the update logic is outside this chunk; field notes are
/// inferred from names — confirm.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Drop counter value observed at the previous check — presumably.
    prev_drops: u64,
    /// Whether the transport is currently considered to be dropping packets.
    dropping: bool,
}
655
/// Bookkeeping for a connection attempt that has not completed yet.
/// NOTE(review): lifecycle is managed outside this chunk — confirm.
struct PendingConnect {
    /// Link allocated for the attempt.
    link_id: LinkId,
    /// Transport the attempt runs over.
    transport_id: TransportId,
    /// Remote address being connected to.
    remote_addr: TransportAddr,
    /// Identity of the peer being connected to.
    peer_identity: PeerIdentity,
}
671
/// A mesh node: identity, configuration, routing state, transports, links,
/// peers, sessions, I/O channels, rate limiters and discovery state.
///
/// NOTE(review): most fields are driven by code outside this chunk; the
/// section comments below group fields by name and type only.
pub struct Node {
    // --- Identity, configuration and lifecycle ---
    /// This node's identity.
    identity: Identity,

    /// Eight random bytes generated when the node is constructed.
    startup_epoch: [u8; 8],

    /// Instant at which the node was constructed.
    started_at: std::time::Instant,

    /// Node configuration.
    config: Config,

    /// Current lifecycle state.
    state: NodeState,

    /// Whether the node operates in leaf-only mode.
    is_leaf_only: bool,

    // --- Tree, bloom filter and routing state ---
    tree_state: TreeState,

    bloom_state: BloomState,

    coord_cache: CoordCache,
    learned_routes: LearnedRouteTable,
    /// Per-peer millisecond deadline for the "direct session degraded" state — presumably.
    session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
    /// Recently seen requests, keyed by a 64-bit id.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Shared, lock-protected map from FIPS address to a 16-bit path MTU value.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // --- Transports and links ---
    transports: HashMap<TransportId, TransportHandle>,
    transport_drops: HashMap<TransportId, TransportDropState>,
    links: HashMap<LinkId, Link>,
    addr_to_link: HashMap<AddrKey, LinkId>,

    /// Packet channel halves; `None` until set up.
    packet_tx: Option<PacketTx>,
    packet_rx: Option<PacketRx>,

    // --- Connections, peers and sessions ---
    connections: HashMap<LinkId, PeerConnection>,

    peers: HashMap<NodeAddr, ActivePeer>,

    sessions: HashMap<NodeAddr, SessionEntry>,

    /// Identity cache keyed by a 15-byte value.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    // --- Data queued per destination ---
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // --- Limits copied from configuration ---
    max_connections: usize,
    max_peers: usize,
    max_links: usize,

    // --- Id counters (both start at 1) ---
    next_link_id: u64,
    next_transport_id: u32,

    // --- Statistics ---
    stats: stats::NodeStats,

    stats_history: stats_history::StatsHistory,

    // --- TUN, external and endpoint I/O ---
    tun_state: TunState,
    tun_name: Option<String>,
    tun_tx: Option<TunTx>,
    tun_outbound_rx: Option<TunOutboundRx>,
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    // --- Encrypt / decrypt worker pools ---
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Receiver half of the decrypt fallback channel created at construction.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sender half of the decrypt fallback channel created at construction.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// Join handles for the TUN reader and writer threads.
    tun_reader_handle: Option<JoinHandle<()>>,
    tun_writer_handle: Option<JoinHandle<()>>,
    /// macOS only: raw file descriptor associated with TUN shutdown.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // --- DNS ---
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // --- Index allocation and lookup ---
    index_allocator: IndexAllocator,
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // --- Rate limiters and backoff ---
    msg1_rate_limiter: HandshakeRateLimiter,
    icmp_rate_limiter: IcmpRateLimiter,
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    discovery_backoff: DiscoveryBackoff,
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // --- Connection attempts and retries ---
    pending_connects: Vec<PendingConnect>,

    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    // --- Discovery ---
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
    local_instance_started_at_ms: Option<u64>,
    last_local_instance_publish_ms: Option<u64>,
    last_local_instance_scan_ms: Option<u64>,
    nostr_discovery_started_at_ms: Option<u64>,
    startup_open_discovery_sweep_done: bool,
    bootstrap_transports: HashSet<TransportId>,
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    // --- Timestamps for periodic work and rate-limited logging ---
    last_parent_reeval: Option<crate::time::Instant>,

    last_congestion_log: Option<std::time::Instant>,

    estimated_mesh_size: Option<u64>,
    last_mesh_size_log: Option<std::time::Instant>,

    last_self_warn: Option<std::time::Instant>,

    /// Instant of a local send failure, per peer.
    local_send_failure_at_by_peer: HashMap<NodeAddr, std::time::Instant>,
    last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,

    // --- Peer aliases, send weights, ACL and host map ---
    peer_aliases: HashMap<NodeAddr, String>,
    /// Send weight per configured peer; see `configured_peer_send_weights`.
    configured_peer_send_weights: HashMap<NodeAddr, u8>,

    peer_acl: acl::PeerAclReloader,

    host_map: Arc<HostMap>,
}
980
981impl Node {
982 pub fn new(config: Config) -> Result<Self, NodeError> {
984 config.validate()?;
985 let identity = config.create_identity()?;
986 let node_addr = *identity.node_addr();
987 let is_leaf_only = config.is_leaf_only();
988
989 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
990 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
991
992 let mut startup_epoch = [0u8; 8];
993 rand::rng().fill_bytes(&mut startup_epoch);
994
995 let mut bloom_state = if is_leaf_only {
996 BloomState::leaf_only(node_addr)
997 } else {
998 BloomState::new(node_addr)
999 };
1000 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
1001
1002 let tun_state = if config.tun.enabled {
1003 TunState::Configured
1004 } else {
1005 TunState::Disabled
1006 };
1007
1008 let mut tree_state = TreeState::new(node_addr);
1010 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
1011 tree_state.set_hold_down(config.node.tree.hold_down_secs);
1012 tree_state.set_flap_dampening(
1013 config.node.tree.flap_threshold,
1014 config.node.tree.flap_window_secs,
1015 config.node.tree.flap_dampening_secs,
1016 );
1017 tree_state
1018 .sign_declaration(&identity)
1019 .expect("signing own declaration should never fail");
1020
1021 let coord_cache = CoordCache::new(
1022 config.node.cache.coord_size,
1023 config.node.cache.coord_ttl_secs * 1000,
1024 );
1025 let rl = &config.node.rate_limit;
1026 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
1027 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
1028 config.node.limits.max_pending_inbound,
1029 );
1030
1031 let max_connections = config.node.limits.max_connections;
1032 let max_peers = config.node.limits.max_peers;
1033 let max_links = config.node.limits.max_links;
1034 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1035 let backoff_base_secs = config.node.discovery.backoff_base_secs;
1036 let backoff_max_secs = config.node.discovery.backoff_max_secs;
1037 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
1038
1039 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1040 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1041
1042 Ok(Self {
1043 identity,
1044 startup_epoch,
1045 started_at: std::time::Instant::now(),
1046 config,
1047 state: NodeState::Created,
1048 is_leaf_only,
1049 tree_state,
1050 bloom_state,
1051 coord_cache,
1052 learned_routes: LearnedRouteTable::default(),
1053 session_direct_degraded_until_ms: HashMap::new(),
1054 recent_requests: HashMap::new(),
1055 transports: HashMap::new(),
1056 transport_drops: HashMap::new(),
1057 links: HashMap::new(),
1058 addr_to_link: HashMap::new(),
1059 packet_tx: None,
1060 packet_rx: None,
1061 connections: HashMap::new(),
1062 peers: HashMap::new(),
1063 sessions: HashMap::new(),
1064 identity_cache: HashMap::new(),
1065 pending_tun_packets: HashMap::new(),
1066 pending_endpoint_data: HashMap::new(),
1067 pending_lookups: HashMap::new(),
1068 max_connections,
1069 max_peers,
1070 max_links,
1071 next_link_id: 1,
1072 next_transport_id: 1,
1073 stats: stats::NodeStats::new(),
1074 stats_history: stats_history::StatsHistory::new(),
1075 tun_state,
1076 tun_name: None,
1077 tun_tx: None,
1078 tun_outbound_rx: None,
1079 external_packet_tx: None,
1080 endpoint_command_rx: None,
1081 endpoint_event_tx: None,
1082 encrypt_workers: None,
1083 decrypt_workers: None,
1084 decrypt_registered_sessions: std::collections::HashSet::new(),
1085 decrypt_fallback_tx,
1086 decrypt_fallback_rx,
1087 tun_reader_handle: None,
1088 tun_writer_handle: None,
1089 #[cfg(target_os = "macos")]
1090 tun_shutdown_fd: None,
1091 dns_identity_rx: None,
1092 dns_task: None,
1093 index_allocator: IndexAllocator::new(),
1094 peers_by_index: HashMap::new(),
1095 pending_outbound: HashMap::new(),
1096 msg1_rate_limiter,
1097 icmp_rate_limiter: IcmpRateLimiter::new(),
1098 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1099 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1100 std::time::Duration::from_millis(coords_response_interval_ms),
1101 ),
1102 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
1103 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
1104 std::time::Duration::from_secs(forward_min_interval_secs),
1105 ),
1106 pending_connects: Vec::new(),
1107 retry_pending: HashMap::new(),
1108 nostr_discovery: None,
1109 nostr_discovery_started_at_ms: None,
1110 lan_discovery: None,
1111 local_instance_registry: None,
1112 local_instance_started_at_ms: None,
1113 last_local_instance_publish_ms: None,
1114 last_local_instance_scan_ms: None,
1115 startup_open_discovery_sweep_done: false,
1116 bootstrap_transports: HashSet::new(),
1117 bootstrap_transport_npubs: HashMap::new(),
1118 discovery_fallback_transit_blocked_peers: HashSet::new(),
1119 last_parent_reeval: None,
1120 last_congestion_log: None,
1121 estimated_mesh_size: None,
1122 last_mesh_size_log: None,
1123 last_self_warn: None,
1124 local_send_failure_at_by_peer: HashMap::new(),
1125 last_rx_loop_maintenance_timeout_at: None,
1126 peer_aliases: HashMap::new(),
1127 configured_peer_send_weights,
1128 peer_acl,
1129 host_map,
1130 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1131 })
1132 }
1133
1134 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
1139 config.validate()?;
1140 let node_addr = *identity.node_addr();
1141
1142 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
1143 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
1144
1145 let mut startup_epoch = [0u8; 8];
1146 rand::rng().fill_bytes(&mut startup_epoch);
1147
1148 let tun_state = if config.tun.enabled {
1149 TunState::Configured
1150 } else {
1151 TunState::Disabled
1152 };
1153
1154 let mut tree_state = TreeState::new(node_addr);
1156 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
1157 tree_state.set_hold_down(config.node.tree.hold_down_secs);
1158 tree_state.set_flap_dampening(
1159 config.node.tree.flap_threshold,
1160 config.node.tree.flap_window_secs,
1161 config.node.tree.flap_dampening_secs,
1162 );
1163 tree_state
1164 .sign_declaration(&identity)
1165 .expect("signing own declaration should never fail");
1166
1167 let mut bloom_state = BloomState::new(node_addr);
1168 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
1169
1170 let coord_cache = CoordCache::new(
1171 config.node.cache.coord_size,
1172 config.node.cache.coord_ttl_secs * 1000,
1173 );
1174 let rl = &config.node.rate_limit;
1175 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
1176 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
1177 config.node.limits.max_pending_inbound,
1178 );
1179
1180 let max_connections = config.node.limits.max_connections;
1181 let max_peers = config.node.limits.max_peers;
1182 let max_links = config.node.limits.max_links;
1183 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1184
1185 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1186 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1187
1188 Ok(Self {
1189 identity,
1190 startup_epoch,
1191 started_at: std::time::Instant::now(),
1192 config,
1193 state: NodeState::Created,
1194 is_leaf_only: false,
1195 tree_state,
1196 bloom_state,
1197 coord_cache,
1198 learned_routes: LearnedRouteTable::default(),
1199 session_direct_degraded_until_ms: HashMap::new(),
1200 recent_requests: HashMap::new(),
1201 transports: HashMap::new(),
1202 transport_drops: HashMap::new(),
1203 links: HashMap::new(),
1204 addr_to_link: HashMap::new(),
1205 packet_tx: None,
1206 packet_rx: None,
1207 connections: HashMap::new(),
1208 peers: HashMap::new(),
1209 sessions: HashMap::new(),
1210 identity_cache: HashMap::new(),
1211 pending_tun_packets: HashMap::new(),
1212 pending_endpoint_data: HashMap::new(),
1213 pending_lookups: HashMap::new(),
1214 max_connections,
1215 max_peers,
1216 max_links,
1217 next_link_id: 1,
1218 next_transport_id: 1,
1219 stats: stats::NodeStats::new(),
1220 stats_history: stats_history::StatsHistory::new(),
1221 tun_state,
1222 tun_name: None,
1223 tun_tx: None,
1224 tun_outbound_rx: None,
1225 external_packet_tx: None,
1226 endpoint_command_rx: None,
1227 endpoint_event_tx: None,
1228 encrypt_workers: None,
1229 decrypt_workers: None,
1230 decrypt_registered_sessions: std::collections::HashSet::new(),
1231 decrypt_fallback_tx,
1232 decrypt_fallback_rx,
1233 tun_reader_handle: None,
1234 tun_writer_handle: None,
1235 #[cfg(target_os = "macos")]
1236 tun_shutdown_fd: None,
1237 dns_identity_rx: None,
1238 dns_task: None,
1239 index_allocator: IndexAllocator::new(),
1240 peers_by_index: HashMap::new(),
1241 pending_outbound: HashMap::new(),
1242 msg1_rate_limiter,
1243 icmp_rate_limiter: IcmpRateLimiter::new(),
1244 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1245 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1246 std::time::Duration::from_millis(coords_response_interval_ms),
1247 ),
1248 discovery_backoff: DiscoveryBackoff::new(),
1249 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1250 pending_connects: Vec::new(),
1251 retry_pending: HashMap::new(),
1252 nostr_discovery: None,
1253 nostr_discovery_started_at_ms: None,
1254 lan_discovery: None,
1255 local_instance_registry: None,
1256 local_instance_started_at_ms: None,
1257 last_local_instance_publish_ms: None,
1258 last_local_instance_scan_ms: None,
1259 startup_open_discovery_sweep_done: false,
1260 bootstrap_transports: HashSet::new(),
1261 bootstrap_transport_npubs: HashMap::new(),
1262 discovery_fallback_transit_blocked_peers: HashSet::new(),
1263 last_parent_reeval: None,
1264 last_congestion_log: None,
1265 estimated_mesh_size: None,
1266 last_mesh_size_log: None,
1267 last_self_warn: None,
1268 local_send_failure_at_by_peer: HashMap::new(),
1269 last_rx_loop_maintenance_timeout_at: None,
1270 peer_aliases: HashMap::new(),
1271 configured_peer_send_weights,
1272 peer_acl,
1273 host_map,
1274 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1275 })
1276 }
1277
1278 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1280 let mut node = Self::new(config)?;
1281 node.is_leaf_only = true;
1282 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1283 Ok(node)
1284 }
1285
1286 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1287 let base_host_map = HostMap::from_peer_configs(config.peers());
1288 if !config.node.system_files_enabled {
1289 return (
1290 Arc::new(base_host_map.clone()),
1291 acl::PeerAclReloader::memory_only(base_host_map),
1292 );
1293 }
1294
1295 let mut host_map = base_host_map.clone();
1296 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1297 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1298 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1299 ));
1300 host_map.merge(hosts_file);
1301 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1302 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1303 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1304 base_host_map,
1305 hosts_path,
1306 );
1307 (Arc::new(host_map), peer_acl)
1308 }
1309
1310 fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1311 config
1312 .peers()
1313 .iter()
1314 .filter_map(|peer| {
1315 PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1316 (
1317 *identity.node_addr(),
1318 encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1319 )
1320 })
1321 })
1322 .collect()
1323 }
1324
1325 #[cfg(unix)]
1326 fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1327 self.configured_peer_send_weights
1328 .get(peer_addr)
1329 .copied()
1330 .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1331 }
1332
1333 async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
1337 let mut transports = Vec::new();
1338
1339 let udp_instances: Vec<_> = self
1341 .config
1342 .transports
1343 .udp
1344 .iter()
1345 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1346 .collect();
1347
1348 for (name, udp_config) in udp_instances {
1350 let transport_id = self.allocate_transport_id();
1351 let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
1352 transports.push(TransportHandle::Udp(udp));
1353 }
1354
1355 #[cfg(feature = "sim-transport")]
1356 {
1357 let sim_instances: Vec<_> = self
1358 .config
1359 .transports
1360 .sim
1361 .iter()
1362 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1363 .collect();
1364
1365 for (name, sim_config) in sim_instances {
1366 let transport_id = self.allocate_transport_id();
1367 let sim = crate::transport::sim::SimTransport::new(
1368 transport_id,
1369 name,
1370 sim_config,
1371 packet_tx.clone(),
1372 );
1373 transports.push(TransportHandle::Sim(sim));
1374 }
1375 }
1376
1377 #[cfg(any(target_os = "linux", target_os = "macos"))]
1379 {
1380 let eth_instances: Vec<_> = self
1381 .config
1382 .transports
1383 .ethernet
1384 .iter()
1385 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1386 .collect();
1387 let xonly = self.identity.pubkey();
1388 for (name, eth_config) in eth_instances {
1389 let mut eth_config = eth_config;
1390 if eth_config.discovery_scope.is_none() {
1391 eth_config.discovery_scope = self.lan_discovery_scope();
1392 }
1393 let transport_id = self.allocate_transport_id();
1394 let mut eth =
1395 EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
1396 eth.set_local_pubkey(xonly);
1397 transports.push(TransportHandle::Ethernet(eth));
1398 }
1399 }
1400
1401 let tcp_instances: Vec<_> = self
1403 .config
1404 .transports
1405 .tcp
1406 .iter()
1407 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1408 .collect();
1409
1410 for (name, tcp_config) in tcp_instances {
1411 let transport_id = self.allocate_transport_id();
1412 let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
1413 transports.push(TransportHandle::Tcp(tcp));
1414 }
1415
1416 let tor_instances: Vec<_> = self
1418 .config
1419 .transports
1420 .tor
1421 .iter()
1422 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1423 .collect();
1424
1425 for (name, tor_config) in tor_instances {
1426 let transport_id = self.allocate_transport_id();
1427 let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
1428 transports.push(TransportHandle::Tor(tor));
1429 }
1430
1431 let webrtc_instances: Vec<_> = self
1432 .config
1433 .transports
1434 .webrtc
1435 .iter()
1436 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1437 .collect();
1438
1439 #[cfg(feature = "webrtc-transport")]
1440 {
1441 for (name, webrtc_config) in webrtc_instances {
1442 let transport_id = self.allocate_transport_id();
1443 match WebRtcTransport::new(
1444 transport_id,
1445 name,
1446 webrtc_config,
1447 packet_tx.clone(),
1448 &self.identity,
1449 &self.config.node.discovery.nostr,
1450 ) {
1451 Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
1452 Err(err) => {
1453 warn!(
1454 transport_id = %transport_id,
1455 error = %err,
1456 "failed to initialize WebRTC transport"
1457 );
1458 }
1459 }
1460 }
1461 }
1462 #[cfg(not(feature = "webrtc-transport"))]
1463 if !webrtc_instances.is_empty() {
1464 warn!("WebRTC transport configured but this build lacks WebRTC transport support");
1465 }
1466
1467 #[cfg(bluer_available)]
1469 {
1470 let ble_instances: Vec<_> = self
1471 .config
1472 .transports
1473 .ble
1474 .iter()
1475 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1476 .collect();
1477
1478 #[cfg(all(bluer_available, not(test)))]
1479 for (name, ble_config) in ble_instances {
1480 let transport_id = self.allocate_transport_id();
1481 let adapter = ble_config.adapter().to_string();
1482 let mtu = ble_config.mtu();
1483 match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
1484 Ok(io) => {
1485 let mut ble = crate::transport::ble::BleTransport::new(
1486 transport_id,
1487 name,
1488 ble_config,
1489 io,
1490 packet_tx.clone(),
1491 );
1492 ble.set_local_pubkey(self.identity.pubkey().serialize());
1493 transports.push(TransportHandle::Ble(ble));
1494 }
1495 Err(e) => {
1496 tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
1497 }
1498 }
1499 }
1500
1501 #[cfg(any(not(bluer_available), test))]
1502 if !ble_instances.is_empty() {
1503 #[cfg(not(test))]
1504 tracing::warn!("BLE transport configured but this build lacks BlueZ support");
1505 }
1506 }
1507
1508 transports
1509 }
1510
1511 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1521 self.transports
1522 .iter()
1523 .filter(|(id, handle)| {
1524 handle.transport_type().name == transport_type
1525 && handle.is_operational()
1526 && !self.bootstrap_transports.contains(id)
1527 })
1528 .min_by_key(|(id, _)| id.as_u32())
1529 .map(|(id, _)| *id)
1530 }
1531
1532 #[allow(unused_variables)]
1538 fn resolve_ethernet_addr(
1539 &self,
1540 addr_str: &str,
1541 ) -> Result<(TransportId, TransportAddr), NodeError> {
1542 #[cfg(any(target_os = "linux", target_os = "macos"))]
1543 {
1544 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1545 NodeError::NoTransportForType(format!(
1546 "invalid Ethernet address format '{}': expected 'interface/mac'",
1547 addr_str
1548 ))
1549 })?;
1550
1551 let transport_id = self
1553 .transports
1554 .iter()
1555 .find(|(_, handle)| {
1556 handle.transport_type().name == "ethernet"
1557 && handle.is_operational()
1558 && handle.interface_name() == Some(iface)
1559 })
1560 .map(|(id, _)| *id)
1561 .ok_or_else(|| {
1562 NodeError::NoTransportForType(format!(
1563 "no operational Ethernet transport for interface '{}'",
1564 iface
1565 ))
1566 })?;
1567
1568 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1569 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1570 })?;
1571
1572 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1573 }
1574 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1575 {
1576 Err(NodeError::NoTransportForType(
1577 "Ethernet transport is not supported on this platform".to_string(),
1578 ))
1579 }
1580 }
1581
1582 #[cfg(bluer_available)]
1586 fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1587 let ta = TransportAddr::from_string(addr_str);
1588 let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1589 NodeError::NoTransportForType(format!(
1590 "invalid BLE address format '{}': expected 'adapter/mac'",
1591 addr_str
1592 ))
1593 })?;
1594
1595 let transport_id = self
1597 .transports
1598 .iter()
1599 .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1600 .map(|(id, _)| *id)
1601 .ok_or_else(|| {
1602 NodeError::NoTransportForType(format!(
1603 "no operational BLE transport for adapter '{}'",
1604 adapter
1605 ))
1606 })?;
1607
1608 crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1610 NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1611 })?;
1612
1613 Ok((transport_id, TransportAddr::from_string(addr_str)))
1614 }
1615
    /// Returns a reference to this node's [`Identity`].
    pub fn identity(&self) -> &Identity {
        &self.identity
    }
1622
    /// Returns this node's address, as reported by its identity.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }
1627
    /// Returns this node's `npub` string, as produced by its identity.
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
1632
1633 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1642 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1643 return hostname.to_string();
1644 }
1645 if let Some(name) = self.peer_aliases.get(addr) {
1646 return name.clone();
1647 }
1648 if let Some(peer) = self.peers.get(addr) {
1649 return peer.identity().short_npub();
1650 }
1651 if let Some(entry) = self.sessions.get(addr) {
1652 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1653 return PeerIdentity::from_pubkey(xonly).short_npub();
1654 }
1655 addr.short_hex()
1656 }
1657
1658 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1670 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1674 self.peers_by_index.remove(&cache_key);
1675 if self.decrypt_registered_sessions.remove(&cache_key)
1676 && let Some(workers) = self.decrypt_workers.as_ref()
1677 {
1678 workers.unregister_session(cache_key);
1679 }
1680 if let Some(peer_addr) = owning_peer {
1691 let peer_has_other_index = self
1692 .peers_by_index
1693 .values()
1694 .any(|other| *other == peer_addr);
1695 if !peer_has_other_index {
1696 self.clear_connected_udp_for_peer(&peer_addr);
1697 }
1698 }
1699 }
1700
1701 pub(in crate::node) fn ensure_current_session_index_registered(
1710 &mut self,
1711 node_addr: &NodeAddr,
1712 context: &'static str,
1713 ) -> bool {
1714 let Some(peer) = self.peers.get(node_addr) else {
1715 return false;
1716 };
1717 let Some(transport_id) = peer.transport_id() else {
1718 warn!(
1719 peer = %self.peer_display_name(node_addr),
1720 context,
1721 "Cannot register current session index without transport id"
1722 );
1723 return false;
1724 };
1725 let Some(our_index) = peer.our_index() else {
1726 warn!(
1727 peer = %self.peer_display_name(node_addr),
1728 context,
1729 "Cannot register current session index without local index"
1730 );
1731 return false;
1732 };
1733
1734 let cache_key = (transport_id, our_index.as_u32());
1735 match self.peers_by_index.get(&cache_key).copied() {
1736 Some(existing) if existing == *node_addr => true,
1737 Some(existing) => {
1738 warn!(
1739 peer = %self.peer_display_name(node_addr),
1740 previous_owner = %self.peer_display_name(&existing),
1741 transport_id = %transport_id,
1742 our_index = %our_index,
1743 context,
1744 "Repairing current session index with stale owner"
1745 );
1746 self.peers_by_index.insert(cache_key, *node_addr);
1747 true
1748 }
1749 None => {
1750 warn!(
1751 peer = %self.peer_display_name(node_addr),
1752 transport_id = %transport_id,
1753 our_index = %our_index,
1754 context,
1755 "Repairing missing current session index"
1756 );
1757 self.peers_by_index.insert(cache_key, *node_addr);
1758 true
1759 }
1760 }
1761 }
1762
    /// Returns a reference to the node's [`Config`].
    pub fn config(&self) -> &Config {
        &self.config
    }
1769
    /// Returns the effective IPv6 MTU, computed by
    /// `crate::upper::icmp::effective_ipv6_mtu` from [`Self::transport_mtu`].
    pub fn effective_ipv6_mtu(&self) -> u16 {
        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
    }
1778
1779 pub fn transport_mtu(&self) -> u16 {
1796 let min_operational = self
1797 .transports
1798 .values()
1799 .filter(|h| h.is_operational())
1800 .map(|h| h.mtu())
1801 .min();
1802 if let Some(mtu) = min_operational {
1803 return mtu;
1804 }
1805 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1807 return cfg.mtu();
1808 }
1809 1280
1810 }
1811
    /// Returns the node's current [`NodeState`].
    pub fn state(&self) -> NodeState {
        self.state
    }
1818
    /// Returns the time elapsed since `started_at`.
    pub fn uptime(&self) -> std::time::Duration {
        self.started_at.elapsed()
    }
1823
    /// Returns whether the current node state reports itself as operational.
    pub fn is_running(&self) -> bool {
        self.state.is_operational()
    }
1828
    /// Returns the `is_leaf_only` flag (set by [`Self::leaf_only`]).
    pub fn is_leaf_only(&self) -> bool {
        self.is_leaf_only
    }
1833
    /// Returns a shared reference to the node's [`TreeState`].
    pub fn tree_state(&self) -> &TreeState {
        &self.tree_state
    }
1840
    /// Returns a mutable reference to the node's [`TreeState`].
    pub fn tree_state_mut(&mut self) -> &mut TreeState {
        &mut self.tree_state
    }
1845
    /// Returns a shared reference to the node's [`BloomState`].
    pub fn bloom_state(&self) -> &BloomState {
        &self.bloom_state
    }
1852
    /// Returns a mutable reference to the node's [`BloomState`].
    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
        &mut self.bloom_state
    }
1857
    /// Returns the cached mesh size estimate, or `None` when no estimate is
    /// currently stored (see `compute_mesh_size`).
    pub fn estimated_mesh_size(&self) -> Option<u64> {
        self.estimated_mesh_size
    }
1864
1865 pub(crate) fn compute_mesh_size(&mut self) {
1871 let my_addr = *self.tree_state.my_node_addr();
1872 let parent_id = *self.tree_state.my_declaration().parent_id();
1873 let is_root = self.tree_state.is_root();
1874
1875 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1876 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1878 let mut has_data = false;
1879
1880 if !is_root
1886 && let Some(parent) = self.peers.get(&parent_id)
1887 && let Some(filter) = parent.inbound_filter()
1888 {
1889 match filter.estimated_count(max_fpr) {
1890 Some(n) => {
1891 total += n;
1892 has_data = true;
1893 }
1894 None => {
1895 self.estimated_mesh_size = None;
1896 return;
1897 }
1898 }
1899 }
1900
1901 for (peer_addr, peer) in &self.peers {
1903 if peer_addr == &parent_id {
1904 continue;
1905 }
1906 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1907 && *decl.parent_id() == my_addr
1908 {
1909 child_count += 1;
1910 if let Some(filter) = peer.inbound_filter() {
1911 match filter.estimated_count(max_fpr) {
1912 Some(n) => {
1913 total += n;
1914 has_data = true;
1915 }
1916 None => {
1917 self.estimated_mesh_size = None;
1918 return;
1919 }
1920 }
1921 }
1922 }
1923 }
1924
1925 if !has_data {
1926 self.estimated_mesh_size = None;
1927 return;
1928 }
1929
1930 let size = total.round() as u64;
1931 self.estimated_mesh_size = Some(size);
1932
1933 let now = std::time::Instant::now();
1935 let should_log = match self.last_mesh_size_log {
1936 None => true,
1937 Some(last) => {
1938 now.duration_since(last)
1939 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1940 }
1941 };
1942 if should_log {
1943 tracing::debug!(
1944 estimated_mesh_size = size,
1945 peers = self.peers.len(),
1946 children = child_count,
1947 "Mesh size estimate"
1948 );
1949 self.last_mesh_size_log = Some(now);
1950 }
1951 }
1952
    /// Returns a shared reference to the node's [`CoordCache`].
    pub fn coord_cache(&self) -> &CoordCache {
        &self.coord_cache
    }
1959
    /// Returns a mutable reference to the node's [`CoordCache`].
    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
        &mut self.coord_cache
    }
1964
    /// Returns a shared reference to the node's statistics counters.
    pub fn stats(&self) -> &stats::NodeStats {
        &self.stats
    }
1971
    /// Returns a mutable reference to the node's statistics counters.
    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
        &mut self.stats
    }
1976
    /// Returns a shared reference to the recorded statistics history.
    pub fn stats_history(&self) -> &stats_history::StatsHistory {
        &self.stats_history
    }
1981
1982 pub(crate) fn record_stats_history(&mut self) {
1985 let fwd = &self.stats.forwarding;
1986 let peers_with_mmp: Vec<f64> = self
1987 .peers
1988 .values()
1989 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1990 .collect();
1991 let loss_rate = if peers_with_mmp.is_empty() {
1992 0.0
1993 } else {
1994 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1995 };
1996
1997 let snap = stats_history::Snapshot {
1998 mesh_size: self.estimated_mesh_size,
1999 tree_depth: self.tree_state.my_coords().depth() as u32,
2000 peer_count: self.peers.len() as u64,
2001 parent_switches_total: self.stats.tree.parent_switches,
2002 bytes_in_total: fwd.received_bytes,
2003 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
2004 packets_in_total: fwd.received_packets,
2005 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
2006 loss_rate,
2007 active_sessions: self.sessions.len() as u64,
2008 };
2009
2010 let now = std::time::Instant::now();
2011 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
2012 .peers
2013 .values()
2014 .map(|p| {
2015 let stats = p.link_stats();
2016 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
2017 Some(m) => (
2018 m.metrics.srtt_ms(),
2019 Some(m.metrics.loss_rate()),
2020 m.receiver.ecn_ce_count() as u64,
2021 ),
2022 None => (None, None, 0),
2023 };
2024 stats_history::PeerSnapshot {
2025 node_addr: *p.node_addr(),
2026 last_seen: now,
2027 srtt_ms,
2028 loss_rate,
2029 bytes_in_total: stats.bytes_recv,
2030 bytes_out_total: stats.bytes_sent,
2031 packets_in_total: stats.packets_recv,
2032 packets_out_total: stats.packets_sent,
2033 ecn_ce_total: ecn_ce,
2034 }
2035 })
2036 .collect();
2037
2038 self.stats_history.tick(now, &snap, &peer_snaps);
2039 }
2040
    /// Returns the current [`TunState`].
    pub fn tun_state(&self) -> TunState {
        self.tun_state
    }
2047
    /// Returns the stored TUN name, or `None` when none is set.
    pub fn tun_name(&self) -> Option<&str> {
        self.tun_name.as_deref()
    }
2052
    /// Sets the connection limit. The admission checks in this file treat `0`
    /// as "no limit".
    pub fn set_max_connections(&mut self, max: usize) {
        self.max_connections = max;
    }
2059
    /// Sets the peer limit. `outbound_admission_check` treats `0` as
    /// "no limit".
    pub fn set_max_peers(&mut self, max: usize) {
        self.max_peers = max;
    }
2064
2065 pub(crate) fn outbound_admission_check(&self) -> bool {
2068 let connection_used = self
2069 .connections
2070 .len()
2071 .saturating_add(self.pending_connects.len());
2072 let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
2073 let connection_allowed =
2074 self.max_connections == 0 || connection_used < self.max_connections;
2075 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
2076 peer_allowed && connection_allowed && link_allowed
2077 }
2078
2079 pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
2083 if !self.outbound_admission_check() {
2084 return false;
2085 }
2086
2087 let nostr = &self.config.node.discovery.nostr;
2088 if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
2089 return true;
2090 }
2091
2092 let configured_npubs = self
2093 .config
2094 .peers()
2095 .iter()
2096 .map(|peer| peer.npub.clone())
2097 .collect::<HashSet<_>>();
2098 self.open_discovery_enqueue_budget(&configured_npubs) > 0
2099 }
2100
2101 pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
2105 let connection_used = self
2106 .connections
2107 .len()
2108 .saturating_add(self.pending_connects.len());
2109 let connection_allowed =
2110 self.max_connections == 0 || connection_used < self.max_connections;
2111 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
2112 connection_allowed && link_allowed
2113 }
2114
    /// Sets the link limit. `add_link` and the admission checks treat `0` as
    /// "no limit".
    pub fn set_max_links(&mut self, max: usize) {
        self.max_links = max;
    }
2119
    /// Returns the number of entries in the connection table.
    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }
2126
    /// Returns the number of entries in the peer table.
    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }
2131
    /// Returns the number of entries in the link table.
    pub fn link_count(&self) -> usize {
        self.links.len()
    }
2136
    /// Returns the number of entries in the transport table.
    pub fn transport_count(&self) -> usize {
        self.transports.len()
    }
2141
2142 pub fn allocate_transport_id(&mut self) -> TransportId {
2146 let id = TransportId::new(self.next_transport_id);
2147 self.next_transport_id += 1;
2148 id
2149 }
2150
    /// Looks up a transport handle by id.
    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
        self.transports.get(id)
    }
2155
    /// Looks up a transport handle by id, returning a mutable reference.
    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
        self.transports.get_mut(id)
    }
2160
    /// Iterates over the ids of all transports in the transport table.
    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
        self.transports.keys()
    }
2165
    /// Returns a mutable reference to the packet receiver, or `None` when the
    /// `packet_rx` slot is empty.
    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
        self.packet_rx.as_mut()
    }
2170
2171 pub fn allocate_link_id(&mut self) -> LinkId {
2175 let id = LinkId::new(self.next_link_id);
2176 self.next_link_id += 1;
2177 id
2178 }
2179
2180 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2182 if self.max_links > 0 && self.links.len() >= self.max_links {
2183 return Err(NodeError::MaxLinksExceeded {
2184 max: self.max_links,
2185 });
2186 }
2187 let link_id = link.link_id();
2188 let transport_id = link.transport_id();
2189 let remote_addr = link.remote_addr().clone();
2190
2191 self.links.insert(link_id, link);
2192 self.addr_to_link
2193 .insert((transport_id, remote_addr), link_id);
2194 Ok(())
2195 }
2196
    /// Looks up a link by id.
    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
        self.links.get(link_id)
    }
2201
    /// Looks up a link by id, returning a mutable reference.
    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
        self.links.get_mut(link_id)
    }
2206
    /// Looks up the link id registered for `(transport_id, addr)` in the
    /// reverse index, if any.
    pub fn find_link_by_addr(
        &self,
        transport_id: TransportId,
        addr: &TransportAddr,
    ) -> Option<LinkId> {
        self.addr_to_link
            .get(&(transport_id, addr.clone()))
            .copied()
    }
2217
2218 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2224 if let Some(link) = self.links.remove(link_id) {
2225 let key = (link.transport_id(), link.remote_addr().clone());
2227 if self.addr_to_link.get(&key) == Some(link_id) {
2228 self.addr_to_link.remove(&key);
2229 }
2230 Some(link)
2231 } else {
2232 None
2233 }
2234 }
2235
2236 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2237 if !self.bootstrap_transports.contains(&transport_id) {
2238 return;
2239 }
2240
2241 let transport_in_use = self
2242 .links
2243 .values()
2244 .any(|link| link.transport_id() == transport_id)
2245 || self
2246 .connections
2247 .values()
2248 .any(|conn| conn.transport_id() == Some(transport_id))
2249 || self
2250 .peers
2251 .values()
2252 .any(|peer| peer.transport_id() == Some(transport_id))
2253 || self
2254 .pending_connects
2255 .iter()
2256 .any(|pending| pending.transport_id == transport_id);
2257
2258 if transport_in_use {
2259 return;
2260 }
2261
2262 tracing::debug!(
2263 transport_id = %transport_id,
2264 "bootstrap transport has no remaining references; dropping"
2265 );
2266
2267 self.bootstrap_transports.remove(&transport_id);
2268 self.bootstrap_transport_npubs.remove(&transport_id);
2269 self.transport_drops.remove(&transport_id);
2270 self.transports.remove(&transport_id);
2271 }
2272
    /// Iterates over all links in the link table.
    pub fn links(&self) -> impl Iterator<Item = &Link> {
        self.links.values()
    }
2277
2278 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2282 let link_id = connection.link_id();
2283
2284 if self.connections.contains_key(&link_id) {
2285 return Err(NodeError::ConnectionAlreadyExists(link_id));
2286 }
2287
2288 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2289 return Err(NodeError::MaxConnectionsExceeded {
2290 max: self.max_connections,
2291 });
2292 }
2293
2294 self.connections.insert(link_id, connection);
2295 Ok(())
2296 }
2297
    /// Looks up the connection registered for `link_id`.
    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
        self.connections.get(link_id)
    }
2302
    /// Looks up the connection registered for `link_id`, returning a mutable
    /// reference.
    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
        self.connections.get_mut(link_id)
    }
2307
    /// Removes and returns the connection registered for `link_id`, if any.
    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
        self.connections.remove(link_id)
    }
2312
    /// Iterates over all connections in the connection table.
    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
        self.connections.values()
    }
2317
    /// Looks up the active peer with the given node address.
    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
        self.peers.get(node_addr)
    }
2324
    /// Looks up the active peer with the given node address, returning a
    /// mutable reference.
    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
        self.peers.get_mut(node_addr)
    }
2329
    /// Removes and returns the active peer with the given node address, if
    /// present. Only the peer table entry is touched here.
    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
        self.peers.remove(node_addr)
    }
2334
    /// Iterates over all active peers.
    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values()
    }
2339
    /// Returns a reference to the Nostr discovery instance, or `None` when the
    /// `nostr_discovery` slot is empty.
    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
        self.nostr_discovery.as_deref()
    }
2346
    /// Iterates over the node addresses of all active peers.
    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
        self.peers.keys()
    }
2351
    /// Iterates over the active peers for which `can_send()` is true.
    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values().filter(|p| p.can_send())
    }
2356
2357 pub fn sendable_peer_count(&self) -> usize {
2359 self.peers.values().filter(|p| p.can_send()).count()
2360 }
2361
2362 pub(crate) fn set_discovery_fallback_transit_allowed(
2363 &mut self,
2364 peer_addr: NodeAddr,
2365 allowed: bool,
2366 ) {
2367 if allowed {
2368 self.discovery_fallback_transit_blocked_peers
2369 .remove(&peer_addr);
2370 } else {
2371 self.discovery_fallback_transit_blocked_peers
2372 .insert(peer_addr);
2373 }
2374 }
2375
2376 pub(crate) fn configured_discovery_fallback_transit(
2377 &self,
2378 peer_addr: &NodeAddr,
2379 ) -> Option<bool> {
2380 self.configured_peer(peer_addr)
2381 .map(|peer| peer.discovery_fallback_transit)
2382 }
2383
2384 pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2385 self.config.peers().iter().find(|peer| {
2386 PeerIdentity::from_npub(&peer.npub)
2387 .ok()
2388 .is_some_and(|identity| identity.node_addr() == peer_addr)
2389 })
2390 }
2391
2392 pub(in crate::node) fn active_peer_uses_configured_static_udp_path(
2393 &self,
2394 peer_addr: &NodeAddr,
2395 ) -> bool {
2396 let Some(peer_config) = self.configured_peer(peer_addr) else {
2397 return false;
2398 };
2399
2400 peer_config.addresses.iter().any(|candidate| {
2401 candidate.seen_at_ms.is_none()
2402 && candidate.transport.eq_ignore_ascii_case("udp")
2403 && self.active_peer_matches_candidate(peer_addr, candidate)
2404 })
2405 }
2406
2407 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2408 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2409 return retry_state.peer_config.discovery_fallback_transit;
2410 }
2411
2412 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2413 return allowed;
2414 }
2415
2416 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2417 }
2418
    /// Test helper: sets the discovery forward limiter's interval to zero.
    #[cfg(test)]
    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
        self.discovery_forward_limiter
            .set_interval(std::time::Duration::ZERO);
    }
2428
    /// Test helper: looks up the session entry for `remote`.
    #[cfg(test)]
    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
        self.sessions.get(remote)
    }
2433
    /// Test helper: looks up the session entry for `remote`, returning a
    /// mutable reference.
    #[cfg(test)]
    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
        self.sessions.get_mut(remote)
    }
2439
    /// Test helper: removes and returns the session entry for `remote`, if
    /// any.
    #[cfg(test)]
    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
        self.sessions.remove(remote)
    }
2445
2446 #[cfg(test)]
2448 pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2449 self.path_mtu_lookup
2450 .read()
2451 .ok()
2452 .and_then(|map| map.get(fips_addr).copied())
2453 }
2454
2455 #[cfg(test)]
2457 pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2458 if let Ok(mut map) = self.path_mtu_lookup.write() {
2459 map.insert(fips_addr, mtu);
2460 }
2461 }
2462
    /// Returns the number of entries in the session table.
    pub fn session_count(&self) -> usize {
        self.sessions.len()
    }
2467
    /// Iterates over all `(remote address, session entry)` pairs.
    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
        self.sessions.iter()
    }
2472
2473 pub(crate) fn register_identity(
2477 &mut self,
2478 node_addr: NodeAddr,
2479 pubkey: secp256k1::PublicKey,
2480 ) -> bool {
2481 let mut prefix = [0u8; 15];
2482 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2483 if let Some(entry) = self.identity_cache.get(&prefix)
2484 && entry.node_addr == node_addr
2485 && entry.pubkey == pubkey
2486 {
2487 return true;
2491 }
2492
2493 let (xonly, _) = pubkey.x_only_public_key();
2494 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2495 if derived_node_addr != node_addr {
2496 debug!(
2497 claimed_node_addr = %node_addr,
2498 derived_node_addr = %derived_node_addr,
2499 "Rejected identity cache entry with mismatched public key"
2500 );
2501 return false;
2502 }
2503
2504 let now_ms = Self::now_ms();
2505 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2506 && entry.node_addr == node_addr
2507 {
2508 entry.pubkey = pubkey;
2509 entry.last_seen_ms = now_ms;
2510 return true;
2511 }
2512
2513 let npub = encode_npub(&xonly);
2514 self.identity_cache.insert(
2515 prefix,
2516 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2517 );
2518 let max = self.config.node.cache.identity_size;
2520 if self.identity_cache.len() > max
2521 && let Some(oldest_key) = self
2522 .identity_cache
2523 .iter()
2524 .min_by_key(|(_, entry)| entry.last_seen_ms)
2525 .map(|(k, _)| *k)
2526 {
2527 self.identity_cache.remove(&oldest_key);
2528 }
2529 true
2530 }
2531
2532 pub(crate) fn lookup_by_fips_prefix(
2534 &mut self,
2535 prefix: &[u8; 15],
2536 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2537 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2538 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2540 } else {
2541 None
2542 }
2543 }
2544
2545 pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2547 let mut prefix = [0u8; 15];
2548 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2549 self.identity_cache.contains_key(&prefix)
2550 }
2551
    /// Returns the number of entries in the identity cache.
    pub fn identity_cache_len(&self) -> usize {
        self.identity_cache.len()
    }
2556
    /// Iterates over identity cache entries as
    /// `(node address, public key, last_seen_ms)` tuples.
    pub fn identity_cache_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
        self.identity_cache
            .values()
            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
    }
2568
    /// Returns the configured identity cache size (`cache.identity_size`).
    pub fn identity_cache_max(&self) -> usize {
        self.config.node.cache.identity_size
    }
2573
    /// Returns the number of entries in `pending_lookups`.
    pub fn pending_lookup_count(&self) -> usize {
        self.pending_lookups.len()
    }
2578
    /// Iterates over `(node address, pending lookup)` pairs in
    /// `pending_lookups`.
    pub fn pending_lookups_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
        self.pending_lookups.iter()
    }
2585
    /// Returns the number of entries in `recent_requests`.
    pub fn recent_request_count(&self) -> usize {
        self.recent_requests.len()
    }
2590
    /// Returns the number of keys in `pending_tun_packets` (one queue per
    /// key).
    pub fn pending_tun_destinations(&self) -> usize {
        self.pending_tun_packets.len()
    }
2595
    /// Returns the total number of queued items across all queues in
    /// `pending_tun_packets`.
    pub fn pending_tun_total_packets(&self) -> usize {
        self.pending_tun_packets.values().map(|q| q.len()).sum()
    }
2600
    /// Iterates over `(node address, retry state)` pairs in `retry_pending`.
    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
        self.retry_pending.iter()
    }
2605
2606 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2613 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2615 return true;
2616 }
2617 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2619 && decl.parent_id() == self.node_addr()
2620 {
2621 return true;
2622 }
2623 false
2624 }
2625
    /// Picks the peer that a payload addressed to `dest_node_addr` should be
    /// handed to next, or `None` when no usable next hop exists.
    ///
    /// Selection order, as implemented below:
    /// 1. The destination is this node: `None`.
    /// 2. The destination is a healthy direct peer, its direct path is not
    ///    blocked by a degraded session, and its link cost is at most
    ///    `1.0 + ROUTING_FALLBACK_MIN_COST_ADVANTAGE`: that peer.
    /// 3. In `RoutingMode::ReplyLearned`, when not exploring fallbacks: a
    ///    learned next hop chosen among sendable peers that beat the direct
    ///    route.
    /// 4. Without cached destination coordinates: a learned next hop (only if
    ///    there is no healthy direct route or we are exploring), then the
    ///    direct route, then `None`.
    /// 5. With coordinates: the best coordinate candidate among peers that
    ///    `may_reach` the destination, then the tree candidate; each is only
    ///    used when it beats the direct route.
    /// 6. When exploring: whatever the learned table selects among sendable
    ///    peers (possibly `None`).
    /// 7. The healthy direct route, then a learned next hop, then `None`.
    ///
    /// Takes `&mut self` because the degraded-session check may drop an
    /// expired entry and the coordinate cache lookup is a `get_and_touch`.
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        // Never route to ourselves.
        if dest_node_addr == self.node_addr() {
            return None;
        }
        let now_ms = Self::now_ms();
        let direct_session_degraded =
            self.session_direct_path_blocks_direct_payload(dest_node_addr, now_ms);

        // `Some(dest)` only when the destination is a peer, is healthy, and its
        // direct path is not currently blocked.
        let healthy_direct_route = self
            .peers
            .get(dest_node_addr)
            .filter(|peer| peer.is_healthy() && !direct_session_degraded)
            .map(|_| *dest_node_addr);
        // Fast path: the direct link is cheap enough that no fallback could win
        // by the required margin, so skip every other lookup.
        if let Some(direct_addr) = healthy_direct_route
            && self
                .peers
                .get(&direct_addr)
                .is_some_and(|peer| peer.link_cost() <= 1.0 + ROUTING_FALLBACK_MIN_COST_ADVANTAGE)
        {
            return self.peers.get(&direct_addr);
        }
        let direct_payload_eligible = healthy_direct_route.is_some();
        // The destination itself is only a valid candidate when the direct
        // route is eligible; every other peer just has to be healthy.
        let payload_candidate_can_send = |addr: &NodeAddr, peer: &ActivePeer| {
            if addr == dest_node_addr {
                direct_payload_eligible
            } else {
                peer.is_healthy()
            }
        };

        // Takes the node explicitly instead of capturing `self`, so the closure
        // does not hold a borrow of `self` across the calls below.
        let fallback_beats_direct = |node: &Self, fallback_addr: NodeAddr| {
            node.route_candidate_beats_direct(healthy_direct_route, fallback_addr)
        };

        // Only populated in reply-learned mode; `None` disables every learned
        // route lookup further down.
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(addr, peer)| payload_candidate_can_send(addr, peer))
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // Ask the learned-route table whether this send should explore a
        // fallback; always false outside reply-learned mode.
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        // Normal (non-exploring) learned lookup: restrict the choice to
        // sendable peers that beat the direct route.
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
        {
            let eligible = sendable
                .iter()
                .copied()
                .filter(|addr| fallback_beats_direct(self, *addr))
                .collect::<HashSet<_>>();
            if !eligible.is_empty()
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| eligible.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
        }

        // Coordinates are cloned so the cache borrow ends before the peer
        // lookups below.
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            // No coordinates: coordinate and tree routing are unavailable, so
            // only learned routes and the direct route remain.
            if (healthy_direct_route.is_none() || explore_fallback)
                && let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            if let Some(direct_addr) = healthy_direct_route {
                return self.peers.get(&direct_addr);
            }
            return None;
        };

        // Coordinate routing: resolve to an address inside the block so the
        // borrowed candidate list is dropped before the final peer lookup.
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self
                .peers
                .iter()
                .filter(|(addr, peer)| {
                    payload_candidate_can_send(addr, peer) && peer.may_reach(dest_node_addr)
                })
                .map(|(_, peer)| peer)
                .collect();
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        if let Some(next_hop_addr) = coordinate_route_addr
            && fallback_beats_direct(self, next_hop_addr)
        {
            return self.peers.get(&next_hop_addr);
        }

        // Tree routing: a peer strictly closer to the destination than we are.
        let tree_route_addr = self.select_tree_payload_candidate(
            &dest_coords,
            dest_node_addr,
            direct_payload_eligible,
        );
        if let Some(next_hop_addr) = tree_route_addr
            && fallback_beats_direct(self, next_hop_addr)
        {
            return self.peers.get(&next_hop_addr);
        }

        // NOTE(review): when exploring, this returns whatever the learned table
        // selects, including `None`, without falling back to a healthy direct
        // route (the no-coordinates branch above does fall through to it).
        // Presumably `should_explore_fallback` only returns true when a
        // sendable learned hop exists; confirm.
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        // Nothing beat the direct route: use it if it is healthy.
        if let Some(direct_addr) = healthy_direct_route {
            return self.peers.get(&direct_addr);
        }

        // Last resort: any sendable learned hop, without the cost comparison.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        None
    }
2809
2810 pub(in crate::node) fn find_transit_next_hop(
2811 &mut self,
2812 dest_node_addr: &NodeAddr,
2813 previous_hop: &NodeAddr,
2814 ) -> Option<NodeAddr> {
2815 if dest_node_addr == self.node_addr() {
2816 return None;
2817 }
2818
2819 if dest_node_addr != previous_hop
2820 && self
2821 .peers
2822 .get(dest_node_addr)
2823 .is_some_and(|peer| peer.is_healthy())
2824 {
2825 return Some(*dest_node_addr);
2826 }
2827
2828 let next_hop_addr = *self.find_next_hop(dest_node_addr)?.node_addr();
2829 if &next_hop_addr == previous_hop {
2830 self.record_route_failure(*dest_node_addr, next_hop_addr);
2831 return None;
2832 }
2833 Some(next_hop_addr)
2834 }
2835
2836 fn route_candidate_beats_direct(
2837 &self,
2838 healthy_direct_route: Option<NodeAddr>,
2839 candidate_addr: NodeAddr,
2840 ) -> bool {
2841 let Some(direct_addr) = healthy_direct_route else {
2842 return true;
2843 };
2844 if candidate_addr == direct_addr {
2845 return false;
2846 }
2847
2848 let Some(direct) = self.peers.get(&direct_addr) else {
2849 return true;
2850 };
2851 let Some(candidate) = self.peers.get(&candidate_addr) else {
2852 return false;
2853 };
2854 if !candidate.is_healthy() {
2855 return false;
2856 }
2857
2858 let direct_cost = direct.link_cost();
2859 let candidate_cost = candidate.link_cost();
2860 candidate_cost + ROUTING_FALLBACK_MIN_COST_ADVANTAGE < direct_cost
2861 }
2862
2863 fn select_tree_payload_candidate(
2864 &self,
2865 dest_coords: &crate::tree::TreeCoordinate,
2866 direct_dest: &NodeAddr,
2867 direct_payload_eligible: bool,
2868 ) -> Option<NodeAddr> {
2869 if self.tree_state.my_coords().root_id() != dest_coords.root_id() {
2870 return None;
2871 }
2872
2873 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2874 let mut best: Option<(NodeAddr, usize)> = None;
2875
2876 for (peer_addr, peer) in &self.peers {
2877 if peer_addr == direct_dest {
2878 if !direct_payload_eligible {
2879 continue;
2880 }
2881 } else if !peer.is_healthy() {
2882 continue;
2883 }
2884
2885 let Some(peer_coords) = self.tree_state.peer_coords(peer_addr) else {
2886 continue;
2887 };
2888 let distance = peer_coords.distance_to(dest_coords);
2889 if distance >= my_distance {
2890 continue;
2891 }
2892
2893 let dominated = match &best {
2894 None => true,
2895 Some((best_id, best_dist)) => {
2896 distance < *best_dist || (distance == *best_dist && peer_addr < best_id)
2897 }
2898 };
2899 if dominated {
2900 best = Some((*peer_addr, distance));
2901 }
2902 }
2903
2904 best.map(|(peer_addr, _)| peer_addr)
2905 }
2906
2907 pub(in crate::node) fn session_direct_path_is_degraded(
2908 &mut self,
2909 dest: &NodeAddr,
2910 now_ms: u64,
2911 ) -> bool {
2912 match self.session_direct_degraded_until_ms.get(dest).copied() {
2913 Some(until_ms) if until_ms > now_ms => true,
2914 Some(_) => {
2915 self.session_direct_degraded_until_ms.remove(dest);
2916 false
2917 }
2918 None => false,
2919 }
2920 }
2921
2922 pub(in crate::node) fn session_direct_path_blocks_direct_payload(
2923 &mut self,
2924 dest: &NodeAddr,
2925 now_ms: u64,
2926 ) -> bool {
2927 self.session_direct_path_is_degraded(dest, now_ms)
2928 && !self.active_peer_uses_configured_static_udp_path(dest)
2929 }
2930
2931 pub(in crate::node) fn mark_session_direct_path_degraded(
2932 &mut self,
2933 dest: NodeAddr,
2934 now_ms: u64,
2935 ) -> bool {
2936 let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2937 let entry = self
2938 .session_direct_degraded_until_ms
2939 .entry(dest)
2940 .or_insert(0);
2941 let was_degraded = *entry > now_ms;
2942 *entry = (*entry).max(until_ms);
2943 !was_degraded
2944 }
2945
2946 pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
2947 self.session_direct_degraded_until_ms.remove(dest).is_some()
2948 }
2949
2950 pub(in crate::node) fn learn_reverse_route(
2951 &mut self,
2952 destination: NodeAddr,
2953 next_hop: NodeAddr,
2954 ) {
2955 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2956 || destination == *self.node_addr()
2957 {
2958 return;
2959 }
2960 let now_ms = Self::now_ms();
2961 self.learned_routes.learn(
2962 destination,
2963 next_hop,
2964 now_ms,
2965 self.config.node.routing.learned_ttl_secs,
2966 self.config.node.routing.max_learned_routes_per_dest,
2967 );
2968 }
2969
2970 pub(in crate::node) fn record_route_failure(
2971 &mut self,
2972 destination: NodeAddr,
2973 next_hop: NodeAddr,
2974 ) {
2975 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2976 return;
2977 }
2978 self.learned_routes.record_failure(&destination, &next_hop);
2979 }
2980
2981 pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2982 self.learned_routes.snapshot(now_ms)
2983 }
2984
2985 pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2986 self.learned_routes.purge_expired(now_ms);
2987 }
2988
2989 fn select_best_candidate<'a>(
2998 &'a self,
2999 candidates: &[&'a ActivePeer],
3000 dest_coords: &crate::tree::TreeCoordinate,
3001 ) -> Option<&'a ActivePeer> {
3002 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
3003
3004 let mut best: Option<(&ActivePeer, f64, usize)> = None;
3005
3006 for &candidate in candidates {
3007 if !candidate.can_send() {
3008 continue;
3009 }
3010
3011 let cost = candidate.link_cost();
3012
3013 let dist = self
3014 .tree_state
3015 .peer_coords(candidate.node_addr())
3016 .map(|pc| pc.distance_to(dest_coords))
3017 .unwrap_or(usize::MAX);
3018
3019 if dist >= my_distance {
3022 continue;
3023 }
3024
3025 let dominated = match &best {
3026 None => true,
3027 Some((_, best_cost, best_dist)) => {
3028 cost < *best_cost
3029 || (cost == *best_cost && dist < *best_dist)
3030 || (cost == *best_cost
3031 && dist == *best_dist
3032 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
3033 }
3034 };
3035
3036 if dominated {
3037 best = Some((candidate, cost, dist));
3038 }
3039 }
3040
3041 best.map(|(peer, _, _)| peer)
3042 }
3043
3044 pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
3046 self.peers.values().filter(|p| p.may_reach(dest)).collect()
3047 }
3048
    /// Borrows the node's `TunTx` handle, or `None` when none is set.
    pub fn tun_tx(&self) -> Option<&TunTx> {
        self.tun_tx.as_ref()
    }
3055
3056 pub fn attach_external_packet_io(
3063 &mut self,
3064 capacity: usize,
3065 ) -> Result<ExternalPacketIo, NodeError> {
3066 if self.state != NodeState::Created {
3067 return Err(NodeError::Config(ConfigError::Validation(
3068 "external packet I/O must be attached before node start".to_string(),
3069 )));
3070 }
3071 if self.config.tun.enabled {
3072 return Err(NodeError::Config(ConfigError::Validation(
3073 "external packet I/O requires tun.enabled=false".to_string(),
3074 )));
3075 }
3076
3077 let capacity = capacity.max(1);
3078 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
3079 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
3080 self.tun_outbound_rx = Some(outbound_rx);
3081 self.external_packet_tx = Some(inbound_tx);
3082
3083 Ok(ExternalPacketIo {
3084 outbound_tx,
3085 inbound_rx,
3086 })
3087 }
3088
3089 pub(crate) fn attach_endpoint_data_io(
3094 &mut self,
3095 capacity: usize,
3096 ) -> Result<EndpointDataIo, NodeError> {
3097 if self.state != NodeState::Created {
3098 return Err(NodeError::Config(ConfigError::Validation(
3099 "endpoint data I/O must be attached before node start".to_string(),
3100 )));
3101 }
3102
3103 let command_capacity = endpoint_data_command_capacity(capacity);
3104 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
3105 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
3110 self.endpoint_command_rx = Some(command_rx);
3111 self.endpoint_event_tx = Some(event_tx.clone());
3112
3113 Ok(EndpointDataIo {
3114 command_tx,
3115 event_rx,
3116 event_tx,
3117 })
3118 }
3119
3120 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
3121 let mut prefix = [0u8; 15];
3122 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3123 self.identity_cache
3124 .get(&prefix)
3125 .filter(|entry| &entry.node_addr == addr)
3126 .map(|entry| entry.pubkey)
3127 }
3128
3129 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
3130 let mut prefix = [0u8; 15];
3131 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3132 self.identity_cache
3133 .get(&prefix)
3134 .filter(|entry| &entry.node_addr == addr)
3135 .map(|entry| entry.npub.clone())
3136 }
3137
3138 pub(in crate::node) fn deliver_external_ipv6_packet(
3139 &self,
3140 src_addr: &NodeAddr,
3141 packet: Vec<u8>,
3142 ) {
3143 let Some(external_packet_tx) = &self.external_packet_tx else {
3144 return;
3145 };
3146 if packet.len() < 40 {
3147 return;
3148 }
3149 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
3150 return;
3151 };
3152 let delivered = NodeDeliveredPacket {
3153 source_node_addr: *src_addr,
3154 source_npub: self.npub_for_node_addr(src_addr),
3155 destination,
3156 packet,
3157 };
3158 if let Err(error) = external_packet_tx.try_send(delivered) {
3159 debug!(error = %error, "Failed to deliver packet to external app sink");
3160 }
3161 }
3162
3163 pub(super) async fn send_encrypted_link_message(
3177 &mut self,
3178 node_addr: &NodeAddr,
3179 plaintext: &[u8],
3180 ) -> Result<(), NodeError> {
3181 self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
3182 .await
3183 }
3184
3185 pub(in crate::node) fn note_local_send_outcome(
3191 &mut self,
3192 node_addr: &NodeAddr,
3193 result: &Result<usize, TransportError>,
3194 ) {
3195 match result {
3196 Ok(_) => {
3197 self.local_send_failure_at_by_peer.remove(node_addr);
3198 }
3199 Err(error) if error.is_local_route_unavailable() => {
3200 self.local_send_failure_at_by_peer
3201 .insert(*node_addr, std::time::Instant::now());
3202 }
3203 Err(_) => {}
3204 }
3205 }
3206
3207 pub(in crate::node) fn local_send_failure_dead_timeout_for_peer(
3213 &self,
3214 node_addr: &NodeAddr,
3215 now: std::time::Instant,
3216 dead_timeout: std::time::Duration,
3217 fast_dead_timeout: std::time::Duration,
3218 ) -> std::time::Duration {
3219 match self.local_send_failure_at_by_peer.get(node_addr).copied() {
3220 Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
3221 fast_dead_timeout.min(dead_timeout)
3222 }
3223 None => dead_timeout,
3224 Some(_) => dead_timeout,
3225 }
3226 }
3227
3228 pub(in crate::node) fn purge_expired_local_send_failures(&mut self, now: std::time::Instant) {
3229 self.local_send_failure_at_by_peer
3230 .retain(|_, at| now.duration_since(*at) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW);
3231 }
3232
3233 pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
3234 self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
3235 }
3236
3237 pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
3238 let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
3239 return false;
3240 };
3241 let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
3242 std::time::Instant::now().duration_since(t) <= grace
3243 }
3244
3245 pub(super) async fn send_encrypted_link_message_with_ce(
3249 &mut self,
3250 node_addr: &NodeAddr,
3251 plaintext: &[u8],
3252 ce_flag: bool,
3253 ) -> Result<(), NodeError> {
3254 let peer = self
3255 .peers
3256 .get_mut(node_addr)
3257 .ok_or(NodeError::PeerNotFound(*node_addr))?;
3258
3259 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
3260 node_addr: *node_addr,
3261 reason: "no their_index".into(),
3262 })?;
3263 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
3264 node_addr: *node_addr,
3265 reason: "no transport_id".into(),
3266 })?;
3267 let remote_addr = peer
3268 .current_addr()
3269 .cloned()
3270 .ok_or_else(|| NodeError::SendFailed {
3271 node_addr: *node_addr,
3272 reason: "no current_addr".into(),
3273 })?;
3274 #[cfg(any(target_os = "linux", target_os = "macos"))]
3275 let connected_socket = peer.connected_udp();
3276
3277 let timestamp_ms = peer.session_elapsed_ms();
3279
3280 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
3282 let mut flags = if sp_flag { FLAG_SP } else { 0 };
3283 if ce_flag {
3284 flags |= FLAG_CE;
3285 }
3286 if peer.current_k_bit() {
3287 flags |= FLAG_KEY_EPOCH;
3288 }
3289
3290 let session = peer
3291 .noise_session_mut()
3292 .ok_or_else(|| NodeError::SendFailed {
3293 node_addr: *node_addr,
3294 reason: "no noise session".into(),
3295 })?;
3296
3297 const INNER_TS_LEN: usize = 4;
3305 let counter = session.current_send_counter();
3306 let inner_len = INNER_TS_LEN + plaintext.len();
3307 let payload_len = inner_len as u16;
3308 let header = build_established_header(their_index, counter, flags, payload_len);
3309
3310 let transport_for_send = self
3332 .transports
3333 .get(&transport_id)
3334 .ok_or(NodeError::TransportNotFound(transport_id))?;
3335 match transport_for_send.connection_state(&remote_addr) {
3336 ConnectionState::Connected => {}
3337 other => {
3338 if matches!(other, ConnectionState::None) {
3339 let _ = transport_for_send.connect(&remote_addr).await;
3340 }
3341 return Err(NodeError::SendFailed {
3342 node_addr: *node_addr,
3343 reason: format!("transport connection not ready: {:?}", other),
3344 });
3345 }
3346 }
3347 #[cfg(unix)]
3348 {
3349 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3350 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3351 && is_udp
3352 && let Some(cipher_clone) = session.send_cipher_clone()
3353 {
3354 let reserved_counter =
3358 session
3359 .take_send_counter()
3360 .map_err(|e| NodeError::SendFailed {
3361 node_addr: *node_addr,
3362 reason: format!("counter reservation failed: {}", e),
3363 })?;
3364 debug_assert_eq!(reserved_counter, counter);
3365 let header =
3369 build_established_header(their_index, reserved_counter, flags, payload_len);
3370 let transport = transport_for_send;
3371 let send_target = {
3378 if let TransportHandle::Udp(udp) = transport {
3379 let socket_addr = {
3380 #[cfg(any(target_os = "linux", target_os = "macos"))]
3381 {
3382 match connected_socket.as_ref() {
3383 Some(socket) => Some(socket.peer_addr()),
3384 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3385 }
3386 }
3387 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3388 {
3389 udp.resolve_for_off_task(&remote_addr).await.ok()
3390 }
3391 };
3392 match (udp.async_socket(), socket_addr) {
3393 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3394 _ => None,
3395 }
3396 } else {
3397 None
3398 }
3399 };
3400 if let Some((socket, socket_addr)) = send_target {
3401 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3417 let mut wire_buf = Vec::with_capacity(wire_capacity);
3418 wire_buf.extend_from_slice(&header);
3419 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
3420 wire_buf.extend_from_slice(plaintext);
3421 let predicted_bytes = wire_capacity;
3422 if let Some(peer) = self.peers.get_mut(node_addr) {
3429 peer.link_stats_mut().record_sent(predicted_bytes);
3430 if let Some(mmp) = peer.mmp_mut() {
3431 mmp.sender
3432 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3433 }
3434 }
3435 let scheduling_weight = self.send_weight_for_peer(node_addr);
3436 let traffic_class = classify_fmp_plaintext_traffic(plaintext);
3437 workers.dispatch(self::encrypt_worker::FmpSendJob {
3438 cipher: cipher_clone,
3439 counter: reserved_counter,
3440 wire_buf,
3441 fsp_seal: None,
3442 socket,
3443 dest_addr: socket_addr,
3444 #[cfg(any(target_os = "linux", target_os = "macos"))]
3445 connected_socket,
3446 bulk_endpoint_data: traffic_class.bulk_endpoint_data,
3447 drop_on_backpressure: traffic_class.drop_on_backpressure,
3448 scheduling_weight,
3449 queued_at: crate::perf_profile::stamp(),
3450 });
3451 return Ok(());
3452 }
3453 }
3454 }
3455
3456 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3461 let ciphertext = {
3463 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3464 session
3465 .encrypt_with_aad(&inner_plaintext, &header)
3466 .map_err(|e| NodeError::SendFailed {
3467 node_addr: *node_addr,
3468 reason: format!("encryption failed: {}", e),
3469 })?
3470 };
3471
3472 let wire_packet = build_encrypted(&header, &ciphertext);
3473
3474 let send_result = {
3476 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3477 let transport = self
3478 .transports
3479 .get(&transport_id)
3480 .ok_or(NodeError::TransportNotFound(transport_id))?;
3481 transport.send(&remote_addr, &wire_packet).await
3482 };
3483 self.note_local_send_outcome(node_addr, &send_result);
3484 let bytes_sent = send_result.map_err(|e| match e {
3485 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3486 node_addr: *node_addr,
3487 packet_size,
3488 mtu,
3489 },
3490 other => NodeError::SendFailed {
3491 node_addr: *node_addr,
3492 reason: format!("transport send: {}", other),
3493 },
3494 })?;
3495
3496 if let Some(peer) = self.peers.get_mut(node_addr) {
3498 peer.link_stats_mut().record_sent(bytes_sent);
3499 if let Some(mmp) = peer.mmp_mut() {
3501 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3502 }
3503 }
3504
3505 Ok(())
3506 }
3507}
3508
3509impl fmt::Debug for Node {
3510 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3511 f.debug_struct("Node")
3512 .field("node_addr", self.node_addr())
3513 .field("state", &self.state)
3514 .field("is_leaf_only", &self.is_leaf_only)
3515 .field("connections", &self.connection_count())
3516 .field("peers", &self.peer_count())
3517 .field("links", &self.link_count())
3518 .field("transports", &self.transport_count())
3519 .finish()
3520 }
3521}