1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31#[cfg(unix)]
32use self::wire::ESTABLISHED_HEADER_SIZE;
33use self::wire::{
34 FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted, build_established_header,
35 prepend_inner_header,
36};
37use crate::bloom::{BloomFilter, BloomState};
38use crate::cache::CoordCache;
39use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
40use crate::node::session::SessionEntry;
41use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
42use crate::peer::{ActivePeer, PeerConnection};
43#[cfg(any(target_os = "linux", target_os = "macos"))]
44use crate::transport::ethernet::EthernetTransport;
45use crate::transport::tcp::TcpTransport;
46use crate::transport::tor::TorTransport;
47use crate::transport::udp::UdpTransport;
48#[cfg(feature = "webrtc-transport")]
49use crate::transport::webrtc::WebRtcTransport;
50use crate::transport::{
51 ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
52 TransportHandle, TransportId,
53};
54use crate::tree::TreeState;
55use crate::upper::hosts::HostMap;
56use crate::upper::icmp_rate_limit::IcmpRateLimiter;
57use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
58use crate::utils::index::IndexAllocator;
59use crate::{
60 Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
61 PeerIdentity, encode_npub,
62};
63use rand::Rng;
64use std::collections::{HashMap, HashSet, VecDeque};
65use std::fmt;
66use std::sync::Arc;
67use std::thread::JoinHandle;
68use thiserror::Error;
69use tracing::{debug, warn};
70
71const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
72const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20_000;
73const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 16;
74const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.08;
75const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.02;
76const ROUTING_FALLBACK_MIN_COST_ADVANTAGE: f64 = 0.25;
77
78#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
79struct FmpPlaintextTrafficClass {
80 bulk_endpoint_data: bool,
81 drop_on_backpressure: bool,
82}
83
84#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
85struct EndpointPayloadTrafficClass {
86 bulk_endpoint_data: bool,
87 drop_on_backpressure: bool,
88}
89
90fn classify_fmp_plaintext_traffic(plaintext: &[u8]) -> FmpPlaintextTrafficClass {
91 let bulk_endpoint_data = fmp_plaintext_is_bulk_session_datagram(plaintext);
92 FmpPlaintextTrafficClass {
97 bulk_endpoint_data,
98 drop_on_backpressure: false,
99 }
100}
101
102fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
103 if plaintext
104 .first()
105 .is_none_or(|ty| *ty != LinkMessageType::SessionDatagram.to_byte())
106 {
107 return false;
108 }
109 let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
110 return false;
111 };
112 FspCommonPrefix::parse(fsp_payload).is_some_and(|prefix| {
113 prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted() && !prefix.has_coords()
114 })
115}
116
117fn classify_endpoint_payload(payload: &[u8]) -> EndpointPayloadTrafficClass {
118 const IPPROTO_TCP: u8 = 6;
119 const IPPROTO_ICMPV6: u8 = 58;
120
121 match parse_endpoint_payload_ip_proto(payload) {
122 Some((IPPROTO_ICMPV6, _)) => EndpointPayloadTrafficClass::default(),
123 Some((IPPROTO_TCP, offset)) => {
124 let latency_sensitive = endpoint_tcp_payload_is_latency_sensitive(payload, offset);
125 EndpointPayloadTrafficClass {
126 bulk_endpoint_data: !latency_sensitive,
127 drop_on_backpressure: false,
128 }
129 }
130 _ => EndpointPayloadTrafficClass {
131 bulk_endpoint_data: true,
132 drop_on_backpressure: true,
133 },
134 }
135}
136
137fn endpoint_tcp_payload_is_latency_sensitive(payload: &[u8], tcp_offset: usize) -> bool {
138 const TCP_MIN_HEADER_LEN: usize = 20;
139 const TCP_FLAG_FIN: u8 = 0x01;
140 const TCP_FLAG_SYN: u8 = 0x02;
141 const TCP_FLAG_RST: u8 = 0x04;
142 const INTERACTIVE_TCP_PAYLOAD_MAX: usize = 256;
143
144 if payload.len() < tcp_offset + TCP_MIN_HEADER_LEN {
145 return true;
146 }
147
148 let tcp_header_len = usize::from(payload[tcp_offset + 12] >> 4) * 4;
149 if tcp_header_len < TCP_MIN_HEADER_LEN || payload.len() < tcp_offset + tcp_header_len {
150 return true;
151 }
152
153 let flags = payload[tcp_offset + 13];
154 if flags & (TCP_FLAG_FIN | TCP_FLAG_SYN | TCP_FLAG_RST) != 0 {
155 return true;
156 }
157
158 let payload_len = endpoint_ip_payload_len(payload)
159 .and_then(|ip_payload_len| ip_payload_len.checked_sub(tcp_header_len))
160 .unwrap_or_else(|| payload.len().saturating_sub(tcp_offset + tcp_header_len));
161 payload_len <= INTERACTIVE_TCP_PAYLOAD_MAX
162}
163
164fn endpoint_ip_payload_len(payload: &[u8]) -> Option<usize> {
165 const IPV4_MIN_HEADER_LEN: usize = 20;
166 const IPV6_HEADER_LEN: usize = 40;
167
168 let version_ihl = payload.first().copied()?;
169 match version_ihl >> 4 {
170 4 => {
171 if payload.len() < IPV4_MIN_HEADER_LEN {
172 return None;
173 }
174 let header_len = usize::from(version_ihl & 0x0f) * 4;
175 if header_len < IPV4_MIN_HEADER_LEN || payload.len() < header_len {
176 return None;
177 }
178 let total_len = usize::from(u16::from_be_bytes([payload[2], payload[3]]));
179 total_len.checked_sub(header_len)
180 }
181 6 => {
182 if payload.len() < IPV6_HEADER_LEN {
183 return None;
184 }
185 Some(usize::from(u16::from_be_bytes([payload[4], payload[5]])))
186 }
187 _ => None,
188 }
189}
190
191fn parse_endpoint_payload_ip_proto(payload: &[u8]) -> Option<(u8, usize)> {
192 const IPV4_MIN_HEADER_LEN: usize = 20;
193
194 let version_ihl = payload.first().copied()?;
195
196 match version_ihl >> 4 {
197 4 => {
198 if payload.len() < IPV4_MIN_HEADER_LEN {
199 return None;
200 }
201 let header_len = usize::from(version_ihl & 0x0f) * 4;
202 if header_len >= IPV4_MIN_HEADER_LEN && payload.len() >= header_len {
203 Some((payload[9], header_len))
204 } else {
205 None
206 }
207 }
208 6 => ipv6_payload_next_header(payload),
209 _ => None,
210 }
211}
212
213#[cfg(test)]
214fn endpoint_payload_is_tcp(payload: &[u8]) -> bool {
215 const IPPROTO_TCP: u8 = 6;
216 parse_endpoint_payload_ip_proto(payload).is_some_and(|(proto, _)| proto == IPPROTO_TCP)
217}
218
219fn ipv6_payload_next_header(payload: &[u8]) -> Option<(u8, usize)> {
220 const IPV6_HEADER_LEN: usize = 40;
221 const IPV6_FRAGMENT_HEADER_LEN: usize = 8;
222
223 if payload.len() < IPV6_HEADER_LEN || payload[0] >> 4 != 6 {
224 return None;
225 }
226
227 let mut next_header = payload[6];
228 let mut offset = IPV6_HEADER_LEN;
229 let mut extension_count = 0usize;
230 while ipv6_extension_header_is_skippable(next_header) {
231 if next_header == 44 {
232 if payload.len() < offset + IPV6_FRAGMENT_HEADER_LEN {
233 return None;
234 }
235 next_header = payload[offset];
236 offset += IPV6_FRAGMENT_HEADER_LEN;
237 } else if next_header == 51 {
238 if payload.len() < offset + 2 {
239 return None;
240 }
241 let header_len = (usize::from(payload[offset + 1]) + 2) * 4;
242 if payload.len() < offset + header_len {
243 return None;
244 }
245 next_header = payload[offset];
246 offset += header_len;
247 } else {
248 if payload.len() < offset + 2 {
249 return None;
250 }
251 let header_len = (usize::from(payload[offset + 1]) + 1) * 8;
252 if payload.len() < offset + header_len {
253 return None;
254 }
255 next_header = payload[offset];
256 offset += header_len;
257 }
258 extension_count += 1;
259 if extension_count > 8 {
260 return None;
261 }
262 }
263
264 Some((next_header, offset))
265}
266
267fn ipv6_extension_header_is_skippable(next_header: u8) -> bool {
268 matches!(next_header, 0 | 43 | 44 | 51 | 60 | 135)
269}
270
271pub(crate) const REKEY_JITTER_SECS: i64 = 15;
278
279#[derive(Debug, Error)]
281pub enum NodeError {
282 #[error("node not started")]
283 NotStarted,
284
285 #[error("node already started")]
286 AlreadyStarted,
287
288 #[error("node already stopped")]
289 AlreadyStopped,
290
291 #[error("transport not found: {0}")]
292 TransportNotFound(TransportId),
293
294 #[error("no transport available for type: {0}")]
295 NoTransportForType(String),
296
297 #[error("link not found: {0}")]
298 LinkNotFound(LinkId),
299
300 #[error("connection not found: {0}")]
301 ConnectionNotFound(LinkId),
302
303 #[error("peer not found: {0:?}")]
304 PeerNotFound(NodeAddr),
305
306 #[error("peer already exists: {0:?}")]
307 PeerAlreadyExists(NodeAddr),
308
309 #[error("connection already exists for link: {0}")]
310 ConnectionAlreadyExists(LinkId),
311
312 #[error("invalid peer npub '{npub}': {reason}")]
313 InvalidPeerNpub { npub: String, reason: String },
314
315 #[error("discovery error: {0}")]
316 Discovery(String),
317
318 #[error("access denied: {0}")]
319 AccessDenied(String),
320
321 #[error("max connections exceeded: {max}")]
322 MaxConnectionsExceeded { max: usize },
323
324 #[error("max peers exceeded: {max}")]
325 MaxPeersExceeded { max: usize },
326
327 #[error("max links exceeded: {max}")]
328 MaxLinksExceeded { max: usize },
329
330 #[error("handshake incomplete for link {0}")]
331 HandshakeIncomplete(LinkId),
332
333 #[error("no session available for link {0}")]
334 NoSession(LinkId),
335
336 #[error("promotion failed for link {link_id}: {reason}")]
337 PromotionFailed { link_id: LinkId, reason: String },
338
339 #[error("send failed to {node_addr}: {reason}")]
340 SendFailed { node_addr: NodeAddr, reason: String },
341
342 #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
343 MtuExceeded {
344 node_addr: NodeAddr,
345 packet_size: usize,
346 mtu: u16,
347 },
348
349 #[error("config error: {0}")]
350 Config(#[from] ConfigError),
351
352 #[error("identity error: {0}")]
353 Identity(#[from] IdentityError),
354
355 #[error("TUN error: {0}")]
356 Tun(#[from] TunError),
357
358 #[error("index allocation failed: {0}")]
359 IndexAllocationFailed(String),
360
361 #[error("handshake failed: {0}")]
362 HandshakeFailed(String),
363
364 #[error("transport error: {0}")]
365 TransportError(String),
366
367 #[error("local route unavailable: {0}")]
368 LocalRouteUnavailable(String),
369
370 #[error("bootstrap handoff failed: {0}")]
371 BootstrapHandoff(String),
372}
373
374impl NodeError {
375 pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
376 if error.is_local_route_unavailable() {
377 Self::LocalRouteUnavailable(error.to_string())
378 } else {
379 Self::TransportError(error.to_string())
380 }
381 }
382
383 pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
384 matches!(self, Self::LocalRouteUnavailable(_))
385 }
386}
387
388#[derive(Debug, Clone, PartialEq, Eq)]
390pub struct NodeDeliveredPacket {
391 pub source_node_addr: NodeAddr,
393 pub source_npub: Option<String>,
395 pub destination: FipsAddress,
397 pub packet: Vec<u8>,
399}
400
401#[derive(Debug, Clone)]
402struct IdentityCacheEntry {
403 node_addr: NodeAddr,
404 pubkey: secp256k1::PublicKey,
405 npub: String,
406 last_seen_ms: u64,
407}
408
409impl IdentityCacheEntry {
410 fn new(
411 node_addr: NodeAddr,
412 pubkey: secp256k1::PublicKey,
413 npub: String,
414 last_seen_ms: u64,
415 ) -> Self {
416 Self {
417 node_addr,
418 pubkey,
419 npub,
420 last_seen_ms,
421 }
422 }
423}
424
425#[derive(Debug)]
427pub struct ExternalPacketIo {
428 pub outbound_tx: crate::upper::tun::TunOutboundTx,
430 pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
432}
433
434#[derive(Debug)]
436pub(crate) struct EndpointDataIo {
437 pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
446 pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
456 pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
462}
463
464fn endpoint_data_command_capacity(requested: usize) -> usize {
465 if let Ok(raw) = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
466 && let Ok(value) = raw.trim().parse::<usize>()
467 && value > 0
468 {
469 return value;
470 }
471
472 requested.max(1).max(32_768)
473}
474
475#[derive(Debug)]
477pub(crate) enum NodeEndpointCommand {
478 Send {
482 remote: PeerIdentity,
483 payload: Vec<u8>,
484 queued_at: Option<std::time::Instant>,
485 response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
486 },
487 SendOneway {
493 remote: PeerIdentity,
494 payload: Vec<u8>,
495 queued_at: Option<std::time::Instant>,
496 },
497 PeerSnapshot {
498 response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
499 },
500 RelaySnapshot {
501 response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
502 },
503 UpdateRelays {
504 advert_relays: Vec<String>,
505 dm_relays: Vec<String>,
506 response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
507 },
508 UpdatePeers {
514 peers: Vec<crate::config::PeerConfig>,
515 response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
516 },
517}
518
519#[derive(Debug, Clone, Default, PartialEq, Eq)]
521pub(crate) struct UpdatePeersOutcome {
522 pub(crate) added: usize,
523 pub(crate) removed: usize,
524 pub(crate) updated: usize,
525 pub(crate) unchanged: usize,
526}
527
528#[derive(Debug)]
530pub(crate) enum NodeEndpointEvent {
531 Data {
532 source_node_addr: NodeAddr,
533 source_npub: Option<String>,
534 payload: Vec<u8>,
535 queued_at: Option<std::time::Instant>,
536 },
537}
538
539#[derive(Debug, Clone, PartialEq, Eq)]
541pub(crate) struct NodeEndpointPeer {
542 pub(crate) npub: String,
543 pub(crate) connected: bool,
544 pub(crate) transport_addr: Option<String>,
545 pub(crate) transport_type: Option<String>,
546 pub(crate) link_id: u64,
547 pub(crate) srtt_ms: Option<u64>,
548 pub(crate) packets_sent: u64,
549 pub(crate) packets_recv: u64,
550 pub(crate) bytes_sent: u64,
551 pub(crate) bytes_recv: u64,
552 pub(crate) direct_probe_pending: bool,
553 pub(crate) direct_probe_after_ms: Option<u64>,
554}
555
556#[derive(Debug, Clone, PartialEq, Eq)]
558pub(crate) struct NodeEndpointRelayStatus {
559 pub(crate) url: String,
560 pub(crate) status: String,
561}
562
563#[derive(Clone, Copy, Debug, PartialEq, Eq)]
565pub enum NodeState {
566 Created,
568 Starting,
570 Running,
572 Stopping,
574 Stopped,
576}
577
578impl NodeState {
579 pub fn is_operational(&self) -> bool {
581 matches!(self, NodeState::Running)
582 }
583
584 pub fn can_start(&self) -> bool {
586 matches!(self, NodeState::Created | NodeState::Stopped)
587 }
588
589 pub fn can_stop(&self) -> bool {
591 matches!(self, NodeState::Running)
592 }
593}
594
595impl fmt::Display for NodeState {
596 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
597 let s = match self {
598 NodeState::Created => "created",
599 NodeState::Starting => "starting",
600 NodeState::Running => "running",
601 NodeState::Stopping => "stopping",
602 NodeState::Stopped => "stopped",
603 };
604 write!(f, "{}", s)
605 }
606}
607
608#[derive(Clone, Debug)]
615pub(crate) struct RecentRequest {
616 pub(crate) from_peer: NodeAddr,
618 pub(crate) timestamp_ms: u64,
620 pub(crate) response_forwarded: bool,
624}
625
626impl RecentRequest {
627 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
628 Self {
629 from_peer,
630 timestamp_ms,
631 response_forwarded: false,
632 }
633 }
634
635 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
637 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
638 }
639}
640
641type AddrKey = (TransportId, TransportAddr);
643
644#[derive(Debug, Default)]
649struct TransportDropState {
650 prev_drops: u64,
652 dropping: bool,
654}
655
656struct PendingConnect {
662 link_id: LinkId,
664 transport_id: TransportId,
666 remote_addr: TransportAddr,
668 peer_identity: PeerIdentity,
670}
671
672pub struct Node {
686 identity: Identity,
689
690 startup_epoch: [u8; 8],
693
694 started_at: std::time::Instant,
696
697 config: Config,
700
701 state: NodeState,
704
705 is_leaf_only: bool,
707
708 tree_state: TreeState,
711
712 bloom_state: BloomState,
715
716 coord_cache: CoordCache,
719 learned_routes: LearnedRouteTable,
721 session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
724 recent_requests: HashMap<u64, RecentRequest>,
727 path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,
733
734 transports: HashMap<TransportId, TransportHandle>,
737 transport_drops: HashMap<TransportId, TransportDropState>,
739 links: HashMap<LinkId, Link>,
741 addr_to_link: HashMap<AddrKey, LinkId>,
743
744 packet_tx: Option<PacketTx>,
747 packet_rx: Option<PacketRx>,
749
750 connections: HashMap<LinkId, PeerConnection>,
754
755 peers: HashMap<NodeAddr, ActivePeer>,
759
760 sessions: HashMap<NodeAddr, SessionEntry>,
764
765 identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,
769
770 pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
774 pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
776 pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,
780
781 max_connections: usize,
784 max_peers: usize,
786 max_links: usize,
788
789 next_link_id: u64,
792 next_transport_id: u32,
794
795 stats: stats::NodeStats,
798
799 stats_history: stats_history::StatsHistory,
801
802 tun_state: TunState,
805 tun_name: Option<String>,
807 tun_tx: Option<TunTx>,
809 tun_outbound_rx: Option<TunOutboundRx>,
811 external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
813 endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
815 endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
817 encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
823 decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
826 decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
835 decrypt_fallback_rx:
839 Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
840 decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
841 tun_reader_handle: Option<JoinHandle<()>>,
843 tun_writer_handle: Option<JoinHandle<()>>,
845 #[cfg(target_os = "macos")]
848 tun_shutdown_fd: Option<std::os::unix::io::RawFd>,
849
850 dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
853 dns_task: Option<tokio::task::JoinHandle<()>>,
855
856 index_allocator: IndexAllocator,
859 peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
862 pending_outbound: HashMap<(TransportId, u32), LinkId>,
865
866 msg1_rate_limiter: HandshakeRateLimiter,
869 icmp_rate_limiter: IcmpRateLimiter,
871 routing_error_rate_limiter: RoutingErrorRateLimiter,
873 coords_response_rate_limiter: RoutingErrorRateLimiter,
875 discovery_backoff: DiscoveryBackoff,
877 discovery_forward_limiter: DiscoveryForwardRateLimiter,
879
880 pending_connects: Vec<PendingConnect>,
886
887 retry_pending: HashMap<NodeAddr, retry::RetryState>,
893
894 nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
896 lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
901 local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
905 local_instance_started_at_ms: Option<u64>,
906 last_local_instance_publish_ms: Option<u64>,
907 last_local_instance_scan_ms: Option<u64>,
908 nostr_discovery_started_at_ms: Option<u64>,
913 startup_open_discovery_sweep_done: bool,
917 bootstrap_transports: HashSet<TransportId>,
919 bootstrap_transport_npubs: HashMap<TransportId, String>,
926 discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,
929
930 last_parent_reeval: Option<crate::time::Instant>,
933
934 last_congestion_log: Option<std::time::Instant>,
937
938 estimated_mesh_size: Option<u64>,
941 last_mesh_size_log: Option<std::time::Instant>,
943
944 last_self_warn: Option<std::time::Instant>,
950
951 local_send_failure_at_by_peer: HashMap<NodeAddr, std::time::Instant>,
958 last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,
963
964 peer_aliases: HashMap<NodeAddr, String>,
968 configured_peer_send_weights: HashMap<NodeAddr, u8>,
971
972 peer_acl: acl::PeerAclReloader,
974
975 host_map: Arc<HostMap>,
979}
980
981impl Node {
982 pub fn new(config: Config) -> Result<Self, NodeError> {
984 config.validate()?;
985 let identity = config.create_identity()?;
986 let node_addr = *identity.node_addr();
987 let is_leaf_only = config.is_leaf_only();
988
989 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
990 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
991
992 let mut startup_epoch = [0u8; 8];
993 rand::rng().fill_bytes(&mut startup_epoch);
994
995 let mut bloom_state = if is_leaf_only {
996 BloomState::leaf_only(node_addr)
997 } else {
998 BloomState::new(node_addr)
999 };
1000 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
1001
1002 let tun_state = if config.tun.enabled {
1003 TunState::Configured
1004 } else {
1005 TunState::Disabled
1006 };
1007
1008 let mut tree_state = TreeState::new(node_addr);
1010 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
1011 tree_state.set_hold_down(config.node.tree.hold_down_secs);
1012 tree_state.set_flap_dampening(
1013 config.node.tree.flap_threshold,
1014 config.node.tree.flap_window_secs,
1015 config.node.tree.flap_dampening_secs,
1016 );
1017 tree_state
1018 .sign_declaration(&identity)
1019 .expect("signing own declaration should never fail");
1020
1021 let coord_cache = CoordCache::new(
1022 config.node.cache.coord_size,
1023 config.node.cache.coord_ttl_secs * 1000,
1024 );
1025 let rl = &config.node.rate_limit;
1026 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
1027 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
1028 config.node.limits.max_pending_inbound,
1029 );
1030
1031 let max_connections = config.node.limits.max_connections;
1032 let max_peers = config.node.limits.max_peers;
1033 let max_links = config.node.limits.max_links;
1034 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1035 let backoff_base_secs = config.node.discovery.backoff_base_secs;
1036 let backoff_max_secs = config.node.discovery.backoff_max_secs;
1037 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
1038
1039 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1040 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1041
1042 Ok(Self {
1043 identity,
1044 startup_epoch,
1045 started_at: std::time::Instant::now(),
1046 config,
1047 state: NodeState::Created,
1048 is_leaf_only,
1049 tree_state,
1050 bloom_state,
1051 coord_cache,
1052 learned_routes: LearnedRouteTable::default(),
1053 session_direct_degraded_until_ms: HashMap::new(),
1054 recent_requests: HashMap::new(),
1055 transports: HashMap::new(),
1056 transport_drops: HashMap::new(),
1057 links: HashMap::new(),
1058 addr_to_link: HashMap::new(),
1059 packet_tx: None,
1060 packet_rx: None,
1061 connections: HashMap::new(),
1062 peers: HashMap::new(),
1063 sessions: HashMap::new(),
1064 identity_cache: HashMap::new(),
1065 pending_tun_packets: HashMap::new(),
1066 pending_endpoint_data: HashMap::new(),
1067 pending_lookups: HashMap::new(),
1068 max_connections,
1069 max_peers,
1070 max_links,
1071 next_link_id: 1,
1072 next_transport_id: 1,
1073 stats: stats::NodeStats::new(),
1074 stats_history: stats_history::StatsHistory::new(),
1075 tun_state,
1076 tun_name: None,
1077 tun_tx: None,
1078 tun_outbound_rx: None,
1079 external_packet_tx: None,
1080 endpoint_command_rx: None,
1081 endpoint_event_tx: None,
1082 encrypt_workers: None,
1083 decrypt_workers: None,
1084 decrypt_registered_sessions: std::collections::HashSet::new(),
1085 decrypt_fallback_tx,
1086 decrypt_fallback_rx,
1087 tun_reader_handle: None,
1088 tun_writer_handle: None,
1089 #[cfg(target_os = "macos")]
1090 tun_shutdown_fd: None,
1091 dns_identity_rx: None,
1092 dns_task: None,
1093 index_allocator: IndexAllocator::new(),
1094 peers_by_index: HashMap::new(),
1095 pending_outbound: HashMap::new(),
1096 msg1_rate_limiter,
1097 icmp_rate_limiter: IcmpRateLimiter::new(),
1098 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1099 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1100 std::time::Duration::from_millis(coords_response_interval_ms),
1101 ),
1102 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
1103 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
1104 std::time::Duration::from_secs(forward_min_interval_secs),
1105 ),
1106 pending_connects: Vec::new(),
1107 retry_pending: HashMap::new(),
1108 nostr_discovery: None,
1109 nostr_discovery_started_at_ms: None,
1110 lan_discovery: None,
1111 local_instance_registry: None,
1112 local_instance_started_at_ms: None,
1113 last_local_instance_publish_ms: None,
1114 last_local_instance_scan_ms: None,
1115 startup_open_discovery_sweep_done: false,
1116 bootstrap_transports: HashSet::new(),
1117 bootstrap_transport_npubs: HashMap::new(),
1118 discovery_fallback_transit_blocked_peers: HashSet::new(),
1119 last_parent_reeval: None,
1120 last_congestion_log: None,
1121 estimated_mesh_size: None,
1122 last_mesh_size_log: None,
1123 last_self_warn: None,
1124 local_send_failure_at_by_peer: HashMap::new(),
1125 last_rx_loop_maintenance_timeout_at: None,
1126 peer_aliases: HashMap::new(),
1127 configured_peer_send_weights,
1128 peer_acl,
1129 host_map,
1130 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1131 })
1132 }
1133
1134 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
1139 config.validate()?;
1140 let node_addr = *identity.node_addr();
1141
1142 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
1143 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
1144
1145 let mut startup_epoch = [0u8; 8];
1146 rand::rng().fill_bytes(&mut startup_epoch);
1147
1148 let tun_state = if config.tun.enabled {
1149 TunState::Configured
1150 } else {
1151 TunState::Disabled
1152 };
1153
1154 let mut tree_state = TreeState::new(node_addr);
1156 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
1157 tree_state.set_hold_down(config.node.tree.hold_down_secs);
1158 tree_state.set_flap_dampening(
1159 config.node.tree.flap_threshold,
1160 config.node.tree.flap_window_secs,
1161 config.node.tree.flap_dampening_secs,
1162 );
1163 tree_state
1164 .sign_declaration(&identity)
1165 .expect("signing own declaration should never fail");
1166
1167 let mut bloom_state = BloomState::new(node_addr);
1168 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
1169
1170 let coord_cache = CoordCache::new(
1171 config.node.cache.coord_size,
1172 config.node.cache.coord_ttl_secs * 1000,
1173 );
1174 let rl = &config.node.rate_limit;
1175 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
1176 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
1177 config.node.limits.max_pending_inbound,
1178 );
1179
1180 let max_connections = config.node.limits.max_connections;
1181 let max_peers = config.node.limits.max_peers;
1182 let max_links = config.node.limits.max_links;
1183 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1184
1185 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1186 let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1187
1188 Ok(Self {
1189 identity,
1190 startup_epoch,
1191 started_at: std::time::Instant::now(),
1192 config,
1193 state: NodeState::Created,
1194 is_leaf_only: false,
1195 tree_state,
1196 bloom_state,
1197 coord_cache,
1198 learned_routes: LearnedRouteTable::default(),
1199 session_direct_degraded_until_ms: HashMap::new(),
1200 recent_requests: HashMap::new(),
1201 transports: HashMap::new(),
1202 transport_drops: HashMap::new(),
1203 links: HashMap::new(),
1204 addr_to_link: HashMap::new(),
1205 packet_tx: None,
1206 packet_rx: None,
1207 connections: HashMap::new(),
1208 peers: HashMap::new(),
1209 sessions: HashMap::new(),
1210 identity_cache: HashMap::new(),
1211 pending_tun_packets: HashMap::new(),
1212 pending_endpoint_data: HashMap::new(),
1213 pending_lookups: HashMap::new(),
1214 max_connections,
1215 max_peers,
1216 max_links,
1217 next_link_id: 1,
1218 next_transport_id: 1,
1219 stats: stats::NodeStats::new(),
1220 stats_history: stats_history::StatsHistory::new(),
1221 tun_state,
1222 tun_name: None,
1223 tun_tx: None,
1224 tun_outbound_rx: None,
1225 external_packet_tx: None,
1226 endpoint_command_rx: None,
1227 endpoint_event_tx: None,
1228 encrypt_workers: None,
1229 decrypt_workers: None,
1230 decrypt_registered_sessions: std::collections::HashSet::new(),
1231 decrypt_fallback_tx,
1232 decrypt_fallback_rx,
1233 tun_reader_handle: None,
1234 tun_writer_handle: None,
1235 #[cfg(target_os = "macos")]
1236 tun_shutdown_fd: None,
1237 dns_identity_rx: None,
1238 dns_task: None,
1239 index_allocator: IndexAllocator::new(),
1240 peers_by_index: HashMap::new(),
1241 pending_outbound: HashMap::new(),
1242 msg1_rate_limiter,
1243 icmp_rate_limiter: IcmpRateLimiter::new(),
1244 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1245 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1246 std::time::Duration::from_millis(coords_response_interval_ms),
1247 ),
1248 discovery_backoff: DiscoveryBackoff::new(),
1249 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1250 pending_connects: Vec::new(),
1251 retry_pending: HashMap::new(),
1252 nostr_discovery: None,
1253 nostr_discovery_started_at_ms: None,
1254 lan_discovery: None,
1255 local_instance_registry: None,
1256 local_instance_started_at_ms: None,
1257 last_local_instance_publish_ms: None,
1258 last_local_instance_scan_ms: None,
1259 startup_open_discovery_sweep_done: false,
1260 bootstrap_transports: HashSet::new(),
1261 bootstrap_transport_npubs: HashMap::new(),
1262 discovery_fallback_transit_blocked_peers: HashSet::new(),
1263 last_parent_reeval: None,
1264 last_congestion_log: None,
1265 estimated_mesh_size: None,
1266 last_mesh_size_log: None,
1267 last_self_warn: None,
1268 local_send_failure_at_by_peer: HashMap::new(),
1269 last_rx_loop_maintenance_timeout_at: None,
1270 peer_aliases: HashMap::new(),
1271 configured_peer_send_weights,
1272 peer_acl,
1273 host_map,
1274 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1275 })
1276 }
1277
1278 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1280 let mut node = Self::new(config)?;
1281 node.is_leaf_only = true;
1282 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1283 Ok(node)
1284 }
1285
1286 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1287 let base_host_map = HostMap::from_peer_configs(config.peers());
1288 if !config.node.system_files_enabled {
1289 return (
1290 Arc::new(base_host_map.clone()),
1291 acl::PeerAclReloader::memory_only(base_host_map),
1292 );
1293 }
1294
1295 let mut host_map = base_host_map.clone();
1296 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1297 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1298 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1299 ));
1300 host_map.merge(hosts_file);
1301 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1302 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1303 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1304 base_host_map,
1305 hosts_path,
1306 );
1307 (Arc::new(host_map), peer_acl)
1308 }
1309
1310 fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1311 config
1312 .peers()
1313 .iter()
1314 .filter_map(|peer| {
1315 PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1316 (
1317 *identity.node_addr(),
1318 encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1319 )
1320 })
1321 })
1322 .collect()
1323 }
1324
1325 #[cfg(unix)]
1326 fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1327 self.configured_peer_send_weights
1328 .get(peer_addr)
1329 .copied()
1330 .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1331 }
1332
1333 async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
1337 let mut transports = Vec::new();
1338
1339 let udp_instances: Vec<_> = self
1341 .config
1342 .transports
1343 .udp
1344 .iter()
1345 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1346 .collect();
1347
1348 for (name, udp_config) in udp_instances {
1350 let transport_id = self.allocate_transport_id();
1351 let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
1352 transports.push(TransportHandle::Udp(udp));
1353 }
1354
1355 #[cfg(feature = "sim-transport")]
1356 {
1357 let sim_instances: Vec<_> = self
1358 .config
1359 .transports
1360 .sim
1361 .iter()
1362 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1363 .collect();
1364
1365 for (name, sim_config) in sim_instances {
1366 let transport_id = self.allocate_transport_id();
1367 let sim = crate::transport::sim::SimTransport::new(
1368 transport_id,
1369 name,
1370 sim_config,
1371 packet_tx.clone(),
1372 );
1373 transports.push(TransportHandle::Sim(sim));
1374 }
1375 }
1376
1377 #[cfg(any(target_os = "linux", target_os = "macos"))]
1379 {
1380 let eth_instances: Vec<_> = self
1381 .config
1382 .transports
1383 .ethernet
1384 .iter()
1385 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1386 .collect();
1387 let xonly = self.identity.pubkey();
1388 for (name, eth_config) in eth_instances {
1389 let mut eth_config = eth_config;
1390 if eth_config.discovery_scope.is_none() {
1391 eth_config.discovery_scope = self.lan_discovery_scope();
1392 }
1393 let transport_id = self.allocate_transport_id();
1394 let mut eth =
1395 EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
1396 eth.set_local_pubkey(xonly);
1397 transports.push(TransportHandle::Ethernet(eth));
1398 }
1399 }
1400
1401 let tcp_instances: Vec<_> = self
1403 .config
1404 .transports
1405 .tcp
1406 .iter()
1407 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1408 .collect();
1409
1410 for (name, tcp_config) in tcp_instances {
1411 let transport_id = self.allocate_transport_id();
1412 let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
1413 transports.push(TransportHandle::Tcp(tcp));
1414 }
1415
1416 let tor_instances: Vec<_> = self
1418 .config
1419 .transports
1420 .tor
1421 .iter()
1422 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1423 .collect();
1424
1425 for (name, tor_config) in tor_instances {
1426 let transport_id = self.allocate_transport_id();
1427 let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
1428 transports.push(TransportHandle::Tor(tor));
1429 }
1430
1431 let webrtc_instances: Vec<_> = self
1432 .config
1433 .transports
1434 .webrtc
1435 .iter()
1436 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1437 .collect();
1438
1439 #[cfg(feature = "webrtc-transport")]
1440 {
1441 for (name, webrtc_config) in webrtc_instances {
1442 let transport_id = self.allocate_transport_id();
1443 match WebRtcTransport::new(
1444 transport_id,
1445 name,
1446 webrtc_config,
1447 packet_tx.clone(),
1448 &self.identity,
1449 &self.config.node.discovery.nostr,
1450 ) {
1451 Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
1452 Err(err) => {
1453 warn!(
1454 transport_id = %transport_id,
1455 error = %err,
1456 "failed to initialize WebRTC transport"
1457 );
1458 }
1459 }
1460 }
1461 }
1462 #[cfg(not(feature = "webrtc-transport"))]
1463 if !webrtc_instances.is_empty() {
1464 warn!("WebRTC transport configured but this build lacks WebRTC transport support");
1465 }
1466
1467 #[cfg(bluer_available)]
1469 {
1470 let ble_instances: Vec<_> = self
1471 .config
1472 .transports
1473 .ble
1474 .iter()
1475 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1476 .collect();
1477
1478 #[cfg(all(bluer_available, not(test)))]
1479 for (name, ble_config) in ble_instances {
1480 let transport_id = self.allocate_transport_id();
1481 let adapter = ble_config.adapter().to_string();
1482 let mtu = ble_config.mtu();
1483 match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
1484 Ok(io) => {
1485 let mut ble = crate::transport::ble::BleTransport::new(
1486 transport_id,
1487 name,
1488 ble_config,
1489 io,
1490 packet_tx.clone(),
1491 );
1492 ble.set_local_pubkey(self.identity.pubkey().serialize());
1493 transports.push(TransportHandle::Ble(ble));
1494 }
1495 Err(e) => {
1496 tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
1497 }
1498 }
1499 }
1500
1501 #[cfg(any(not(bluer_available), test))]
1502 if !ble_instances.is_empty() {
1503 #[cfg(not(test))]
1504 tracing::warn!("BLE transport configured but this build lacks BlueZ support");
1505 }
1506 }
1507
1508 transports
1509 }
1510
1511 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1521 self.transports
1522 .iter()
1523 .filter(|(id, handle)| {
1524 handle.transport_type().name == transport_type
1525 && handle.is_operational()
1526 && !self.bootstrap_transports.contains(id)
1527 })
1528 .min_by_key(|(id, _)| id.as_u32())
1529 .map(|(id, _)| *id)
1530 }
1531
1532 #[allow(unused_variables)]
1538 fn resolve_ethernet_addr(
1539 &self,
1540 addr_str: &str,
1541 ) -> Result<(TransportId, TransportAddr), NodeError> {
1542 #[cfg(any(target_os = "linux", target_os = "macos"))]
1543 {
1544 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1545 NodeError::NoTransportForType(format!(
1546 "invalid Ethernet address format '{}': expected 'interface/mac'",
1547 addr_str
1548 ))
1549 })?;
1550
1551 let transport_id = self
1553 .transports
1554 .iter()
1555 .find(|(_, handle)| {
1556 handle.transport_type().name == "ethernet"
1557 && handle.is_operational()
1558 && handle.interface_name() == Some(iface)
1559 })
1560 .map(|(id, _)| *id)
1561 .ok_or_else(|| {
1562 NodeError::NoTransportForType(format!(
1563 "no operational Ethernet transport for interface '{}'",
1564 iface
1565 ))
1566 })?;
1567
1568 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1569 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1570 })?;
1571
1572 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1573 }
1574 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1575 {
1576 Err(NodeError::NoTransportForType(
1577 "Ethernet transport is not supported on this platform".to_string(),
1578 ))
1579 }
1580 }
1581
1582 #[cfg(bluer_available)]
1586 fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1587 let ta = TransportAddr::from_string(addr_str);
1588 let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1589 NodeError::NoTransportForType(format!(
1590 "invalid BLE address format '{}': expected 'adapter/mac'",
1591 addr_str
1592 ))
1593 })?;
1594
1595 let transport_id = self
1597 .transports
1598 .iter()
1599 .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1600 .map(|(id, _)| *id)
1601 .ok_or_else(|| {
1602 NodeError::NoTransportForType(format!(
1603 "no operational BLE transport for adapter '{}'",
1604 adapter
1605 ))
1606 })?;
1607
1608 crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1610 NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1611 })?;
1612
1613 Ok((transport_id, TransportAddr::from_string(addr_str)))
1614 }
1615
1616 pub fn identity(&self) -> &Identity {
1620 &self.identity
1621 }
1622
1623 pub fn node_addr(&self) -> &NodeAddr {
1625 self.identity.node_addr()
1626 }
1627
1628 pub fn npub(&self) -> String {
1630 self.identity.npub()
1631 }
1632
1633 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1642 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1643 return hostname.to_string();
1644 }
1645 if let Some(name) = self.peer_aliases.get(addr) {
1646 return name.clone();
1647 }
1648 if let Some(peer) = self.peers.get(addr) {
1649 return peer.identity().short_npub();
1650 }
1651 if let Some(entry) = self.sessions.get(addr) {
1652 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1653 return PeerIdentity::from_pubkey(xonly).short_npub();
1654 }
1655 addr.short_hex()
1656 }
1657
1658 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1670 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1674 self.peers_by_index.remove(&cache_key);
1675 if self.decrypt_registered_sessions.remove(&cache_key)
1676 && let Some(workers) = self.decrypt_workers.as_ref()
1677 {
1678 workers.unregister_session(cache_key);
1679 }
1680 if let Some(peer_addr) = owning_peer {
1691 let peer_has_other_index = self
1692 .peers_by_index
1693 .values()
1694 .any(|other| *other == peer_addr);
1695 if !peer_has_other_index {
1696 self.clear_connected_udp_for_peer(&peer_addr);
1697 }
1698 }
1699 }
1700
1701 pub(in crate::node) fn ensure_current_session_index_registered(
1710 &mut self,
1711 node_addr: &NodeAddr,
1712 context: &'static str,
1713 ) -> bool {
1714 let Some(peer) = self.peers.get(node_addr) else {
1715 return false;
1716 };
1717 let Some(transport_id) = peer.transport_id() else {
1718 warn!(
1719 peer = %self.peer_display_name(node_addr),
1720 context,
1721 "Cannot register current session index without transport id"
1722 );
1723 return false;
1724 };
1725 let Some(our_index) = peer.our_index() else {
1726 warn!(
1727 peer = %self.peer_display_name(node_addr),
1728 context,
1729 "Cannot register current session index without local index"
1730 );
1731 return false;
1732 };
1733
1734 let cache_key = (transport_id, our_index.as_u32());
1735 match self.peers_by_index.get(&cache_key).copied() {
1736 Some(existing) if existing == *node_addr => true,
1737 Some(existing) => {
1738 warn!(
1739 peer = %self.peer_display_name(node_addr),
1740 previous_owner = %self.peer_display_name(&existing),
1741 transport_id = %transport_id,
1742 our_index = %our_index,
1743 context,
1744 "Repairing current session index with stale owner"
1745 );
1746 self.peers_by_index.insert(cache_key, *node_addr);
1747 true
1748 }
1749 None => {
1750 warn!(
1751 peer = %self.peer_display_name(node_addr),
1752 transport_id = %transport_id,
1753 our_index = %our_index,
1754 context,
1755 "Repairing missing current session index"
1756 );
1757 self.peers_by_index.insert(cache_key, *node_addr);
1758 true
1759 }
1760 }
1761 }
1762
1763 pub fn config(&self) -> &Config {
1767 &self.config
1768 }
1769
1770 pub fn effective_ipv6_mtu(&self) -> u16 {
1776 crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1777 }
1778
1779 pub fn transport_mtu(&self) -> u16 {
1796 let min_operational = self
1797 .transports
1798 .values()
1799 .filter(|h| h.is_operational())
1800 .map(|h| h.mtu())
1801 .min();
1802 if let Some(mtu) = min_operational {
1803 return mtu;
1804 }
1805 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1807 return cfg.mtu();
1808 }
1809 1280
1810 }
1811
1812 pub fn state(&self) -> NodeState {
1816 self.state
1817 }
1818
1819 pub fn uptime(&self) -> std::time::Duration {
1821 self.started_at.elapsed()
1822 }
1823
1824 pub fn is_running(&self) -> bool {
1826 self.state.is_operational()
1827 }
1828
1829 pub fn is_leaf_only(&self) -> bool {
1831 self.is_leaf_only
1832 }
1833
1834 pub fn tree_state(&self) -> &TreeState {
1838 &self.tree_state
1839 }
1840
1841 pub fn tree_state_mut(&mut self) -> &mut TreeState {
1843 &mut self.tree_state
1844 }
1845
1846 pub fn bloom_state(&self) -> &BloomState {
1850 &self.bloom_state
1851 }
1852
1853 pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1855 &mut self.bloom_state
1856 }
1857
1858 pub fn estimated_mesh_size(&self) -> Option<u64> {
1862 self.estimated_mesh_size
1863 }
1864
1865 pub(crate) fn compute_mesh_size(&mut self) {
1872 let my_addr = *self.tree_state.my_node_addr();
1873 let parent_id = *self.tree_state.my_declaration().parent_id();
1874 let is_root = self.tree_state.is_root();
1875
1876 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1877 let mut child_count: u32 = 0;
1878 let mut union: Option<BloomFilter> = None;
1879
1880 let add_to_union = |union: &mut Option<BloomFilter>, filter: &BloomFilter| match union {
1881 None => *union = Some(filter.clone()),
1882 Some(existing) => {
1883 let _ = existing.merge(filter);
1885 }
1886 };
1887
1888 if !is_root
1890 && let Some(parent) = self.peers.get(&parent_id)
1891 && let Some(filter) = parent.inbound_filter()
1892 {
1893 add_to_union(&mut union, filter);
1894 }
1895
1896 for (peer_addr, peer) in &self.peers {
1899 if peer_addr == &parent_id {
1900 continue;
1901 }
1902 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1903 && *decl.parent_id() == my_addr
1904 {
1905 child_count += 1;
1906 if let Some(filter) = peer.inbound_filter() {
1907 add_to_union(&mut union, filter);
1908 }
1909 }
1910 }
1911
1912 let Some(mut union) = union else {
1913 self.estimated_mesh_size = None;
1914 return;
1915 };
1916 union.insert(&my_addr);
1917
1918 let Some(union_estimate) = union.estimated_count(max_fpr) else {
1921 self.estimated_mesh_size = None;
1922 return;
1923 };
1924
1925 let size = union_estimate.round() as u64;
1926 self.estimated_mesh_size = Some(size);
1927
1928 let now = std::time::Instant::now();
1930 let should_log = match self.last_mesh_size_log {
1931 None => true,
1932 Some(last) => {
1933 now.duration_since(last)
1934 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1935 }
1936 };
1937 if should_log {
1938 tracing::debug!(
1939 estimated_mesh_size = size,
1940 peers = self.peers.len(),
1941 children = child_count,
1942 "Mesh size estimate"
1943 );
1944 self.last_mesh_size_log = Some(now);
1945 }
1946 }
1947
1948 pub fn coord_cache(&self) -> &CoordCache {
1952 &self.coord_cache
1953 }
1954
1955 pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1957 &mut self.coord_cache
1958 }
1959
1960 pub fn stats(&self) -> &stats::NodeStats {
1964 &self.stats
1965 }
1966
1967 pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1969 &mut self.stats
1970 }
1971
1972 pub fn stats_history(&self) -> &stats_history::StatsHistory {
1974 &self.stats_history
1975 }
1976
1977 pub(crate) fn record_stats_history(&mut self) {
1980 let fwd = &self.stats.forwarding;
1981 let peers_with_mmp: Vec<f64> = self
1982 .peers
1983 .values()
1984 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1985 .collect();
1986 let loss_rate = if peers_with_mmp.is_empty() {
1987 0.0
1988 } else {
1989 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1990 };
1991
1992 let snap = stats_history::Snapshot {
1993 mesh_size: self.estimated_mesh_size,
1994 tree_depth: self.tree_state.my_coords().depth() as u32,
1995 peer_count: self.peers.len() as u64,
1996 parent_switches_total: self.stats.tree.parent_switches,
1997 bytes_in_total: fwd.received_bytes,
1998 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1999 packets_in_total: fwd.received_packets,
2000 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
2001 loss_rate,
2002 active_sessions: self.sessions.len() as u64,
2003 };
2004
2005 let now = std::time::Instant::now();
2006 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
2007 .peers
2008 .values()
2009 .map(|p| {
2010 let stats = p.link_stats();
2011 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
2012 Some(m) => (
2013 m.metrics.srtt_ms(),
2014 Some(m.metrics.loss_rate()),
2015 m.receiver.ecn_ce_count() as u64,
2016 ),
2017 None => (None, None, 0),
2018 };
2019 stats_history::PeerSnapshot {
2020 node_addr: *p.node_addr(),
2021 last_seen: now,
2022 srtt_ms,
2023 loss_rate,
2024 bytes_in_total: stats.bytes_recv,
2025 bytes_out_total: stats.bytes_sent,
2026 packets_in_total: stats.packets_recv,
2027 packets_out_total: stats.packets_sent,
2028 ecn_ce_total: ecn_ce,
2029 }
2030 })
2031 .collect();
2032
2033 self.stats_history.tick(now, &snap, &peer_snaps);
2034 }
2035
2036 pub fn tun_state(&self) -> TunState {
2040 self.tun_state
2041 }
2042
2043 pub fn tun_name(&self) -> Option<&str> {
2045 self.tun_name.as_deref()
2046 }
2047
2048 pub fn set_max_connections(&mut self, max: usize) {
2052 self.max_connections = max;
2053 }
2054
2055 pub fn set_max_peers(&mut self, max: usize) {
2057 self.max_peers = max;
2058 }
2059
2060 pub(crate) fn outbound_admission_check(&self) -> bool {
2063 let connection_used = self
2064 .connections
2065 .len()
2066 .saturating_add(self.pending_connects.len());
2067 let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
2068 let connection_allowed =
2069 self.max_connections == 0 || connection_used < self.max_connections;
2070 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
2071 peer_allowed && connection_allowed && link_allowed
2072 }
2073
2074 pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
2078 if !self.outbound_admission_check() {
2079 return false;
2080 }
2081
2082 let nostr = &self.config.node.discovery.nostr;
2083 if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
2084 return true;
2085 }
2086
2087 let configured_npubs = self
2088 .config
2089 .peers()
2090 .iter()
2091 .map(|peer| peer.npub.clone())
2092 .collect::<HashSet<_>>();
2093 self.open_discovery_enqueue_budget(&configured_npubs) > 0
2094 }
2095
2096 pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
2100 let connection_used = self
2101 .connections
2102 .len()
2103 .saturating_add(self.pending_connects.len());
2104 let connection_allowed =
2105 self.max_connections == 0 || connection_used < self.max_connections;
2106 let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
2107 connection_allowed && link_allowed
2108 }
2109
2110 pub fn set_max_links(&mut self, max: usize) {
2112 self.max_links = max;
2113 }
2114
2115 pub fn connection_count(&self) -> usize {
2119 self.connections.len()
2120 }
2121
2122 pub fn peer_count(&self) -> usize {
2124 self.peers.len()
2125 }
2126
2127 pub fn link_count(&self) -> usize {
2129 self.links.len()
2130 }
2131
2132 pub fn transport_count(&self) -> usize {
2134 self.transports.len()
2135 }
2136
2137 pub fn allocate_transport_id(&mut self) -> TransportId {
2141 let id = TransportId::new(self.next_transport_id);
2142 self.next_transport_id += 1;
2143 id
2144 }
2145
2146 pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
2148 self.transports.get(id)
2149 }
2150
2151 pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
2153 self.transports.get_mut(id)
2154 }
2155
2156 pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
2158 self.transports.keys()
2159 }
2160
2161 pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
2163 self.packet_rx.as_mut()
2164 }
2165
2166 pub fn allocate_link_id(&mut self) -> LinkId {
2170 let id = LinkId::new(self.next_link_id);
2171 self.next_link_id += 1;
2172 id
2173 }
2174
2175 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2177 if self.max_links > 0 && self.links.len() >= self.max_links {
2178 return Err(NodeError::MaxLinksExceeded {
2179 max: self.max_links,
2180 });
2181 }
2182 let link_id = link.link_id();
2183 let transport_id = link.transport_id();
2184 let remote_addr = link.remote_addr().clone();
2185
2186 self.links.insert(link_id, link);
2187 self.addr_to_link
2188 .insert((transport_id, remote_addr), link_id);
2189 Ok(())
2190 }
2191
2192 pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
2194 self.links.get(link_id)
2195 }
2196
2197 pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
2199 self.links.get_mut(link_id)
2200 }
2201
2202 pub fn find_link_by_addr(
2204 &self,
2205 transport_id: TransportId,
2206 addr: &TransportAddr,
2207 ) -> Option<LinkId> {
2208 self.addr_to_link
2209 .get(&(transport_id, addr.clone()))
2210 .copied()
2211 }
2212
2213 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2219 if let Some(link) = self.links.remove(link_id) {
2220 let key = (link.transport_id(), link.remote_addr().clone());
2222 if self.addr_to_link.get(&key) == Some(link_id) {
2223 self.addr_to_link.remove(&key);
2224 }
2225 Some(link)
2226 } else {
2227 None
2228 }
2229 }
2230
2231 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2232 if !self.bootstrap_transports.contains(&transport_id) {
2233 return;
2234 }
2235
2236 let transport_in_use = self
2237 .links
2238 .values()
2239 .any(|link| link.transport_id() == transport_id)
2240 || self
2241 .connections
2242 .values()
2243 .any(|conn| conn.transport_id() == Some(transport_id))
2244 || self
2245 .peers
2246 .values()
2247 .any(|peer| peer.transport_id() == Some(transport_id))
2248 || self
2249 .pending_connects
2250 .iter()
2251 .any(|pending| pending.transport_id == transport_id);
2252
2253 if transport_in_use {
2254 return;
2255 }
2256
2257 tracing::debug!(
2258 transport_id = %transport_id,
2259 "bootstrap transport has no remaining references; dropping"
2260 );
2261
2262 self.bootstrap_transports.remove(&transport_id);
2263 self.bootstrap_transport_npubs.remove(&transport_id);
2264 self.transport_drops.remove(&transport_id);
2265 self.transports.remove(&transport_id);
2266 }
2267
2268 pub fn links(&self) -> impl Iterator<Item = &Link> {
2270 self.links.values()
2271 }
2272
2273 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2277 let link_id = connection.link_id();
2278
2279 if self.connections.contains_key(&link_id) {
2280 return Err(NodeError::ConnectionAlreadyExists(link_id));
2281 }
2282
2283 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2284 return Err(NodeError::MaxConnectionsExceeded {
2285 max: self.max_connections,
2286 });
2287 }
2288
2289 self.connections.insert(link_id, connection);
2290 Ok(())
2291 }
2292
2293 pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
2295 self.connections.get(link_id)
2296 }
2297
2298 pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
2300 self.connections.get_mut(link_id)
2301 }
2302
2303 pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
2305 self.connections.remove(link_id)
2306 }
2307
2308 pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
2310 self.connections.values()
2311 }
2312
2313 pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
2317 self.peers.get(node_addr)
2318 }
2319
2320 pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
2322 self.peers.get_mut(node_addr)
2323 }
2324
2325 pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
2327 self.peers.remove(node_addr)
2328 }
2329
2330 pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2332 self.peers.values()
2333 }
2334
2335 pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2339 self.nostr_discovery.as_deref()
2340 }
2341
2342 pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2344 self.peers.keys()
2345 }
2346
2347 pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2349 self.peers.values().filter(|p| p.can_send())
2350 }
2351
2352 pub fn sendable_peer_count(&self) -> usize {
2354 self.peers.values().filter(|p| p.can_send()).count()
2355 }
2356
2357 pub(crate) fn set_discovery_fallback_transit_allowed(
2358 &mut self,
2359 peer_addr: NodeAddr,
2360 allowed: bool,
2361 ) {
2362 if allowed {
2363 self.discovery_fallback_transit_blocked_peers
2364 .remove(&peer_addr);
2365 } else {
2366 self.discovery_fallback_transit_blocked_peers
2367 .insert(peer_addr);
2368 }
2369 }
2370
2371 pub(crate) fn configured_discovery_fallback_transit(
2372 &self,
2373 peer_addr: &NodeAddr,
2374 ) -> Option<bool> {
2375 self.configured_peer(peer_addr)
2376 .map(|peer| peer.discovery_fallback_transit)
2377 }
2378
2379 pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2380 self.config.peers().iter().find(|peer| {
2381 PeerIdentity::from_npub(&peer.npub)
2382 .ok()
2383 .is_some_and(|identity| identity.node_addr() == peer_addr)
2384 })
2385 }
2386
2387 pub(in crate::node) fn active_peer_uses_configured_static_udp_path(
2388 &self,
2389 peer_addr: &NodeAddr,
2390 ) -> bool {
2391 let Some(peer_config) = self.configured_peer(peer_addr) else {
2392 return false;
2393 };
2394
2395 peer_config.addresses.iter().any(|candidate| {
2396 candidate.seen_at_ms.is_none()
2397 && candidate.transport.eq_ignore_ascii_case("udp")
2398 && self.active_peer_matches_candidate(peer_addr, candidate)
2399 })
2400 }
2401
2402 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2403 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2404 return retry_state.peer_config.discovery_fallback_transit;
2405 }
2406
2407 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2408 return allowed;
2409 }
2410
2411 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2412 }
2413
2414 #[cfg(test)]
2419 pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2420 self.discovery_forward_limiter
2421 .set_interval(std::time::Duration::ZERO);
2422 }
2423
2424 #[cfg(test)]
2425 pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2426 self.sessions.get(remote)
2427 }
2428
2429 #[cfg(test)]
2431 pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2432 self.sessions.get_mut(remote)
2433 }
2434
2435 #[cfg(test)]
2437 pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2438 self.sessions.remove(remote)
2439 }
2440
2441 #[cfg(test)]
2443 pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2444 self.path_mtu_lookup
2445 .read()
2446 .ok()
2447 .and_then(|map| map.get(fips_addr).copied())
2448 }
2449
2450 #[cfg(test)]
2452 pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2453 if let Ok(mut map) = self.path_mtu_lookup.write() {
2454 map.insert(fips_addr, mtu);
2455 }
2456 }
2457
2458 pub fn session_count(&self) -> usize {
2460 self.sessions.len()
2461 }
2462
2463 pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2465 self.sessions.iter()
2466 }
2467
2468 pub(crate) fn register_identity(
2472 &mut self,
2473 node_addr: NodeAddr,
2474 pubkey: secp256k1::PublicKey,
2475 ) -> bool {
2476 let mut prefix = [0u8; 15];
2477 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2478 if let Some(entry) = self.identity_cache.get(&prefix)
2479 && entry.node_addr == node_addr
2480 && entry.pubkey == pubkey
2481 {
2482 return true;
2486 }
2487
2488 let (xonly, _) = pubkey.x_only_public_key();
2489 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2490 if derived_node_addr != node_addr {
2491 debug!(
2492 claimed_node_addr = %node_addr,
2493 derived_node_addr = %derived_node_addr,
2494 "Rejected identity cache entry with mismatched public key"
2495 );
2496 return false;
2497 }
2498
2499 let now_ms = Self::now_ms();
2500 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2501 && entry.node_addr == node_addr
2502 {
2503 entry.pubkey = pubkey;
2504 entry.last_seen_ms = now_ms;
2505 return true;
2506 }
2507
2508 let npub = encode_npub(&xonly);
2509 self.identity_cache.insert(
2510 prefix,
2511 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2512 );
2513 let max = self.config.node.cache.identity_size;
2515 if self.identity_cache.len() > max
2516 && let Some(oldest_key) = self
2517 .identity_cache
2518 .iter()
2519 .min_by_key(|(_, entry)| entry.last_seen_ms)
2520 .map(|(k, _)| *k)
2521 {
2522 self.identity_cache.remove(&oldest_key);
2523 }
2524 true
2525 }
2526
2527 pub(crate) fn lookup_by_fips_prefix(
2529 &mut self,
2530 prefix: &[u8; 15],
2531 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2532 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2533 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2535 } else {
2536 None
2537 }
2538 }
2539
2540 pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2542 let mut prefix = [0u8; 15];
2543 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2544 self.identity_cache.contains_key(&prefix)
2545 }
2546
2547 pub fn identity_cache_len(&self) -> usize {
2549 self.identity_cache.len()
2550 }
2551
2552 pub fn identity_cache_iter(
2557 &self,
2558 ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2559 self.identity_cache
2560 .values()
2561 .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2562 }
2563
2564 pub fn identity_cache_max(&self) -> usize {
2566 self.config.node.cache.identity_size
2567 }
2568
2569 pub fn pending_lookup_count(&self) -> usize {
2571 self.pending_lookups.len()
2572 }
2573
2574 pub fn pending_lookups_iter(
2576 &self,
2577 ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2578 self.pending_lookups.iter()
2579 }
2580
2581 pub fn recent_request_count(&self) -> usize {
2583 self.recent_requests.len()
2584 }
2585
2586 pub fn pending_tun_destinations(&self) -> usize {
2588 self.pending_tun_packets.len()
2589 }
2590
2591 pub fn pending_tun_total_packets(&self) -> usize {
2593 self.pending_tun_packets.values().map(|q| q.len()).sum()
2594 }
2595
2596 pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2598 self.retry_pending.iter()
2599 }
2600
2601 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2608 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2610 return true;
2611 }
2612 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2614 && decl.parent_id() == self.node_addr()
2615 {
2616 return true;
2617 }
2618 false
2619 }
2620
2621 pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
2645 if dest_node_addr == self.node_addr() {
2647 return None;
2648 }
2649 let now_ms = Self::now_ms();
2650 let direct_session_degraded =
2651 self.session_direct_path_blocks_direct_payload(dest_node_addr, now_ms);
2652
2653 let healthy_direct_route = self
2654 .peers
2655 .get(dest_node_addr)
2656 .filter(|peer| peer.is_healthy() && !direct_session_degraded)
2657 .map(|_| *dest_node_addr);
2658 if let Some(direct_addr) = healthy_direct_route
2659 && self
2660 .peers
2661 .get(&direct_addr)
2662 .is_some_and(|peer| peer.link_cost() <= 1.0 + ROUTING_FALLBACK_MIN_COST_ADVANTAGE)
2663 {
2664 return self.peers.get(&direct_addr);
2665 }
2666 let direct_payload_eligible = healthy_direct_route.is_some();
2667 let payload_candidate_can_send = |addr: &NodeAddr, peer: &ActivePeer| {
2668 if addr == dest_node_addr {
2669 direct_payload_eligible
2670 } else {
2671 peer.is_healthy()
2672 }
2673 };
2674
2675 let fallback_beats_direct = |node: &Self, fallback_addr: NodeAddr| {
2680 node.route_candidate_beats_direct(healthy_direct_route, fallback_addr)
2681 };
2682
2683 let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
2684 Some(
2685 self.peers
2686 .iter()
2687 .filter(|(addr, peer)| payload_candidate_can_send(addr, peer))
2688 .map(|(addr, _)| *addr)
2689 .collect::<HashSet<_>>(),
2690 )
2691 } else {
2692 None
2693 };
2694
2695 let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
2702 self.learned_routes.should_explore_fallback(
2703 dest_node_addr,
2704 now_ms,
2705 self.config.node.routing.learned_fallback_explore_interval,
2706 |addr| sendable.contains(addr),
2707 )
2708 });
2709 if let Some(sendable) = &sendable_learned_peers
2710 && !explore_fallback
2711 {
2712 let eligible = sendable
2713 .iter()
2714 .copied()
2715 .filter(|addr| fallback_beats_direct(self, *addr))
2716 .collect::<HashSet<_>>();
2717 if !eligible.is_empty()
2718 && let Some(next_hop_addr) =
2719 self.learned_routes
2720 .select_next_hop(dest_node_addr, now_ms, |addr| eligible.contains(addr))
2721 {
2722 return self.peers.get(&next_hop_addr);
2723 }
2724 }
2725
2726 let Some(dest_coords) = self
2728 .coord_cache
2729 .get_and_touch(dest_node_addr, now_ms)
2730 .cloned()
2731 else {
2732 if (healthy_direct_route.is_none() || explore_fallback)
2733 && let Some(sendable) = &sendable_learned_peers
2734 && let Some(next_hop_addr) =
2735 self.learned_routes
2736 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2737 {
2738 return self.peers.get(&next_hop_addr);
2739 }
2740 if let Some(direct_addr) = healthy_direct_route {
2741 return self.peers.get(&direct_addr);
2742 }
2743 return None;
2744 };
2745
2746 let coordinate_route_addr = {
2749 let candidates: Vec<&ActivePeer> = self
2750 .peers
2751 .iter()
2752 .filter(|(addr, peer)| {
2753 payload_candidate_can_send(addr, peer) && peer.may_reach(dest_node_addr)
2754 })
2755 .map(|(_, peer)| peer)
2756 .collect();
2757 if !candidates.is_empty() {
2758 self.select_best_candidate(&candidates, &dest_coords)
2759 .map(|peer| *peer.node_addr())
2760 } else {
2761 None
2762 }
2763 };
2764 if let Some(next_hop_addr) = coordinate_route_addr
2765 && fallback_beats_direct(self, next_hop_addr)
2766 {
2767 return self.peers.get(&next_hop_addr);
2768 }
2769
2770 let tree_route_addr = self.select_tree_payload_candidate(
2772 &dest_coords,
2773 dest_node_addr,
2774 direct_payload_eligible,
2775 );
2776 if let Some(next_hop_addr) = tree_route_addr
2777 && fallback_beats_direct(self, next_hop_addr)
2778 {
2779 return self.peers.get(&next_hop_addr);
2780 }
2781
2782 if explore_fallback {
2783 return sendable_learned_peers.as_ref().and_then(|sendable| {
2784 self.learned_routes
2785 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2786 .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
2787 });
2788 }
2789
2790 if let Some(direct_addr) = healthy_direct_route {
2791 return self.peers.get(&direct_addr);
2792 }
2793
2794 if let Some(sendable) = &sendable_learned_peers
2795 && let Some(next_hop_addr) =
2796 self.learned_routes
2797 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2798 {
2799 return self.peers.get(&next_hop_addr);
2800 }
2801
2802 None
2803 }
2804
2805 pub(in crate::node) fn find_transit_next_hop(
2806 &mut self,
2807 dest_node_addr: &NodeAddr,
2808 previous_hop: &NodeAddr,
2809 ) -> Option<NodeAddr> {
2810 if dest_node_addr == self.node_addr() {
2811 return None;
2812 }
2813
2814 if dest_node_addr != previous_hop
2815 && self
2816 .peers
2817 .get(dest_node_addr)
2818 .is_some_and(|peer| peer.is_healthy())
2819 {
2820 return Some(*dest_node_addr);
2821 }
2822
2823 let next_hop_addr = *self.find_next_hop(dest_node_addr)?.node_addr();
2824 if &next_hop_addr == previous_hop {
2825 self.record_route_failure(*dest_node_addr, next_hop_addr);
2826 return None;
2827 }
2828 Some(next_hop_addr)
2829 }
2830
2831 fn route_candidate_beats_direct(
2832 &self,
2833 healthy_direct_route: Option<NodeAddr>,
2834 candidate_addr: NodeAddr,
2835 ) -> bool {
2836 let Some(direct_addr) = healthy_direct_route else {
2837 return true;
2838 };
2839 if candidate_addr == direct_addr {
2840 return false;
2841 }
2842
2843 let Some(direct) = self.peers.get(&direct_addr) else {
2844 return true;
2845 };
2846 let Some(candidate) = self.peers.get(&candidate_addr) else {
2847 return false;
2848 };
2849 if !candidate.is_healthy() {
2850 return false;
2851 }
2852
2853 let direct_cost = direct.link_cost();
2854 let candidate_cost = candidate.link_cost();
2855 candidate_cost + ROUTING_FALLBACK_MIN_COST_ADVANTAGE < direct_cost
2856 }
2857
2858 fn select_tree_payload_candidate(
2859 &self,
2860 dest_coords: &crate::tree::TreeCoordinate,
2861 direct_dest: &NodeAddr,
2862 direct_payload_eligible: bool,
2863 ) -> Option<NodeAddr> {
2864 if self.tree_state.my_coords().root_id() != dest_coords.root_id() {
2865 return None;
2866 }
2867
2868 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2869 let mut best: Option<(NodeAddr, usize)> = None;
2870
2871 for (peer_addr, peer) in &self.peers {
2872 if peer_addr == direct_dest {
2873 if !direct_payload_eligible {
2874 continue;
2875 }
2876 } else if !peer.is_healthy() {
2877 continue;
2878 }
2879
2880 let Some(peer_coords) = self.tree_state.peer_coords(peer_addr) else {
2881 continue;
2882 };
2883 let distance = peer_coords.distance_to(dest_coords);
2884 if distance >= my_distance {
2885 continue;
2886 }
2887
2888 let dominated = match &best {
2889 None => true,
2890 Some((best_id, best_dist)) => {
2891 distance < *best_dist || (distance == *best_dist && peer_addr < best_id)
2892 }
2893 };
2894 if dominated {
2895 best = Some((*peer_addr, distance));
2896 }
2897 }
2898
2899 best.map(|(peer_addr, _)| peer_addr)
2900 }
2901
2902 pub(in crate::node) fn session_direct_path_is_degraded(
2903 &mut self,
2904 dest: &NodeAddr,
2905 now_ms: u64,
2906 ) -> bool {
2907 match self.session_direct_degraded_until_ms.get(dest).copied() {
2908 Some(until_ms) if until_ms > now_ms => true,
2909 Some(_) => {
2910 self.session_direct_degraded_until_ms.remove(dest);
2911 false
2912 }
2913 None => false,
2914 }
2915 }
2916
2917 pub(in crate::node) fn session_direct_path_blocks_direct_payload(
2918 &mut self,
2919 dest: &NodeAddr,
2920 now_ms: u64,
2921 ) -> bool {
2922 self.session_direct_path_is_degraded(dest, now_ms)
2923 && !self.active_peer_uses_configured_static_udp_path(dest)
2924 }
2925
2926 pub(in crate::node) fn mark_session_direct_path_degraded(
2927 &mut self,
2928 dest: NodeAddr,
2929 now_ms: u64,
2930 ) -> bool {
2931 let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2932 let entry = self
2933 .session_direct_degraded_until_ms
2934 .entry(dest)
2935 .or_insert(0);
2936 let was_degraded = *entry > now_ms;
2937 *entry = (*entry).max(until_ms);
2938 !was_degraded
2939 }
2940
2941 pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
2942 self.session_direct_degraded_until_ms.remove(dest).is_some()
2943 }
2944
2945 pub(in crate::node) fn learn_reverse_route(
2946 &mut self,
2947 destination: NodeAddr,
2948 next_hop: NodeAddr,
2949 ) {
2950 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2951 || destination == *self.node_addr()
2952 {
2953 return;
2954 }
2955 let now_ms = Self::now_ms();
2956 self.learned_routes.learn(
2957 destination,
2958 next_hop,
2959 now_ms,
2960 self.config.node.routing.learned_ttl_secs,
2961 self.config.node.routing.max_learned_routes_per_dest,
2962 );
2963 }
2964
2965 pub(in crate::node) fn record_route_failure(
2966 &mut self,
2967 destination: NodeAddr,
2968 next_hop: NodeAddr,
2969 ) {
2970 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2971 return;
2972 }
2973 self.learned_routes.record_failure(&destination, &next_hop);
2974 }
2975
2976 pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2977 self.learned_routes.snapshot(now_ms)
2978 }
2979
2980 pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2981 self.learned_routes.purge_expired(now_ms);
2982 }
2983
2984 fn select_best_candidate<'a>(
2993 &'a self,
2994 candidates: &[&'a ActivePeer],
2995 dest_coords: &crate::tree::TreeCoordinate,
2996 ) -> Option<&'a ActivePeer> {
2997 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2998
2999 let mut best: Option<(&ActivePeer, f64, usize)> = None;
3000
3001 for &candidate in candidates {
3002 if !candidate.can_send() {
3003 continue;
3004 }
3005
3006 let cost = candidate.link_cost();
3007
3008 let dist = self
3009 .tree_state
3010 .peer_coords(candidate.node_addr())
3011 .map(|pc| pc.distance_to(dest_coords))
3012 .unwrap_or(usize::MAX);
3013
3014 if dist >= my_distance {
3017 continue;
3018 }
3019
3020 let dominated = match &best {
3021 None => true,
3022 Some((_, best_cost, best_dist)) => {
3023 cost < *best_cost
3024 || (cost == *best_cost && dist < *best_dist)
3025 || (cost == *best_cost
3026 && dist == *best_dist
3027 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
3028 }
3029 };
3030
3031 if dominated {
3032 best = Some((candidate, cost, dist));
3033 }
3034 }
3035
3036 best.map(|(peer, _, _)| peer)
3037 }
3038
3039 pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
3041 self.peers.values().filter(|p| p.may_reach(dest)).collect()
3042 }
3043
3044 pub fn tun_tx(&self) -> Option<&TunTx> {
3048 self.tun_tx.as_ref()
3049 }
3050
3051 pub fn attach_external_packet_io(
3058 &mut self,
3059 capacity: usize,
3060 ) -> Result<ExternalPacketIo, NodeError> {
3061 if self.state != NodeState::Created {
3062 return Err(NodeError::Config(ConfigError::Validation(
3063 "external packet I/O must be attached before node start".to_string(),
3064 )));
3065 }
3066 if self.config.tun.enabled {
3067 return Err(NodeError::Config(ConfigError::Validation(
3068 "external packet I/O requires tun.enabled=false".to_string(),
3069 )));
3070 }
3071
3072 let capacity = capacity.max(1);
3073 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
3074 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
3075 self.tun_outbound_rx = Some(outbound_rx);
3076 self.external_packet_tx = Some(inbound_tx);
3077
3078 Ok(ExternalPacketIo {
3079 outbound_tx,
3080 inbound_rx,
3081 })
3082 }
3083
3084 pub(crate) fn attach_endpoint_data_io(
3089 &mut self,
3090 capacity: usize,
3091 ) -> Result<EndpointDataIo, NodeError> {
3092 if self.state != NodeState::Created {
3093 return Err(NodeError::Config(ConfigError::Validation(
3094 "endpoint data I/O must be attached before node start".to_string(),
3095 )));
3096 }
3097
3098 let command_capacity = endpoint_data_command_capacity(capacity);
3099 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
3100 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
3105 self.endpoint_command_rx = Some(command_rx);
3106 self.endpoint_event_tx = Some(event_tx.clone());
3107
3108 Ok(EndpointDataIo {
3109 command_tx,
3110 event_rx,
3111 event_tx,
3112 })
3113 }
3114
3115 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
3116 let mut prefix = [0u8; 15];
3117 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3118 self.identity_cache
3119 .get(&prefix)
3120 .filter(|entry| &entry.node_addr == addr)
3121 .map(|entry| entry.pubkey)
3122 }
3123
3124 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
3125 let mut prefix = [0u8; 15];
3126 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3127 self.identity_cache
3128 .get(&prefix)
3129 .filter(|entry| &entry.node_addr == addr)
3130 .map(|entry| entry.npub.clone())
3131 }
3132
3133 pub(in crate::node) fn deliver_external_ipv6_packet(
3134 &self,
3135 src_addr: &NodeAddr,
3136 packet: Vec<u8>,
3137 ) {
3138 let Some(external_packet_tx) = &self.external_packet_tx else {
3139 return;
3140 };
3141 if packet.len() < 40 {
3142 return;
3143 }
3144 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
3145 return;
3146 };
3147 let delivered = NodeDeliveredPacket {
3148 source_node_addr: *src_addr,
3149 source_npub: self.npub_for_node_addr(src_addr),
3150 destination,
3151 packet,
3152 };
3153 if let Err(error) = external_packet_tx.try_send(delivered) {
3154 debug!(error = %error, "Failed to deliver packet to external app sink");
3155 }
3156 }
3157
3158 pub(super) async fn send_encrypted_link_message(
3172 &mut self,
3173 node_addr: &NodeAddr,
3174 plaintext: &[u8],
3175 ) -> Result<(), NodeError> {
3176 self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
3177 .await
3178 }
3179
3180 pub(in crate::node) fn note_local_send_outcome(
3186 &mut self,
3187 node_addr: &NodeAddr,
3188 result: &Result<usize, TransportError>,
3189 ) {
3190 match result {
3191 Ok(_) => {
3192 self.local_send_failure_at_by_peer.remove(node_addr);
3193 }
3194 Err(error) if error.is_local_route_unavailable() => {
3195 self.local_send_failure_at_by_peer
3196 .insert(*node_addr, std::time::Instant::now());
3197 }
3198 Err(_) => {}
3199 }
3200 }
3201
3202 pub(in crate::node) fn local_send_failure_dead_timeout_for_peer(
3208 &self,
3209 node_addr: &NodeAddr,
3210 now: std::time::Instant,
3211 dead_timeout: std::time::Duration,
3212 fast_dead_timeout: std::time::Duration,
3213 ) -> std::time::Duration {
3214 match self.local_send_failure_at_by_peer.get(node_addr).copied() {
3215 Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
3216 fast_dead_timeout.min(dead_timeout)
3217 }
3218 None => dead_timeout,
3219 Some(_) => dead_timeout,
3220 }
3221 }
3222
3223 pub(in crate::node) fn purge_expired_local_send_failures(&mut self, now: std::time::Instant) {
3224 self.local_send_failure_at_by_peer
3225 .retain(|_, at| now.duration_since(*at) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW);
3226 }
3227
3228 pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
3229 self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
3230 }
3231
3232 pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
3233 let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
3234 return false;
3235 };
3236 let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
3237 std::time::Instant::now().duration_since(t) <= grace
3238 }
3239
3240 pub(super) async fn send_encrypted_link_message_with_ce(
3244 &mut self,
3245 node_addr: &NodeAddr,
3246 plaintext: &[u8],
3247 ce_flag: bool,
3248 ) -> Result<(), NodeError> {
3249 let peer = self
3250 .peers
3251 .get_mut(node_addr)
3252 .ok_or(NodeError::PeerNotFound(*node_addr))?;
3253
3254 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
3255 node_addr: *node_addr,
3256 reason: "no their_index".into(),
3257 })?;
3258 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
3259 node_addr: *node_addr,
3260 reason: "no transport_id".into(),
3261 })?;
3262 let remote_addr = peer
3263 .current_addr()
3264 .cloned()
3265 .ok_or_else(|| NodeError::SendFailed {
3266 node_addr: *node_addr,
3267 reason: "no current_addr".into(),
3268 })?;
3269 #[cfg(any(target_os = "linux", target_os = "macos"))]
3270 let connected_socket = peer.connected_udp();
3271
3272 let timestamp_ms = peer.session_elapsed_ms();
3274
3275 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
3277 let mut flags = if sp_flag { FLAG_SP } else { 0 };
3278 if ce_flag {
3279 flags |= FLAG_CE;
3280 }
3281 if peer.current_k_bit() {
3282 flags |= FLAG_KEY_EPOCH;
3283 }
3284
3285 let session = peer
3286 .noise_session_mut()
3287 .ok_or_else(|| NodeError::SendFailed {
3288 node_addr: *node_addr,
3289 reason: "no noise session".into(),
3290 })?;
3291
3292 const INNER_TS_LEN: usize = 4;
3300 let counter = session.current_send_counter();
3301 let inner_len = INNER_TS_LEN + plaintext.len();
3302 let payload_len = inner_len as u16;
3303 let header = build_established_header(their_index, counter, flags, payload_len);
3304
3305 let transport_for_send = self
3327 .transports
3328 .get(&transport_id)
3329 .ok_or(NodeError::TransportNotFound(transport_id))?;
3330 match transport_for_send.connection_state(&remote_addr) {
3331 ConnectionState::Connected => {}
3332 other => {
3333 if matches!(other, ConnectionState::None) {
3334 let _ = transport_for_send.connect(&remote_addr).await;
3335 }
3336 return Err(NodeError::SendFailed {
3337 node_addr: *node_addr,
3338 reason: format!("transport connection not ready: {:?}", other),
3339 });
3340 }
3341 }
3342 #[cfg(unix)]
3343 {
3344 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3345 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3346 && is_udp
3347 && let Some(cipher_clone) = session.send_cipher_clone()
3348 {
3349 let reserved_counter =
3353 session
3354 .take_send_counter()
3355 .map_err(|e| NodeError::SendFailed {
3356 node_addr: *node_addr,
3357 reason: format!("counter reservation failed: {}", e),
3358 })?;
3359 debug_assert_eq!(reserved_counter, counter);
3360 let header =
3364 build_established_header(their_index, reserved_counter, flags, payload_len);
3365 let transport = transport_for_send;
3366 let send_target = {
3373 if let TransportHandle::Udp(udp) = transport {
3374 let socket_addr = {
3375 #[cfg(any(target_os = "linux", target_os = "macos"))]
3376 {
3377 match connected_socket.as_ref() {
3378 Some(socket) => Some(socket.peer_addr()),
3379 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3380 }
3381 }
3382 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3383 {
3384 udp.resolve_for_off_task(&remote_addr).await.ok()
3385 }
3386 };
3387 match (udp.async_socket(), socket_addr) {
3388 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3389 _ => None,
3390 }
3391 } else {
3392 None
3393 }
3394 };
3395 if let Some((socket, socket_addr)) = send_target {
3396 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3412 let mut wire_buf = Vec::with_capacity(wire_capacity);
3413 wire_buf.extend_from_slice(&header);
3414 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
3415 wire_buf.extend_from_slice(plaintext);
3416 let predicted_bytes = wire_capacity;
3417 if let Some(peer) = self.peers.get_mut(node_addr) {
3424 peer.link_stats_mut().record_sent(predicted_bytes);
3425 if let Some(mmp) = peer.mmp_mut() {
3426 mmp.sender
3427 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3428 }
3429 }
3430 let scheduling_weight = self.send_weight_for_peer(node_addr);
3431 let traffic_class = classify_fmp_plaintext_traffic(plaintext);
3432 workers.dispatch(self::encrypt_worker::FmpSendJob {
3433 cipher: cipher_clone,
3434 counter: reserved_counter,
3435 wire_buf,
3436 fsp_seal: None,
3437 socket,
3438 dest_addr: socket_addr,
3439 #[cfg(any(target_os = "linux", target_os = "macos"))]
3440 connected_socket,
3441 bulk_endpoint_data: traffic_class.bulk_endpoint_data,
3442 drop_on_backpressure: traffic_class.drop_on_backpressure,
3443 scheduling_weight,
3444 queued_at: crate::perf_profile::stamp(),
3445 });
3446 return Ok(());
3447 }
3448 }
3449 }
3450
3451 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3456 let ciphertext = {
3458 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3459 session
3460 .encrypt_with_aad(&inner_plaintext, &header)
3461 .map_err(|e| NodeError::SendFailed {
3462 node_addr: *node_addr,
3463 reason: format!("encryption failed: {}", e),
3464 })?
3465 };
3466
3467 let wire_packet = build_encrypted(&header, &ciphertext);
3468
3469 let send_result = {
3471 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3472 let transport = self
3473 .transports
3474 .get(&transport_id)
3475 .ok_or(NodeError::TransportNotFound(transport_id))?;
3476 transport.send(&remote_addr, &wire_packet).await
3477 };
3478 self.note_local_send_outcome(node_addr, &send_result);
3479 let bytes_sent = send_result.map_err(|e| match e {
3480 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3481 node_addr: *node_addr,
3482 packet_size,
3483 mtu,
3484 },
3485 other => NodeError::SendFailed {
3486 node_addr: *node_addr,
3487 reason: format!("transport send: {}", other),
3488 },
3489 })?;
3490
3491 if let Some(peer) = self.peers.get_mut(node_addr) {
3493 peer.link_stats_mut().record_sent(bytes_sent);
3494 if let Some(mmp) = peer.mmp_mut() {
3496 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3497 }
3498 }
3499
3500 Ok(())
3501 }
3502}
3503
3504impl fmt::Debug for Node {
3505 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3506 f.debug_struct("Node")
3507 .field("node_addr", self.node_addr())
3508 .field("state", &self.state)
3509 .field("is_leaf_only", &self.is_leaf_only)
3510 .field("connections", &self.connection_count())
3511 .field("peers", &self.peer_count())
3512 .field("links", &self.link_count())
3513 .field("transports", &self.transport_count())
3514 .finish()
3515 }
3516}