1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::RoutingMode;
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45use crate::transport::{
46 Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError, TransportHandle, TransportId,
47};
48use crate::tree::TreeState;
49use crate::upper::hosts::HostMap;
50use crate::upper::icmp_rate_limit::IcmpRateLimiter;
51use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
52use crate::utils::index::IndexAllocator;
53use crate::{
54 Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
55 SessionMessageType, encode_npub,
56};
57use rand::Rng;
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fmt;
60use std::sync::Arc;
61use std::thread::JoinHandle;
62use thiserror::Error;
63use tracing::{debug, warn};
64
/// Jitter bound, in seconds, associated with rekey scheduling.
///
/// NOTE(review): how the jitter is applied (symmetric vs. one-sided) is decided
/// at the use sites, which are not visible here — confirm before changing.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
72
/// Errors produced by node-level operations.
///
/// Each variant's `#[error]` string is the user-facing message; the doc
/// comments below restate it and describe the payload.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The operation requires a started node.
    #[error("node not started")]
    NotStarted,

    /// The node was already started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node was already stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given id.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No usable transport exists for the named transport type. The payload is
    /// a free-form description (also used for address-parsing failures in the
    /// Ethernet/BLE resolvers).
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link is registered under the given id.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection is registered for the given link.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// No peer is known under the given node address.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with the given node address already exists.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection is already registered for the given link.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A peer npub string could not be accepted; `reason` explains why.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// The operation was refused; the payload describes the refusal.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` would be exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` would be exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` would be exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available for the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion for the given link failed; `reason` explains why.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed; `reason` explains why.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet of `packet_size` bytes exceeded the `mtu` while forwarding to
    /// `node_addr`.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Wrapped configuration error (converted automatically via `?`).
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Wrapped identity error (converted automatically via `?`).
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// Wrapped TUN error (converted automatically via `?`).
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Index allocation failed; the payload describes the failure.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// A handshake failed; the payload describes the failure.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// A transport reported an error, carried here as text.
    #[error("transport error: {0}")]
    TransportError(String),

    /// A bootstrap handoff failed; the payload describes the failure.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
161
/// A packet handed out of the node, tagged with where it came from and where
/// it is addressed.
///
/// This is the item type carried by [`ExternalPacketIo::inbound_rx`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// Node address of the packet's source.
    pub source_node_addr: NodeAddr,
    /// npub string of the source, when one is available.
    pub source_npub: Option<String>,
    /// Address the packet is destined for.
    pub destination: FipsAddress,
    /// Raw packet bytes.
    pub packet: Vec<u8>,
}
174
/// One entry of the node's identity cache: the identity facts known about a
/// remote node plus a last-seen timestamp.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address of the remote identity.
    node_addr: NodeAddr,
    /// Full secp256k1 public key of the remote identity.
    pubkey: secp256k1::PublicKey,
    /// npub string form of the identity.
    npub: String,
    /// Timestamp in milliseconds of the most recent sighting.
    /// NOTE(review): the clock/epoch is set by callers — confirm there.
    last_seen_ms: u64,
}
182
183impl IdentityCacheEntry {
184 fn new(
185 node_addr: NodeAddr,
186 pubkey: secp256k1::PublicKey,
187 npub: String,
188 last_seen_ms: u64,
189 ) -> Self {
190 Self {
191 node_addr,
192 pubkey,
193 npub,
194 last_seen_ms,
195 }
196 }
197}
198
/// Channel endpoints for exchanging packets with the node from outside it.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Sender for packets going into the node (same type the TUN outbound
    /// path uses).
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receiver for packets the node delivers, with source/destination metadata.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
207
/// Channel endpoints for the endpoint data path: a bounded command sender
/// plus both halves of an unbounded event channel.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Bounded sender for [`NodeEndpointCommand`]s addressed to the node.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Receiving half of the unbounded [`NodeEndpointEvent`] channel.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Sending half of the same event channel.
    /// NOTE(review): presumably kept so other producers can clone it — confirm.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
237
/// Chooses the capacity of the endpoint-data command queue.
///
/// A positive integer in the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment
/// variable (surrounding whitespace is ignored) takes precedence and is used
/// as-is. If the variable is unset, unparsable, or zero, the result is
/// `requested` raised to a floor of 32 768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    // The floor is always >= 1, so it also guarantees a non-zero capacity.
    const CAPACITY_FLOOR: usize = 32_768;

    let env_override = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&value| value > 0);

    match env_override {
        Some(value) => value,
        None => requested.max(CAPACITY_FLOOR),
    }
}
248
/// Commands sent to the node over the endpoint command channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send `payload` to `remote` and report the outcome on `response_tx`.
    Send {
        remote: PeerIdentity,
        payload: Vec<u8>,
        /// Optional instant attached by the sender.
        /// NOTE(review): presumably the enqueue time, for latency accounting — confirm.
        queued_at: Option<std::time::Instant>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Send `payload` to `remote` without any response channel.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        /// Optional instant attached by the sender (see `Send::queued_at`).
        queued_at: Option<std::time::Instant>,
    },
    /// Request a list of [`NodeEndpointPeer`] records on `response_tx`.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Supply a new list of peer configurations; the result (counts of
    /// added/removed/updated/unchanged peers, or an error) is reported on
    /// `response_tx`.
    UpdatePeers {
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
284
/// Per-category counts reported in response to
/// [`NodeEndpointCommand::UpdatePeers`]. All counts default to zero.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Number of peers added.
    pub(crate) added: usize,
    /// Number of peers removed.
    pub(crate) removed: usize,
    /// Number of peers updated.
    pub(crate) updated: usize,
    /// Number of peers left unchanged.
    pub(crate) unchanged: usize,
}
293
/// Events emitted by the node on the endpoint event channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A data payload together with the identity of its source.
    Data {
        /// Node address of the source.
        source_node_addr: NodeAddr,
        /// npub string of the source, when one is available.
        source_npub: Option<String>,
        /// Payload bytes.
        payload: Vec<u8>,
        /// Optional instant attached by the producer.
        /// NOTE(review): presumably the enqueue time — confirm at the producer.
        queued_at: Option<std::time::Instant>,
    },
}
304
/// Per-peer record returned in response to
/// [`NodeEndpointCommand::PeerSnapshot`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// npub string of the peer.
    pub(crate) npub: String,
    /// Transport address rendered as text, if known.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, if known.
    pub(crate) transport_type: Option<String>,
    /// Numeric id of the link to the peer.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, if measured.
    pub(crate) srtt_ms: Option<u64>,
    /// Packet and byte counters for each direction.
    pub(crate) packets_sent: u64,
    pub(crate) packets_recv: u64,
    pub(crate) bytes_sent: u64,
    pub(crate) bytes_recv: u64,
}
318
/// Lifecycle state of a node.
///
/// Per the predicates on this type: only `Running` is operational and
/// stoppable; starting is allowed from `Created` and `Stopped`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed but never started (the state set by the constructors).
    Created,
    /// Start-up in progress.
    Starting,
    /// Started and operational.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// Stopped; may be started again.
    Stopped,
}
333
334impl NodeState {
335 pub fn is_operational(&self) -> bool {
337 matches!(self, NodeState::Running)
338 }
339
340 pub fn can_start(&self) -> bool {
342 matches!(self, NodeState::Created | NodeState::Stopped)
343 }
344
345 pub fn can_stop(&self) -> bool {
347 matches!(self, NodeState::Running)
348 }
349}
350
351impl fmt::Display for NodeState {
352 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
353 let s = match self {
354 NodeState::Created => "created",
355 NodeState::Starting => "starting",
356 NodeState::Running => "running",
357 NodeState::Stopping => "stopping",
358 NodeState::Stopped => "stopped",
359 };
360 write!(f, "{}", s)
361 }
362}
363
/// Bookkeeping for a recently seen request: who it came from, when, and
/// whether a response has been forwarded yet.
#[derive(Clone, Debug)]
pub(crate) struct RecentRequest {
    /// Peer the request was received from.
    pub(crate) from_peer: NodeAddr,
    /// Time the request was recorded, in milliseconds (same clock as the
    /// `current_time_ms` passed to `is_expired`).
    pub(crate) timestamp_ms: u64,
    /// Whether a response has been forwarded; starts as `false`.
    pub(crate) response_forwarded: bool,
}
381
382impl RecentRequest {
383 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
384 Self {
385 from_peer,
386 timestamp_ms,
387 response_forwarded: false,
388 }
389 }
390
391 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
393 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
394 }
395}
396
/// Key identifying a remote address on a specific transport; used by
/// `Node::addr_to_link` to map an address to its link.
type AddrKey = (TransportId, TransportAddr);
399
/// Per-transport drop-tracking state (see `Node::transport_drops`).
///
/// NOTE(review): the update logic lives outside this view; the field notes
/// below are inferred from the names — confirm against the code that writes them.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Presumably the drop counter value seen at the previous sample.
    prev_drops: u64,
    /// Presumably whether the transport was observed dropping at the last sample.
    dropping: bool,
}
411
/// An outbound connection attempt tracked in `Node::pending_connects`.
///
/// NOTE(review): the exact lifecycle (when entries are added or removed) is
/// implemented outside this view.
struct PendingConnect {
    /// Link allocated for the attempt.
    link_id: LinkId,
    /// Transport the attempt is made over.
    transport_id: TransportId,
    /// Remote address being connected to.
    remote_addr: TransportAddr,
    /// Identity of the peer being connected to.
    peer_identity: PeerIdentity,
}
427
/// Top-level state of a single mesh node.
///
/// Holds the node identity and configuration, routing state (tree, bloom,
/// caches), transports/links/peers/sessions, TUN and endpoint I/O plumbing,
/// worker pools, rate limiters, discovery state, and assorted log-throttling
/// timestamps. Both constructors start with every collection empty and every
/// optional handle `None`; other parts of the module fill them in.
///
/// Field notes below describe what the types and the visible constructors /
/// accessors establish; anything inferred from a name alone is marked
/// NOTE(review).
pub struct Node {
    // --- Identity and lifecycle -------------------------------------------
    /// This node's identity (source of `node_addr()` and `npub()`).
    identity: Identity,

    /// Eight random bytes drawn once at construction.
    startup_epoch: [u8; 8],

    /// Instant of construction; `uptime()` is measured from here.
    started_at: std::time::Instant,

    /// Validated configuration the node was built from.
    config: Config,

    /// Current lifecycle state; constructors set `NodeState::Created`.
    state: NodeState,

    /// Whether the node runs in leaf-only mode.
    is_leaf_only: bool,

    // --- Routing state ------------------------------------------------------
    /// Spanning-tree state, configured from `config.node.tree` at construction.
    tree_state: TreeState,

    /// Bloom-filter state (leaf-only or regular flavour).
    bloom_state: BloomState,

    /// Coordinate cache sized and TTL'd from `config.node.cache`.
    coord_cache: CoordCache,
    /// Learned routes table.
    learned_routes: LearnedRouteTable,
    /// Recently seen requests keyed by a `u64`.
    /// NOTE(review): presumably a request id — confirm at the insert site.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Shared, lock-protected map from destination address to a 16-bit MTU.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // --- Transports and links ----------------------------------------------
    /// Registered transports by id.
    transports: HashMap<TransportId, TransportHandle>,
    /// Per-transport drop-tracking state.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Links by id.
    links: HashMap<LinkId, Link>,
    /// Reverse lookup from (transport, remote address) to link.
    addr_to_link: HashMap<AddrKey, LinkId>,

    /// Packet channel halves; `None` until set up outside the constructors.
    packet_tx: Option<PacketTx>,
    packet_rx: Option<PacketRx>,

    // --- Peers and sessions -------------------------------------------------
    /// Connections keyed by link.
    connections: HashMap<LinkId, PeerConnection>,

    /// Active peers keyed by node address.
    peers: HashMap<NodeAddr, ActivePeer>,

    /// Sessions keyed by remote node address.
    sessions: HashMap<NodeAddr, SessionEntry>,

    /// Identity cache keyed by 15 bytes.
    /// NOTE(review): presumably an address prefix — confirm at the insert site.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    /// Byte buffers queued per destination for the TUN and endpoint data
    /// paths, plus in-flight lookups per destination.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // --- Limits and id allocation ------------------------------------------
    /// Limits copied from `config.node.limits`.
    max_connections: usize,
    max_peers: usize,
    max_links: usize,

    /// Next ids to hand out; both start at 1.
    next_link_id: u64,
    next_transport_id: u32,

    // --- Statistics ---------------------------------------------------------
    stats: stats::NodeStats,

    stats_history: stats_history::StatsHistory,

    // --- TUN, endpoint I/O and worker pools --------------------------------
    /// `Configured` when `config.tun.enabled`, otherwise `Disabled`, at construction.
    tun_state: TunState,
    tun_name: Option<String>,
    tun_tx: Option<TunTx>,
    tun_outbound_rx: Option<TunOutboundRx>,
    /// Sender for packets delivered to an external consumer.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Node-side halves of the endpoint command/event channels.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// Session indices currently registered with the decrypt workers;
    /// `deregister_session_index` unregisters on removal from this set.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Both halves of an unbounded decrypt-worker event channel, created in
    /// the constructors. The receiver is wrapped in `Option`.
    /// NOTE(review): presumably so it can be taken by a consumer — confirm.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// OS thread handles for the TUN reader/writer.
    tun_reader_handle: Option<JoinHandle<()>>,
    tun_writer_handle: Option<JoinHandle<()>>,
    /// macOS only: raw file descriptor associated with TUN shutdown.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // --- DNS ----------------------------------------------------------------
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // --- Session index bookkeeping -----------------------------------------
    index_allocator: IndexAllocator,
    /// Owner of each (transport, local session index) pair.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Link associated with each pending outbound (transport, index) pair.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // --- Rate limiting ------------------------------------------------------
    /// Handshake limiter built from `config.node.rate_limit` and
    /// `config.node.limits.max_pending_inbound`.
    msg1_rate_limiter: HandshakeRateLimiter,
    icmp_rate_limiter: IcmpRateLimiter,
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Same limiter type, with the interval taken from
    /// `config.node.session.coords_response_interval_ms`.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    discovery_backoff: DiscoveryBackoff,
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // --- Connection attempts and retries -----------------------------------
    pending_connects: Vec<PendingConnect>,

    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    // --- Discovery and bootstrap -------------------------------------------
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    nostr_discovery_started_at_ms: Option<u64>,
    startup_open_discovery_sweep_done: bool,
    /// Transports excluded by `find_transport_for_type`.
    bootstrap_transports: HashSet<TransportId>,
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    // --- Timestamps of the most recent occurrence of periodic actions ------
    // NOTE(review): the `last_*_log` / `last_*_warn` fields presumably
    // throttle logging — confirm at the use sites.
    last_parent_reeval: Option<crate::time::Instant>,

    last_congestion_log: Option<std::time::Instant>,

    /// Value returned by `estimated_mesh_size()`; `None` at construction.
    estimated_mesh_size: Option<u64>,
    last_mesh_size_log: Option<std::time::Instant>,

    last_self_warn: Option<std::time::Instant>,

    last_local_send_failure_at: Option<std::time::Instant>,

    // --- Naming and access control -----------------------------------------
    /// Display aliases; consulted by `peer_display_name` after the host map.
    peer_aliases: HashMap<NodeAddr, String>,

    peer_acl: acl::PeerAclReloader,

    /// Host map built from peer configs (and the hosts file when system
    /// files are enabled).
    host_map: Arc<HostMap>,
}
719
720impl Node {
721 pub fn new(config: Config) -> Result<Self, NodeError> {
723 config.validate()?;
724 let identity = config.create_identity()?;
725 let node_addr = *identity.node_addr();
726 let is_leaf_only = config.is_leaf_only();
727
728 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
729 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
730
731 let mut startup_epoch = [0u8; 8];
732 rand::rng().fill_bytes(&mut startup_epoch);
733
734 let mut bloom_state = if is_leaf_only {
735 BloomState::leaf_only(node_addr)
736 } else {
737 BloomState::new(node_addr)
738 };
739 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
740
741 let tun_state = if config.tun.enabled {
742 TunState::Configured
743 } else {
744 TunState::Disabled
745 };
746
747 let mut tree_state = TreeState::new(node_addr);
749 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
750 tree_state.set_hold_down(config.node.tree.hold_down_secs);
751 tree_state.set_flap_dampening(
752 config.node.tree.flap_threshold,
753 config.node.tree.flap_window_secs,
754 config.node.tree.flap_dampening_secs,
755 );
756 tree_state
757 .sign_declaration(&identity)
758 .expect("signing own declaration should never fail");
759
760 let coord_cache = CoordCache::new(
761 config.node.cache.coord_size,
762 config.node.cache.coord_ttl_secs * 1000,
763 );
764 let rl = &config.node.rate_limit;
765 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
766 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
767 config.node.limits.max_pending_inbound,
768 );
769
770 let max_connections = config.node.limits.max_connections;
771 let max_peers = config.node.limits.max_peers;
772 let max_links = config.node.limits.max_links;
773 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
774 let backoff_base_secs = config.node.discovery.backoff_base_secs;
775 let backoff_max_secs = config.node.discovery.backoff_max_secs;
776 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
777
778 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
779
780 Ok(Self {
781 identity,
782 startup_epoch,
783 started_at: std::time::Instant::now(),
784 config,
785 state: NodeState::Created,
786 is_leaf_only,
787 tree_state,
788 bloom_state,
789 coord_cache,
790 learned_routes: LearnedRouteTable::default(),
791 recent_requests: HashMap::new(),
792 transports: HashMap::new(),
793 transport_drops: HashMap::new(),
794 links: HashMap::new(),
795 addr_to_link: HashMap::new(),
796 packet_tx: None,
797 packet_rx: None,
798 connections: HashMap::new(),
799 peers: HashMap::new(),
800 sessions: HashMap::new(),
801 identity_cache: HashMap::new(),
802 pending_tun_packets: HashMap::new(),
803 pending_endpoint_data: HashMap::new(),
804 pending_lookups: HashMap::new(),
805 max_connections,
806 max_peers,
807 max_links,
808 next_link_id: 1,
809 next_transport_id: 1,
810 stats: stats::NodeStats::new(),
811 stats_history: stats_history::StatsHistory::new(),
812 tun_state,
813 tun_name: None,
814 tun_tx: None,
815 tun_outbound_rx: None,
816 external_packet_tx: None,
817 endpoint_command_rx: None,
818 endpoint_event_tx: None,
819 encrypt_workers: None,
820 decrypt_workers: None,
821 decrypt_registered_sessions: std::collections::HashSet::new(),
822 decrypt_fallback_tx,
823 decrypt_fallback_rx,
824 tun_reader_handle: None,
825 tun_writer_handle: None,
826 #[cfg(target_os = "macos")]
827 tun_shutdown_fd: None,
828 dns_identity_rx: None,
829 dns_task: None,
830 index_allocator: IndexAllocator::new(),
831 peers_by_index: HashMap::new(),
832 pending_outbound: HashMap::new(),
833 msg1_rate_limiter,
834 icmp_rate_limiter: IcmpRateLimiter::new(),
835 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
836 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
837 std::time::Duration::from_millis(coords_response_interval_ms),
838 ),
839 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
840 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
841 std::time::Duration::from_secs(forward_min_interval_secs),
842 ),
843 pending_connects: Vec::new(),
844 retry_pending: HashMap::new(),
845 nostr_discovery: None,
846 nostr_discovery_started_at_ms: None,
847 lan_discovery: None,
848 startup_open_discovery_sweep_done: false,
849 bootstrap_transports: HashSet::new(),
850 bootstrap_transport_npubs: HashMap::new(),
851 discovery_fallback_transit_blocked_peers: HashSet::new(),
852 last_parent_reeval: None,
853 last_congestion_log: None,
854 estimated_mesh_size: None,
855 last_mesh_size_log: None,
856 last_self_warn: None,
857 last_local_send_failure_at: None,
858 peer_aliases: HashMap::new(),
859 peer_acl,
860 host_map,
861 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
862 })
863 }
864
865 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
870 config.validate()?;
871 let node_addr = *identity.node_addr();
872
873 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
874 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
875
876 let mut startup_epoch = [0u8; 8];
877 rand::rng().fill_bytes(&mut startup_epoch);
878
879 let tun_state = if config.tun.enabled {
880 TunState::Configured
881 } else {
882 TunState::Disabled
883 };
884
885 let mut tree_state = TreeState::new(node_addr);
887 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
888 tree_state.set_hold_down(config.node.tree.hold_down_secs);
889 tree_state.set_flap_dampening(
890 config.node.tree.flap_threshold,
891 config.node.tree.flap_window_secs,
892 config.node.tree.flap_dampening_secs,
893 );
894 tree_state
895 .sign_declaration(&identity)
896 .expect("signing own declaration should never fail");
897
898 let mut bloom_state = BloomState::new(node_addr);
899 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
900
901 let coord_cache = CoordCache::new(
902 config.node.cache.coord_size,
903 config.node.cache.coord_ttl_secs * 1000,
904 );
905 let rl = &config.node.rate_limit;
906 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
907 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
908 config.node.limits.max_pending_inbound,
909 );
910
911 let max_connections = config.node.limits.max_connections;
912 let max_peers = config.node.limits.max_peers;
913 let max_links = config.node.limits.max_links;
914 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
915
916 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
917
918 Ok(Self {
919 identity,
920 startup_epoch,
921 started_at: std::time::Instant::now(),
922 config,
923 state: NodeState::Created,
924 is_leaf_only: false,
925 tree_state,
926 bloom_state,
927 coord_cache,
928 learned_routes: LearnedRouteTable::default(),
929 recent_requests: HashMap::new(),
930 transports: HashMap::new(),
931 transport_drops: HashMap::new(),
932 links: HashMap::new(),
933 addr_to_link: HashMap::new(),
934 packet_tx: None,
935 packet_rx: None,
936 connections: HashMap::new(),
937 peers: HashMap::new(),
938 sessions: HashMap::new(),
939 identity_cache: HashMap::new(),
940 pending_tun_packets: HashMap::new(),
941 pending_endpoint_data: HashMap::new(),
942 pending_lookups: HashMap::new(),
943 max_connections,
944 max_peers,
945 max_links,
946 next_link_id: 1,
947 next_transport_id: 1,
948 stats: stats::NodeStats::new(),
949 stats_history: stats_history::StatsHistory::new(),
950 tun_state,
951 tun_name: None,
952 tun_tx: None,
953 tun_outbound_rx: None,
954 external_packet_tx: None,
955 endpoint_command_rx: None,
956 endpoint_event_tx: None,
957 encrypt_workers: None,
958 decrypt_workers: None,
959 decrypt_registered_sessions: std::collections::HashSet::new(),
960 decrypt_fallback_tx,
961 decrypt_fallback_rx,
962 tun_reader_handle: None,
963 tun_writer_handle: None,
964 #[cfg(target_os = "macos")]
965 tun_shutdown_fd: None,
966 dns_identity_rx: None,
967 dns_task: None,
968 index_allocator: IndexAllocator::new(),
969 peers_by_index: HashMap::new(),
970 pending_outbound: HashMap::new(),
971 msg1_rate_limiter,
972 icmp_rate_limiter: IcmpRateLimiter::new(),
973 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
974 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
975 std::time::Duration::from_millis(coords_response_interval_ms),
976 ),
977 discovery_backoff: DiscoveryBackoff::new(),
978 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
979 pending_connects: Vec::new(),
980 retry_pending: HashMap::new(),
981 nostr_discovery: None,
982 nostr_discovery_started_at_ms: None,
983 lan_discovery: None,
984 startup_open_discovery_sweep_done: false,
985 bootstrap_transports: HashSet::new(),
986 bootstrap_transport_npubs: HashMap::new(),
987 discovery_fallback_transit_blocked_peers: HashSet::new(),
988 last_parent_reeval: None,
989 last_congestion_log: None,
990 estimated_mesh_size: None,
991 last_mesh_size_log: None,
992 last_self_warn: None,
993 last_local_send_failure_at: None,
994 peer_aliases: HashMap::new(),
995 peer_acl,
996 host_map,
997 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
998 })
999 }
1000
1001 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1003 let mut node = Self::new(config)?;
1004 node.is_leaf_only = true;
1005 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1006 Ok(node)
1007 }
1008
1009 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1010 let base_host_map = HostMap::from_peer_configs(config.peers());
1011 if !config.node.system_files_enabled {
1012 return (
1013 Arc::new(base_host_map.clone()),
1014 acl::PeerAclReloader::memory_only(base_host_map),
1015 );
1016 }
1017
1018 let mut host_map = base_host_map.clone();
1019 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1020 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1021 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1022 ));
1023 host_map.merge(hosts_file);
1024 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1025 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1026 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1027 base_host_map,
1028 hosts_path,
1029 );
1030 (Arc::new(host_map), peer_acl)
1031 }
1032
    /// Instantiates one transport handle per configured transport instance.
    ///
    /// Transport ids are allocated in a fixed order: UDP, simulated (only
    /// with the `sim-transport` feature), Ethernet (Linux/macOS only), TCP,
    /// Tor, then BLE (only when built with `bluer_available`). Every
    /// transport receives a clone of `packet_tx`.
    ///
    /// Each section first collects the `(name, config)` pairs into an owned
    /// `Vec`, so the borrow of `self.config` has ended before
    /// `self.allocate_transport_id()` borrows `self` mutably.
    ///
    /// A BLE adapter that fails to initialise is logged and skipped; it does
    /// not abort creation of the remaining transports.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // UDP transports.
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        // Simulated transports, compiled in only with the `sim-transport` feature.
        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        // Ethernet transports (Linux and macOS only). Each one is told the
        // node's public key.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        // TCP transports.
        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        // Tor transports.
        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // BLE transports, only when the build has `bluer_available`.
        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Real adapter initialisation is compiled out of test builds.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        // Best effort: skip this adapter and keep going.
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // Inside this `bluer_available` block the condition below can only
            // hold in test builds, where the loop above is compiled out. The
            // `if` then merely references `ble_instances`, because the warning
            // itself is also compiled out under `test`.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1170
1171 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1181 self.transports
1182 .iter()
1183 .filter(|(id, handle)| {
1184 handle.transport_type().name == transport_type
1185 && handle.is_operational()
1186 && !self.bootstrap_transports.contains(id)
1187 })
1188 .min_by_key(|(id, _)| id.as_u32())
1189 .map(|(id, _)| *id)
1190 }
1191
1192 #[allow(unused_variables)]
1198 fn resolve_ethernet_addr(
1199 &self,
1200 addr_str: &str,
1201 ) -> Result<(TransportId, TransportAddr), NodeError> {
1202 #[cfg(any(target_os = "linux", target_os = "macos"))]
1203 {
1204 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1205 NodeError::NoTransportForType(format!(
1206 "invalid Ethernet address format '{}': expected 'interface/mac'",
1207 addr_str
1208 ))
1209 })?;
1210
1211 let transport_id = self
1213 .transports
1214 .iter()
1215 .find(|(_, handle)| {
1216 handle.transport_type().name == "ethernet"
1217 && handle.is_operational()
1218 && handle.interface_name() == Some(iface)
1219 })
1220 .map(|(id, _)| *id)
1221 .ok_or_else(|| {
1222 NodeError::NoTransportForType(format!(
1223 "no operational Ethernet transport for interface '{}'",
1224 iface
1225 ))
1226 })?;
1227
1228 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1229 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1230 })?;
1231
1232 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1233 }
1234 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1235 {
1236 Err(NodeError::NoTransportForType(
1237 "Ethernet transport is not supported on this platform".to_string(),
1238 ))
1239 }
1240 }
1241
1242 #[cfg(bluer_available)]
1246 fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1247 let ta = TransportAddr::from_string(addr_str);
1248 let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1249 NodeError::NoTransportForType(format!(
1250 "invalid BLE address format '{}': expected 'adapter/mac'",
1251 addr_str
1252 ))
1253 })?;
1254
1255 let transport_id = self
1257 .transports
1258 .iter()
1259 .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1260 .map(|(id, _)| *id)
1261 .ok_or_else(|| {
1262 NodeError::NoTransportForType(format!(
1263 "no operational BLE transport for adapter '{}'",
1264 adapter
1265 ))
1266 })?;
1267
1268 crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1270 NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1271 })?;
1272
1273 Ok((transport_id, TransportAddr::from_string(addr_str)))
1274 }
1275
    /// Returns this node's identity.
    pub fn identity(&self) -> &Identity {
        &self.identity
    }
1282
    /// Returns this node's address, as provided by its identity.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }
1287
    /// Returns this node's npub string, as provided by its identity.
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
1292
1293 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1302 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1303 return hostname.to_string();
1304 }
1305 if let Some(name) = self.peer_aliases.get(addr) {
1306 return name.clone();
1307 }
1308 if let Some(peer) = self.peers.get(addr) {
1309 return peer.identity().short_npub();
1310 }
1311 if let Some(entry) = self.sessions.get(addr) {
1312 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1313 return PeerIdentity::from_pubkey(xonly).short_npub();
1314 }
1315 addr.short_hex()
1316 }
1317
1318 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1330 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1334 self.peers_by_index.remove(&cache_key);
1335 if self.decrypt_registered_sessions.remove(&cache_key)
1336 && let Some(workers) = self.decrypt_workers.as_ref()
1337 {
1338 workers.unregister_session(cache_key);
1339 }
1340 if let Some(peer_addr) = owning_peer {
1351 let peer_has_other_index = self
1352 .peers_by_index
1353 .values()
1354 .any(|other| *other == peer_addr);
1355 if !peer_has_other_index {
1356 self.clear_connected_udp_for_peer(&peer_addr);
1357 }
1358 }
1359 }
1360
1361 pub(in crate::node) fn ensure_current_session_index_registered(
1370 &mut self,
1371 node_addr: &NodeAddr,
1372 context: &'static str,
1373 ) -> bool {
1374 let Some(peer) = self.peers.get(node_addr) else {
1375 return false;
1376 };
1377 let Some(transport_id) = peer.transport_id() else {
1378 warn!(
1379 peer = %self.peer_display_name(node_addr),
1380 context,
1381 "Cannot register current session index without transport id"
1382 );
1383 return false;
1384 };
1385 let Some(our_index) = peer.our_index() else {
1386 warn!(
1387 peer = %self.peer_display_name(node_addr),
1388 context,
1389 "Cannot register current session index without local index"
1390 );
1391 return false;
1392 };
1393
1394 let cache_key = (transport_id, our_index.as_u32());
1395 match self.peers_by_index.get(&cache_key).copied() {
1396 Some(existing) if existing == *node_addr => true,
1397 Some(existing) => {
1398 warn!(
1399 peer = %self.peer_display_name(node_addr),
1400 previous_owner = %self.peer_display_name(&existing),
1401 transport_id = %transport_id,
1402 our_index = %our_index,
1403 context,
1404 "Repairing current session index with stale owner"
1405 );
1406 self.peers_by_index.insert(cache_key, *node_addr);
1407 true
1408 }
1409 None => {
1410 warn!(
1411 peer = %self.peer_display_name(node_addr),
1412 transport_id = %transport_id,
1413 our_index = %our_index,
1414 context,
1415 "Repairing missing current session index"
1416 );
1417 self.peers_by_index.insert(cache_key, *node_addr);
1418 true
1419 }
1420 }
1421 }
1422
/// Returns a reference to this node's configuration.
pub fn config(&self) -> &Config {
    &self.config
}
1429
/// Returns the result of `crate::upper::icmp::effective_ipv6_mtu` applied to
/// the current [`transport_mtu`](Self::transport_mtu).
pub fn effective_ipv6_mtu(&self) -> u16 {
    crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
}
1438
1439 pub fn transport_mtu(&self) -> u16 {
1456 let min_operational = self
1457 .transports
1458 .values()
1459 .filter(|h| h.is_operational())
1460 .map(|h| h.mtu())
1461 .min();
1462 if let Some(mtu) = min_operational {
1463 return mtu;
1464 }
1465 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1467 return cfg.mtu();
1468 }
1469 1280
1470 }
1471
/// Returns the node's current lifecycle state.
pub fn state(&self) -> NodeState {
    self.state
}
1478
/// Returns the time elapsed since the instant stored in `started_at`.
pub fn uptime(&self) -> std::time::Duration {
    self.started_at.elapsed()
}
1483
/// Returns whether the current state reports itself as operational.
pub fn is_running(&self) -> bool {
    self.state.is_operational()
}
1488
/// Returns the node's `is_leaf_only` flag.
pub fn is_leaf_only(&self) -> bool {
    self.is_leaf_only
}
1493
/// Returns a shared reference to the spanning-tree state.
pub fn tree_state(&self) -> &TreeState {
    &self.tree_state
}
1500
/// Returns a mutable reference to the spanning-tree state.
pub fn tree_state_mut(&mut self) -> &mut TreeState {
    &mut self.tree_state
}
1505
/// Returns a shared reference to the bloom-filter state.
pub fn bloom_state(&self) -> &BloomState {
    &self.bloom_state
}
1512
/// Returns a mutable reference to the bloom-filter state.
pub fn bloom_state_mut(&mut self) -> &mut BloomState {
    &mut self.bloom_state
}
1517
/// Returns the most recently stored mesh-size estimate, or `None` when no
/// estimate is currently stored (see `compute_mesh_size`).
pub fn estimated_mesh_size(&self) -> Option<u64> {
    self.estimated_mesh_size
}
1524
1525 pub(crate) fn compute_mesh_size(&mut self) {
1531 let my_addr = *self.tree_state.my_node_addr();
1532 let parent_id = *self.tree_state.my_declaration().parent_id();
1533 let is_root = self.tree_state.is_root();
1534
1535 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1536 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1538 let mut has_data = false;
1539
1540 if !is_root
1546 && let Some(parent) = self.peers.get(&parent_id)
1547 && let Some(filter) = parent.inbound_filter()
1548 {
1549 match filter.estimated_count(max_fpr) {
1550 Some(n) => {
1551 total += n;
1552 has_data = true;
1553 }
1554 None => {
1555 self.estimated_mesh_size = None;
1556 return;
1557 }
1558 }
1559 }
1560
1561 for (peer_addr, peer) in &self.peers {
1563 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1564 && *decl.parent_id() == my_addr
1565 {
1566 child_count += 1;
1567 if let Some(filter) = peer.inbound_filter() {
1568 match filter.estimated_count(max_fpr) {
1569 Some(n) => {
1570 total += n;
1571 has_data = true;
1572 }
1573 None => {
1574 self.estimated_mesh_size = None;
1575 return;
1576 }
1577 }
1578 }
1579 }
1580 }
1581
1582 if !has_data {
1583 self.estimated_mesh_size = None;
1584 return;
1585 }
1586
1587 let size = total.round() as u64;
1588 self.estimated_mesh_size = Some(size);
1589
1590 let now = std::time::Instant::now();
1592 let should_log = match self.last_mesh_size_log {
1593 None => true,
1594 Some(last) => {
1595 now.duration_since(last)
1596 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1597 }
1598 };
1599 if should_log {
1600 tracing::debug!(
1601 estimated_mesh_size = size,
1602 peers = self.peers.len(),
1603 children = child_count,
1604 "Mesh size estimate"
1605 );
1606 self.last_mesh_size_log = Some(now);
1607 }
1608 }
1609
/// Returns a shared reference to the coordinate cache.
pub fn coord_cache(&self) -> &CoordCache {
    &self.coord_cache
}
1616
/// Returns a mutable reference to the coordinate cache.
pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
    &mut self.coord_cache
}
1621
/// Returns a shared reference to the node's statistics counters.
pub fn stats(&self) -> &stats::NodeStats {
    &self.stats
}
1628
/// Returns a mutable reference to the node's statistics counters.
pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
    &mut self.stats
}
1633
/// Returns a shared reference to the recorded statistics history.
pub fn stats_history(&self) -> &stats_history::StatsHistory {
    &self.stats_history
}
1638
1639 pub(crate) fn record_stats_history(&mut self) {
1642 let fwd = &self.stats.forwarding;
1643 let peers_with_mmp: Vec<f64> = self
1644 .peers
1645 .values()
1646 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1647 .collect();
1648 let loss_rate = if peers_with_mmp.is_empty() {
1649 0.0
1650 } else {
1651 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1652 };
1653
1654 let snap = stats_history::Snapshot {
1655 mesh_size: self.estimated_mesh_size,
1656 tree_depth: self.tree_state.my_coords().depth() as u32,
1657 peer_count: self.peers.len() as u64,
1658 parent_switches_total: self.stats.tree.parent_switches,
1659 bytes_in_total: fwd.received_bytes,
1660 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1661 packets_in_total: fwd.received_packets,
1662 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1663 loss_rate,
1664 active_sessions: self.sessions.len() as u64,
1665 };
1666
1667 let now = std::time::Instant::now();
1668 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1669 .peers
1670 .values()
1671 .map(|p| {
1672 let stats = p.link_stats();
1673 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1674 Some(m) => (
1675 m.metrics.srtt_ms(),
1676 Some(m.metrics.loss_rate()),
1677 m.receiver.ecn_ce_count() as u64,
1678 ),
1679 None => (None, None, 0),
1680 };
1681 stats_history::PeerSnapshot {
1682 node_addr: *p.node_addr(),
1683 last_seen: now,
1684 srtt_ms,
1685 loss_rate,
1686 bytes_in_total: stats.bytes_recv,
1687 bytes_out_total: stats.bytes_sent,
1688 packets_in_total: stats.packets_recv,
1689 packets_out_total: stats.packets_sent,
1690 ecn_ce_total: ecn_ce,
1691 }
1692 })
1693 .collect();
1694
1695 self.stats_history.tick(now, &snap, &peer_snaps);
1696 }
1697
/// Returns the current TUN device state.
pub fn tun_state(&self) -> TunState {
    self.tun_state
}
1704
/// Returns the stored TUN interface name, if one is set.
pub fn tun_name(&self) -> Option<&str> {
    self.tun_name.as_deref()
}
1709
/// Sets the connection limit checked by `add_connection`, which treats `0`
/// as "no limit".
pub fn set_max_connections(&mut self, max: usize) {
    self.max_connections = max;
}
1716
/// Sets the `max_peers` limit.
pub fn set_max_peers(&mut self, max: usize) {
    self.max_peers = max;
}
1721
/// Sets the link limit checked by `add_link`, which treats `0` as "no limit".
pub fn set_max_links(&mut self, max: usize) {
    self.max_links = max;
}
1726
/// Returns the number of entries in the connection table.
pub fn connection_count(&self) -> usize {
    self.connections.len()
}
1733
/// Returns the number of entries in the peer table.
pub fn peer_count(&self) -> usize {
    self.peers.len()
}
1738
/// Returns the number of entries in the link table.
pub fn link_count(&self) -> usize {
    self.links.len()
}
1743
/// Returns the number of registered transports.
pub fn transport_count(&self) -> usize {
    self.transports.len()
}
1748
1749 pub fn allocate_transport_id(&mut self) -> TransportId {
1753 let id = TransportId::new(self.next_transport_id);
1754 self.next_transport_id += 1;
1755 id
1756 }
1757
/// Looks up a transport handle by id.
pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
    self.transports.get(id)
}
1762
/// Looks up a transport handle by id for mutation.
pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
    self.transports.get_mut(id)
}
1767
/// Iterates over the ids of all registered transports.
pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
    self.transports.keys()
}
1772
/// Returns a mutable reference to the packet receiver, if one is present.
pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
    self.packet_rx.as_mut()
}
1777
1778 pub fn allocate_link_id(&mut self) -> LinkId {
1782 let id = LinkId::new(self.next_link_id);
1783 self.next_link_id += 1;
1784 id
1785 }
1786
1787 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1789 if self.max_links > 0 && self.links.len() >= self.max_links {
1790 return Err(NodeError::MaxLinksExceeded {
1791 max: self.max_links,
1792 });
1793 }
1794 let link_id = link.link_id();
1795 let transport_id = link.transport_id();
1796 let remote_addr = link.remote_addr().clone();
1797
1798 self.links.insert(link_id, link);
1799 self.addr_to_link
1800 .insert((transport_id, remote_addr), link_id);
1801 Ok(())
1802 }
1803
/// Looks up a link by id.
pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
    self.links.get(link_id)
}
1808
/// Looks up a link by id for mutation.
pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
    self.links.get_mut(link_id)
}
1813
/// Returns the id of the link indexed under `(transport_id, addr)`, if any.
pub fn find_link_by_addr(
    &self,
    transport_id: TransportId,
    addr: &TransportAddr,
) -> Option<LinkId> {
    self.addr_to_link
        .get(&(transport_id, addr.clone()))
        .copied()
}
1824
1825 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
1831 if let Some(link) = self.links.remove(link_id) {
1832 let key = (link.transport_id(), link.remote_addr().clone());
1834 if self.addr_to_link.get(&key) == Some(link_id) {
1835 self.addr_to_link.remove(&key);
1836 }
1837 Some(link)
1838 } else {
1839 None
1840 }
1841 }
1842
1843 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
1844 if !self.bootstrap_transports.contains(&transport_id) {
1845 return;
1846 }
1847
1848 let transport_in_use = self
1849 .links
1850 .values()
1851 .any(|link| link.transport_id() == transport_id)
1852 || self
1853 .connections
1854 .values()
1855 .any(|conn| conn.transport_id() == Some(transport_id))
1856 || self
1857 .peers
1858 .values()
1859 .any(|peer| peer.transport_id() == Some(transport_id))
1860 || self
1861 .pending_connects
1862 .iter()
1863 .any(|pending| pending.transport_id == transport_id);
1864
1865 if transport_in_use {
1866 return;
1867 }
1868
1869 tracing::debug!(
1870 transport_id = %transport_id,
1871 "bootstrap transport has no remaining references; dropping"
1872 );
1873
1874 self.bootstrap_transports.remove(&transport_id);
1875 self.bootstrap_transport_npubs.remove(&transport_id);
1876 self.transport_drops.remove(&transport_id);
1877 self.transports.remove(&transport_id);
1878 }
1879
/// Iterates over all registered links.
pub fn links(&self) -> impl Iterator<Item = &Link> {
    self.links.values()
}
1884
1885 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
1889 let link_id = connection.link_id();
1890
1891 if self.connections.contains_key(&link_id) {
1892 return Err(NodeError::ConnectionAlreadyExists(link_id));
1893 }
1894
1895 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
1896 return Err(NodeError::MaxConnectionsExceeded {
1897 max: self.max_connections,
1898 });
1899 }
1900
1901 self.connections.insert(link_id, connection);
1902 Ok(())
1903 }
1904
/// Looks up the connection registered under `link_id`.
pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
    self.connections.get(link_id)
}
1909
/// Looks up the connection registered under `link_id` for mutation.
pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
    self.connections.get_mut(link_id)
}
1914
/// Removes and returns the connection registered under `link_id`, if any.
pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
    self.connections.remove(link_id)
}
1919
/// Iterates over all registered connections.
pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
    self.connections.values()
}
1924
/// Looks up the active peer with the given node address.
pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
    self.peers.get(node_addr)
}
1931
/// Looks up the active peer with the given node address for mutation.
pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
    self.peers.get_mut(node_addr)
}
1936
/// Removes and returns the active peer with the given node address, if any.
///
/// Only the `peers` table is touched here; other per-peer bookkeeping is not
/// updated by this method.
pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
    self.peers.remove(node_addr)
}
1941
/// Iterates over all active peers.
pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values()
}
1946
/// Returns a reference to the Nostr discovery instance, if one is present.
pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
    self.nostr_discovery.as_deref()
}
1953
/// Iterates over the node addresses of all active peers.
pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
    self.peers.keys()
}
1958
/// Iterates over the active peers for which `can_send()` is true.
pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
    self.peers.values().filter(|p| p.can_send())
}
1963
1964 pub fn sendable_peer_count(&self) -> usize {
1966 self.peers.values().filter(|p| p.can_send()).count()
1967 }
1968
1969 pub(crate) fn set_discovery_fallback_transit_allowed(
1970 &mut self,
1971 peer_addr: NodeAddr,
1972 allowed: bool,
1973 ) {
1974 if allowed {
1975 self.discovery_fallback_transit_blocked_peers
1976 .remove(&peer_addr);
1977 } else {
1978 self.discovery_fallback_transit_blocked_peers
1979 .insert(peer_addr);
1980 }
1981 }
1982
1983 pub(crate) fn configured_discovery_fallback_transit(
1984 &self,
1985 peer_addr: &NodeAddr,
1986 ) -> Option<bool> {
1987 self.config.peers().iter().find_map(|peer| {
1988 PeerIdentity::from_npub(&peer.npub)
1989 .ok()
1990 .filter(|identity| identity.node_addr() == peer_addr)
1991 .map(|_| peer.discovery_fallback_transit)
1992 })
1993 }
1994
1995 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
1996 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
1997 return retry_state.peer_config.discovery_fallback_transit;
1998 }
1999
2000 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2001 return allowed;
2002 }
2003
2004 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2005 }
2006
/// Test-only: sets the discovery forward limiter's interval to zero.
#[cfg(test)]
pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
    self.discovery_forward_limiter
        .set_interval(std::time::Duration::ZERO);
}
2016
/// Test-only: looks up the session entry for `remote`.
#[cfg(test)]
pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
    self.sessions.get(remote)
}
2021
/// Test-only: looks up the session entry for `remote` for mutation.
#[cfg(test)]
pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
    self.sessions.get_mut(remote)
}
2027
/// Test-only: removes and returns the session entry for `remote`, if any.
#[cfg(test)]
pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
    self.sessions.remove(remote)
}
2033
/// Test-only: reads the path-MTU entry for `fips_addr`.
///
/// Returns `None` when the read lock cannot be acquired or when no entry
/// exists for the address.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
    let table = self.path_mtu_lookup.read().ok()?;
    table.get(fips_addr).copied()
}
2042
/// Test-only: stores `mtu` as the path-MTU entry for `fips_addr`.
///
/// Silently does nothing when the write lock cannot be acquired.
#[cfg(test)]
pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
    if let Ok(mut map) = self.path_mtu_lookup.write() {
        map.insert(fips_addr, mtu);
    }
}
2050
/// Returns the number of entries in the session table.
pub fn session_count(&self) -> usize {
    self.sessions.len()
}
2055
/// Iterates over `(remote address, session entry)` pairs.
pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
    self.sessions.iter()
}
2060
2061 pub(crate) fn register_identity(
2065 &mut self,
2066 node_addr: NodeAddr,
2067 pubkey: secp256k1::PublicKey,
2068 ) -> bool {
2069 let mut prefix = [0u8; 15];
2070 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2071 if let Some(entry) = self.identity_cache.get(&prefix)
2072 && entry.node_addr == node_addr
2073 && entry.pubkey == pubkey
2074 {
2075 return true;
2079 }
2080
2081 let (xonly, _) = pubkey.x_only_public_key();
2082 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2083 if derived_node_addr != node_addr {
2084 debug!(
2085 claimed_node_addr = %node_addr,
2086 derived_node_addr = %derived_node_addr,
2087 "Rejected identity cache entry with mismatched public key"
2088 );
2089 return false;
2090 }
2091
2092 let now_ms = Self::now_ms();
2093 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2094 && entry.node_addr == node_addr
2095 {
2096 entry.pubkey = pubkey;
2097 entry.last_seen_ms = now_ms;
2098 return true;
2099 }
2100
2101 let npub = encode_npub(&xonly);
2102 self.identity_cache.insert(
2103 prefix,
2104 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2105 );
2106 let max = self.config.node.cache.identity_size;
2108 if self.identity_cache.len() > max
2109 && let Some(oldest_key) = self
2110 .identity_cache
2111 .iter()
2112 .min_by_key(|(_, entry)| entry.last_seen_ms)
2113 .map(|(k, _)| *k)
2114 {
2115 self.identity_cache.remove(&oldest_key);
2116 }
2117 true
2118 }
2119
2120 pub(crate) fn lookup_by_fips_prefix(
2122 &mut self,
2123 prefix: &[u8; 15],
2124 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2125 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2126 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2128 } else {
2129 None
2130 }
2131 }
2132
/// Returns whether the identity cache holds an entry under the first 15
/// bytes of `addr`.
///
/// NOTE(review): only the prefix is matched; unlike `pubkey_for_node_addr`
/// and `npub_for_node_addr`, the entry's full `node_addr` is not compared
/// with `addr` — confirm this is intentional.
pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
    let mut prefix = [0u8; 15];
    prefix.copy_from_slice(&addr.as_bytes()[0..15]);
    self.identity_cache.contains_key(&prefix)
}
2139
/// Returns the number of entries in the identity cache.
pub fn identity_cache_len(&self) -> usize {
    self.identity_cache.len()
}
2144
/// Iterates over the identity cache, yielding each entry's node address,
/// public key, and `last_seen_ms` value.
pub fn identity_cache_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
    self.identity_cache
        .values()
        .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
}
2156
/// Returns the configured identity cache size (`config.node.cache.identity_size`).
pub fn identity_cache_max(&self) -> usize {
    self.config.node.cache.identity_size
}
2161
/// Returns the number of entries in `pending_lookups`.
pub fn pending_lookup_count(&self) -> usize {
    self.pending_lookups.len()
}
2166
/// Iterates over `(target address, pending lookup)` pairs.
pub fn pending_lookups_iter(
    &self,
) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
    self.pending_lookups.iter()
}
2173
/// Returns the number of entries in `recent_requests`.
pub fn recent_request_count(&self) -> usize {
    self.recent_requests.len()
}
2178
/// Returns the number of destinations that have a pending TUN packet queue.
pub fn pending_tun_destinations(&self) -> usize {
    self.pending_tun_packets.len()
}
2183
/// Returns the total number of packets across all pending TUN queues.
pub fn pending_tun_total_packets(&self) -> usize {
    self.pending_tun_packets.values().map(|q| q.len()).sum()
}
2188
/// Iterates over `(peer address, retry state)` pairs in `retry_pending`.
pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
    self.retry_pending.iter()
}
2193
2194 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2201 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2203 return true;
2204 }
2205 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2207 && decl.parent_id() == self.node_addr()
2208 {
2209 return true;
2210 }
2211 false
2212 }
2213
/// Picks the peer to which a packet for `dest_node_addr` should be forwarded.
///
/// Candidates are tried in this order; the first hit is returned:
/// 1. nothing when the destination is this node itself;
/// 2. the destination itself when it is a directly connected, sendable peer;
/// 3. (`RoutingMode::ReplyLearned` only, and only when the learned-route
///    table does not ask for fallback exploration) a learned route;
/// 4. a coordinate-based choice among peers returned by
///    `destination_in_filters`, ranked by `select_best_candidate`;
/// 5. the tree next hop, provided that peer is sendable;
/// 6. (`ReplyLearned` only) a learned route as the last resort.
///
/// When the destination's coordinates are not in the coord cache, steps 4
/// and 5 are skipped and only a learned route can be returned.
///
/// Takes `&mut self`; the calls visible here that may need it are
/// `coord_cache.get_and_touch` and the `learned_routes` queries.
pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
    // Never route to ourselves.
    if dest_node_addr == self.node_addr() {
        return None;
    }

    // Direct peer that can currently send.
    if let Some(peer) = self.peers.get(dest_node_addr)
        && peer.can_send()
    {
        return Some(peer);
    }

    let now_ms = Self::now_ms();

    // In reply-learned mode, snapshot the addresses of sendable peers so the
    // learned-route table can restrict itself to usable next hops.
    let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
        Some(
            self.peers
                .iter()
                .filter(|(_, peer)| peer.can_send())
                .map(|(addr, _)| *addr)
                .collect::<HashSet<_>>(),
        )
    } else {
        None
    };

    // Ask the learned-route table whether this lookup should try the
    // coordinate/tree path first; always false outside reply-learned mode.
    let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
        self.learned_routes.should_explore_fallback(
            dest_node_addr,
            now_ms,
            self.config.node.routing.learned_fallback_explore_interval,
            |addr| sendable.contains(addr),
        )
    });
    // Normal case: prefer a learned route when one is available.
    if let Some(sendable) = &sendable_learned_peers
        && !explore_fallback
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    // Coordinate-based routing needs the destination's cached coordinates.
    let Some(dest_coords) = self
        .coord_cache
        .get_and_touch(dest_node_addr, now_ms)
        .cloned()
    else {
        // No coordinates: a learned route is the only remaining option.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }
        return None;
    };

    // Peers returned by `destination_in_filters`, ranked by link cost and
    // tree distance. The address is copied out so the borrow of `self.peers`
    // ends before the lookup below.
    let coordinate_route_addr = {
        let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
        if !candidates.is_empty() {
            self.select_best_candidate(&candidates, &dest_coords)
                .map(|peer| *peer.node_addr())
        } else {
            None
        }
    };
    if let Some(next_hop_addr) = coordinate_route_addr {
        return self.peers.get(&next_hop_addr);
    }

    // Tree routing, accepted only when the chosen neighbour can send.
    let tree_route_addr = self
        .tree_state
        .find_next_hop(&dest_coords)
        .filter(|next_hop_id| {
            self.peers
                .get(next_hop_id)
                .is_some_and(|peer| peer.can_send())
        });
    if let Some(next_hop_addr) = tree_route_addr {
        return self.peers.get(&next_hop_addr);
    }
    // Exploration found nothing: fall back to the learned route that was
    // skipped earlier.
    if explore_fallback {
        return sendable_learned_peers.as_ref().and_then(|sendable| {
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
        });
    }

    // Final learned-route attempt in reply-learned mode.
    if let Some(sendable) = &sendable_learned_peers
        && let Some(next_hop_addr) =
            self.learned_routes
                .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
    {
        return self.peers.get(&next_hop_addr);
    }

    None
}
2347
2348 pub(in crate::node) fn learn_reverse_route(
2349 &mut self,
2350 destination: NodeAddr,
2351 next_hop: NodeAddr,
2352 ) {
2353 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2354 || destination == *self.node_addr()
2355 {
2356 return;
2357 }
2358 let now_ms = Self::now_ms();
2359 self.learned_routes.learn(
2360 destination,
2361 next_hop,
2362 now_ms,
2363 self.config.node.routing.learned_ttl_secs,
2364 self.config.node.routing.max_learned_routes_per_dest,
2365 );
2366 }
2367
2368 pub(in crate::node) fn record_route_failure(
2369 &mut self,
2370 destination: NodeAddr,
2371 next_hop: NodeAddr,
2372 ) {
2373 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2374 return;
2375 }
2376 self.learned_routes.record_failure(&destination, &next_hop);
2377 }
2378
/// Returns a snapshot of the learned-route table taken at `now_ms`.
pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
    self.learned_routes.snapshot(now_ms)
}
2382
/// Asks the learned-route table to purge entries expired as of `now_ms`.
pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
    self.learned_routes.purge_expired(now_ms);
}
2386
2387 fn select_best_candidate<'a>(
2396 &'a self,
2397 candidates: &[&'a ActivePeer],
2398 dest_coords: &crate::tree::TreeCoordinate,
2399 ) -> Option<&'a ActivePeer> {
2400 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2401
2402 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2403
2404 for &candidate in candidates {
2405 if !candidate.can_send() {
2406 continue;
2407 }
2408
2409 let cost = candidate.link_cost();
2410
2411 let dist = self
2412 .tree_state
2413 .peer_coords(candidate.node_addr())
2414 .map(|pc| pc.distance_to(dest_coords))
2415 .unwrap_or(usize::MAX);
2416
2417 if dist >= my_distance {
2420 continue;
2421 }
2422
2423 let dominated = match &best {
2424 None => true,
2425 Some((_, best_cost, best_dist)) => {
2426 cost < *best_cost
2427 || (cost == *best_cost && dist < *best_dist)
2428 || (cost == *best_cost
2429 && dist == *best_dist
2430 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2431 }
2432 };
2433
2434 if dominated {
2435 best = Some((candidate, cost, dist));
2436 }
2437 }
2438
2439 best.map(|(peer, _, _)| peer)
2440 }
2441
/// Returns the active peers for which `may_reach(dest)` is true.
pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
    self.peers.values().filter(|p| p.may_reach(dest)).collect()
}
2446
/// Returns a reference to the TUN sender, if one is present.
pub fn tun_tx(&self) -> Option<&TunTx> {
    self.tun_tx.as_ref()
}
2453
2454 pub fn attach_external_packet_io(
2461 &mut self,
2462 capacity: usize,
2463 ) -> Result<ExternalPacketIo, NodeError> {
2464 if self.state != NodeState::Created {
2465 return Err(NodeError::Config(ConfigError::Validation(
2466 "external packet I/O must be attached before node start".to_string(),
2467 )));
2468 }
2469 if self.config.tun.enabled {
2470 return Err(NodeError::Config(ConfigError::Validation(
2471 "external packet I/O requires tun.enabled=false".to_string(),
2472 )));
2473 }
2474
2475 let capacity = capacity.max(1);
2476 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2477 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2478 self.tun_outbound_rx = Some(outbound_rx);
2479 self.external_packet_tx = Some(inbound_tx);
2480
2481 Ok(ExternalPacketIo {
2482 outbound_tx,
2483 inbound_rx,
2484 })
2485 }
2486
2487 pub(crate) fn attach_endpoint_data_io(
2492 &mut self,
2493 capacity: usize,
2494 ) -> Result<EndpointDataIo, NodeError> {
2495 if self.state != NodeState::Created {
2496 return Err(NodeError::Config(ConfigError::Validation(
2497 "endpoint data I/O must be attached before node start".to_string(),
2498 )));
2499 }
2500
2501 let command_capacity = endpoint_data_command_capacity(capacity);
2502 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2503 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2508 self.endpoint_command_rx = Some(command_rx);
2509 self.endpoint_event_tx = Some(event_tx.clone());
2510
2511 Ok(EndpointDataIo {
2512 command_tx,
2513 event_rx,
2514 event_tx,
2515 })
2516 }
2517
2518 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2519 let mut prefix = [0u8; 15];
2520 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2521 self.identity_cache
2522 .get(&prefix)
2523 .filter(|entry| &entry.node_addr == addr)
2524 .map(|entry| entry.pubkey)
2525 }
2526
2527 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2528 let mut prefix = [0u8; 15];
2529 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2530 self.identity_cache
2531 .get(&prefix)
2532 .filter(|entry| &entry.node_addr == addr)
2533 .map(|entry| entry.npub.clone())
2534 }
2535
2536 pub(in crate::node) fn deliver_external_ipv6_packet(
2537 &self,
2538 src_addr: &NodeAddr,
2539 packet: Vec<u8>,
2540 ) {
2541 let Some(external_packet_tx) = &self.external_packet_tx else {
2542 return;
2543 };
2544 if packet.len() < 40 {
2545 return;
2546 }
2547 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2548 return;
2549 };
2550 let delivered = NodeDeliveredPacket {
2551 source_node_addr: *src_addr,
2552 source_npub: self.npub_for_node_addr(src_addr),
2553 destination,
2554 packet,
2555 };
2556 if let Err(error) = external_packet_tx.try_send(delivered) {
2557 debug!(error = %error, "Failed to deliver packet to external app sink");
2558 }
2559 }
2560
/// Sends `plaintext` to `node_addr` as an encrypted link message with the CE
/// flag cleared.
///
/// Thin wrapper around `send_encrypted_link_message_with_ce` with
/// `ce_flag = false`; errors are passed through unchanged.
pub(super) async fn send_encrypted_link_message(
    &mut self,
    node_addr: &NodeAddr,
    plaintext: &[u8],
) -> Result<(), NodeError> {
    self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
        .await
}
2582
2583 pub(in crate::node) fn note_local_send_outcome(
2589 &mut self,
2590 result: &Result<usize, TransportError>,
2591 ) {
2592 match result {
2593 Ok(_) => {
2594 if self.last_local_send_failure_at.is_some() {
2595 self.last_local_send_failure_at = None;
2596 }
2597 }
2598 Err(TransportError::Io(e))
2599 if matches!(
2600 e.kind(),
2601 std::io::ErrorKind::NetworkUnreachable
2602 | std::io::ErrorKind::HostUnreachable
2603 | std::io::ErrorKind::AddrNotAvailable
2604 ) =>
2605 {
2606 self.last_local_send_failure_at = Some(std::time::Instant::now());
2607 }
2608 Err(_) => {}
2609 }
2610 }
2611
/// Returns the instant recorded by `note_local_send_outcome` for the most
/// recent qualifying send failure, or `None` if none is recorded.
pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
    self.last_local_send_failure_at
}
2617
2618 pub(super) async fn send_encrypted_link_message_with_ce(
2622 &mut self,
2623 node_addr: &NodeAddr,
2624 plaintext: &[u8],
2625 ce_flag: bool,
2626 ) -> Result<(), NodeError> {
2627 let peer = self
2628 .peers
2629 .get_mut(node_addr)
2630 .ok_or(NodeError::PeerNotFound(*node_addr))?;
2631
2632 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2633 node_addr: *node_addr,
2634 reason: "no their_index".into(),
2635 })?;
2636 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2637 node_addr: *node_addr,
2638 reason: "no transport_id".into(),
2639 })?;
2640 let remote_addr = peer
2641 .current_addr()
2642 .cloned()
2643 .ok_or_else(|| NodeError::SendFailed {
2644 node_addr: *node_addr,
2645 reason: "no current_addr".into(),
2646 })?;
2647 #[cfg(any(target_os = "linux", target_os = "macos"))]
2648 let connected_socket = peer.connected_udp();
2649
2650 let timestamp_ms = peer.session_elapsed_ms();
2652
2653 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2655 let mut flags = if sp_flag { FLAG_SP } else { 0 };
2656 if ce_flag {
2657 flags |= FLAG_CE;
2658 }
2659 if peer.current_k_bit() {
2660 flags |= FLAG_KEY_EPOCH;
2661 }
2662
2663 let session = peer
2664 .noise_session_mut()
2665 .ok_or_else(|| NodeError::SendFailed {
2666 node_addr: *node_addr,
2667 reason: "no noise session".into(),
2668 })?;
2669
2670 const INNER_TS_LEN: usize = 4;
2678 let counter = session.current_send_counter();
2679 let inner_len = INNER_TS_LEN + plaintext.len();
2680 let payload_len = inner_len as u16;
2681 let header = build_established_header(their_index, counter, flags, payload_len);
2682
2683 let transport_for_send = self
2702 .transports
2703 .get(&transport_id)
2704 .ok_or(NodeError::TransportNotFound(transport_id))?;
2705 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2706 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2707 && is_udp
2708 && let Some(cipher_clone) = session.send_cipher_clone()
2709 {
2710 {
2711 let reserved_counter =
2715 session
2716 .take_send_counter()
2717 .map_err(|e| NodeError::SendFailed {
2718 node_addr: *node_addr,
2719 reason: format!("counter reservation failed: {}", e),
2720 })?;
2721 debug_assert_eq!(reserved_counter, counter);
2722 let header =
2726 build_established_header(their_index, reserved_counter, flags, payload_len);
2727 let transport = transport_for_send;
2728 let send_target = {
2735 if let TransportHandle::Udp(udp) = transport {
2736 let socket_addr = {
2737 #[cfg(any(target_os = "linux", target_os = "macos"))]
2738 {
2739 match connected_socket.as_ref() {
2740 Some(socket) => Some(socket.peer_addr()),
2741 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2742 }
2743 }
2744 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2745 {
2746 udp.resolve_for_off_task(&remote_addr).await.ok()
2747 }
2748 };
2749 match (udp.async_socket(), socket_addr) {
2750 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
2751 _ => None,
2752 }
2753 } else {
2754 None
2755 }
2756 };
2757 if let Some((socket, socket_addr)) = send_target {
2758 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
2774 let mut wire_buf = Vec::with_capacity(wire_capacity);
2775 wire_buf.extend_from_slice(&header);
2776 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
2777 wire_buf.extend_from_slice(plaintext);
2778 let predicted_bytes = wire_capacity;
2779 if let Some(peer) = self.peers.get_mut(node_addr) {
2786 peer.link_stats_mut().record_sent(predicted_bytes);
2787 if let Some(mmp) = peer.mmp_mut() {
2788 mmp.sender
2789 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
2790 }
2791 }
2792 workers.dispatch(self::encrypt_worker::FmpSendJob {
2793 cipher: cipher_clone,
2794 counter: reserved_counter,
2795 wire_buf,
2796 fsp_seal: None,
2797 socket,
2798 dest_addr: socket_addr,
2799 #[cfg(any(target_os = "linux", target_os = "macos"))]
2800 connected_socket,
2801 drop_on_backpressure: plaintext
2802 .first()
2803 .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
2804 queued_at: crate::perf_profile::stamp(),
2805 });
2806 return Ok(());
2807 }
2808 }
2809 }
2810
2811 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
2816 let ciphertext = {
2818 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
2819 session
2820 .encrypt_with_aad(&inner_plaintext, &header)
2821 .map_err(|e| NodeError::SendFailed {
2822 node_addr: *node_addr,
2823 reason: format!("encryption failed: {}", e),
2824 })?
2825 };
2826
2827 let wire_packet = build_encrypted(&header, &ciphertext);
2828
2829 let send_result = {
2831 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
2832 let transport = self
2833 .transports
2834 .get(&transport_id)
2835 .ok_or(NodeError::TransportNotFound(transport_id))?;
2836 transport.send(&remote_addr, &wire_packet).await
2837 };
2838 self.note_local_send_outcome(&send_result);
2839 let bytes_sent = send_result.map_err(|e| match e {
2840 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
2841 node_addr: *node_addr,
2842 packet_size,
2843 mtu,
2844 },
2845 other => NodeError::SendFailed {
2846 node_addr: *node_addr,
2847 reason: format!("transport send: {}", other),
2848 },
2849 })?;
2850
2851 if let Some(peer) = self.peers.get_mut(node_addr) {
2853 peer.link_stats_mut().record_sent(bytes_sent);
2854 if let Some(mmp) = peer.mmp_mut() {
2856 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
2857 }
2858 }
2859
2860 Ok(())
2861 }
2862}
2863
2864impl fmt::Debug for Node {
2865 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2866 f.debug_struct("Node")
2867 .field("node_addr", self.node_addr())
2868 .field("state", &self.state)
2869 .field("is_leaf_only", &self.is_leaf_only)
2870 .field("connections", &self.connection_count())
2871 .field("peers", &self.peer_count())
2872 .field("links", &self.link_count())
2873 .field("transports", &self.transport_count())
2874 .finish()
2875 }
2876}