1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::RoutingMode;
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45use crate::transport::{
46 Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError, TransportHandle, TransportId,
47};
48use crate::tree::TreeState;
49use crate::upper::hosts::HostMap;
50use crate::upper::icmp_rate_limit::IcmpRateLimiter;
51use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
52use crate::utils::index::IndexAllocator;
53use crate::{
54 Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
55 SessionMessageType, encode_npub,
56};
57use rand::Rng;
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fmt;
60use std::sync::Arc;
61use std::thread::JoinHandle;
62use thiserror::Error;
63use tracing::{debug, warn};
64
65pub(crate) const REKEY_JITTER_SECS: i64 = 15;
72
73#[derive(Debug, Error)]
75pub enum NodeError {
76 #[error("node not started")]
77 NotStarted,
78
79 #[error("node already started")]
80 AlreadyStarted,
81
82 #[error("node already stopped")]
83 AlreadyStopped,
84
85 #[error("transport not found: {0}")]
86 TransportNotFound(TransportId),
87
88 #[error("no transport available for type: {0}")]
89 NoTransportForType(String),
90
91 #[error("link not found: {0}")]
92 LinkNotFound(LinkId),
93
94 #[error("connection not found: {0}")]
95 ConnectionNotFound(LinkId),
96
97 #[error("peer not found: {0:?}")]
98 PeerNotFound(NodeAddr),
99
100 #[error("peer already exists: {0:?}")]
101 PeerAlreadyExists(NodeAddr),
102
103 #[error("connection already exists for link: {0}")]
104 ConnectionAlreadyExists(LinkId),
105
106 #[error("invalid peer npub '{npub}': {reason}")]
107 InvalidPeerNpub { npub: String, reason: String },
108
109 #[error("discovery error: {0}")]
110 Discovery(String),
111
112 #[error("access denied: {0}")]
113 AccessDenied(String),
114
115 #[error("max connections exceeded: {max}")]
116 MaxConnectionsExceeded { max: usize },
117
118 #[error("max peers exceeded: {max}")]
119 MaxPeersExceeded { max: usize },
120
121 #[error("max links exceeded: {max}")]
122 MaxLinksExceeded { max: usize },
123
124 #[error("handshake incomplete for link {0}")]
125 HandshakeIncomplete(LinkId),
126
127 #[error("no session available for link {0}")]
128 NoSession(LinkId),
129
130 #[error("promotion failed for link {link_id}: {reason}")]
131 PromotionFailed { link_id: LinkId, reason: String },
132
133 #[error("send failed to {node_addr}: {reason}")]
134 SendFailed { node_addr: NodeAddr, reason: String },
135
136 #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
137 MtuExceeded {
138 node_addr: NodeAddr,
139 packet_size: usize,
140 mtu: u16,
141 },
142
143 #[error("config error: {0}")]
144 Config(#[from] ConfigError),
145
146 #[error("identity error: {0}")]
147 Identity(#[from] IdentityError),
148
149 #[error("TUN error: {0}")]
150 Tun(#[from] TunError),
151
152 #[error("index allocation failed: {0}")]
153 IndexAllocationFailed(String),
154
155 #[error("handshake failed: {0}")]
156 HandshakeFailed(String),
157
158 #[error("transport error: {0}")]
159 TransportError(String),
160
161 #[error("bootstrap handoff failed: {0}")]
162 BootstrapHandoff(String),
163}
164
165#[derive(Debug, Clone, PartialEq, Eq)]
167pub struct NodeDeliveredPacket {
168 pub source_node_addr: NodeAddr,
170 pub source_npub: Option<String>,
172 pub destination: FipsAddress,
174 pub packet: Vec<u8>,
176}
177
178#[derive(Debug, Clone)]
179struct IdentityCacheEntry {
180 node_addr: NodeAddr,
181 pubkey: secp256k1::PublicKey,
182 npub: String,
183 last_seen_ms: u64,
184}
185
186impl IdentityCacheEntry {
187 fn new(
188 node_addr: NodeAddr,
189 pubkey: secp256k1::PublicKey,
190 npub: String,
191 last_seen_ms: u64,
192 ) -> Self {
193 Self {
194 node_addr,
195 pubkey,
196 npub,
197 last_seen_ms,
198 }
199 }
200}
201
202#[derive(Debug)]
204pub struct ExternalPacketIo {
205 pub outbound_tx: crate::upper::tun::TunOutboundTx,
207 pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
209}
210
211#[derive(Debug)]
213pub(crate) struct EndpointDataIo {
214 pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
223 pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
233 pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
239}
240
241fn endpoint_data_command_capacity(requested: usize) -> usize {
242 if let Ok(raw) = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
243 && let Ok(value) = raw.trim().parse::<usize>()
244 && value > 0
245 {
246 return value;
247 }
248
249 requested.max(1).max(32_768)
250}
251
252#[derive(Debug)]
254pub(crate) enum NodeEndpointCommand {
255 Send {
259 remote: PeerIdentity,
260 payload: Vec<u8>,
261 queued_at: Option<std::time::Instant>,
262 response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
263 },
264 SendOneway {
270 remote: PeerIdentity,
271 payload: Vec<u8>,
272 queued_at: Option<std::time::Instant>,
273 },
274 PeerSnapshot {
275 response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
276 },
277 RelaySnapshot {
278 response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
279 },
280 UpdateRelays {
281 advert_relays: Vec<String>,
282 dm_relays: Vec<String>,
283 response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
284 },
285 UpdatePeers {
291 peers: Vec<crate::config::PeerConfig>,
292 response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
293 },
294}
295
296#[derive(Debug, Clone, Default, PartialEq, Eq)]
298pub(crate) struct UpdatePeersOutcome {
299 pub(crate) added: usize,
300 pub(crate) removed: usize,
301 pub(crate) updated: usize,
302 pub(crate) unchanged: usize,
303}
304
305#[derive(Debug)]
307pub(crate) enum NodeEndpointEvent {
308 Data {
309 source_node_addr: NodeAddr,
310 source_npub: Option<String>,
311 payload: Vec<u8>,
312 queued_at: Option<std::time::Instant>,
313 },
314}
315
316#[derive(Debug, Clone, PartialEq, Eq)]
318pub(crate) struct NodeEndpointPeer {
319 pub(crate) npub: String,
320 pub(crate) transport_addr: Option<String>,
321 pub(crate) transport_type: Option<String>,
322 pub(crate) link_id: u64,
323 pub(crate) srtt_ms: Option<u64>,
324 pub(crate) packets_sent: u64,
325 pub(crate) packets_recv: u64,
326 pub(crate) bytes_sent: u64,
327 pub(crate) bytes_recv: u64,
328}
329
330#[derive(Debug, Clone, PartialEq, Eq)]
332pub(crate) struct NodeEndpointRelayStatus {
333 pub(crate) url: String,
334 pub(crate) status: String,
335}
336
337#[derive(Clone, Copy, Debug, PartialEq, Eq)]
339pub enum NodeState {
340 Created,
342 Starting,
344 Running,
346 Stopping,
348 Stopped,
350}
351
352impl NodeState {
353 pub fn is_operational(&self) -> bool {
355 matches!(self, NodeState::Running)
356 }
357
358 pub fn can_start(&self) -> bool {
360 matches!(self, NodeState::Created | NodeState::Stopped)
361 }
362
363 pub fn can_stop(&self) -> bool {
365 matches!(self, NodeState::Running)
366 }
367}
368
369impl fmt::Display for NodeState {
370 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
371 let s = match self {
372 NodeState::Created => "created",
373 NodeState::Starting => "starting",
374 NodeState::Running => "running",
375 NodeState::Stopping => "stopping",
376 NodeState::Stopped => "stopped",
377 };
378 write!(f, "{}", s)
379 }
380}
381
382#[derive(Clone, Debug)]
389pub(crate) struct RecentRequest {
390 pub(crate) from_peer: NodeAddr,
392 pub(crate) timestamp_ms: u64,
394 pub(crate) response_forwarded: bool,
398}
399
400impl RecentRequest {
401 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
402 Self {
403 from_peer,
404 timestamp_ms,
405 response_forwarded: false,
406 }
407 }
408
409 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
411 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
412 }
413}
414
415type AddrKey = (TransportId, TransportAddr);
417
418#[derive(Debug, Default)]
423struct TransportDropState {
424 prev_drops: u64,
426 dropping: bool,
428}
429
430struct PendingConnect {
436 link_id: LinkId,
438 transport_id: TransportId,
440 remote_addr: TransportAddr,
442 peer_identity: PeerIdentity,
444}
445
446pub struct Node {
460 identity: Identity,
463
464 startup_epoch: [u8; 8],
467
468 started_at: std::time::Instant,
470
471 config: Config,
474
475 state: NodeState,
478
479 is_leaf_only: bool,
481
482 tree_state: TreeState,
485
486 bloom_state: BloomState,
489
490 coord_cache: CoordCache,
493 learned_routes: LearnedRouteTable,
495 recent_requests: HashMap<u64, RecentRequest>,
498 path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,
504
505 transports: HashMap<TransportId, TransportHandle>,
508 transport_drops: HashMap<TransportId, TransportDropState>,
510 links: HashMap<LinkId, Link>,
512 addr_to_link: HashMap<AddrKey, LinkId>,
514
515 packet_tx: Option<PacketTx>,
518 packet_rx: Option<PacketRx>,
520
521 connections: HashMap<LinkId, PeerConnection>,
525
526 peers: HashMap<NodeAddr, ActivePeer>,
530
531 sessions: HashMap<NodeAddr, SessionEntry>,
535
536 identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,
540
541 pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
545 pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
547 pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,
551
552 max_connections: usize,
555 max_peers: usize,
557 max_links: usize,
559
560 next_link_id: u64,
563 next_transport_id: u32,
565
566 stats: stats::NodeStats,
569
570 stats_history: stats_history::StatsHistory,
572
573 tun_state: TunState,
576 tun_name: Option<String>,
578 tun_tx: Option<TunTx>,
580 tun_outbound_rx: Option<TunOutboundRx>,
582 external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
584 endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
586 endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
588 encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
594 decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
597 decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
606 decrypt_fallback_rx:
610 Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
611 decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
612 tun_reader_handle: Option<JoinHandle<()>>,
614 tun_writer_handle: Option<JoinHandle<()>>,
616 #[cfg(target_os = "macos")]
619 tun_shutdown_fd: Option<std::os::unix::io::RawFd>,
620
621 dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
624 dns_task: Option<tokio::task::JoinHandle<()>>,
626
627 index_allocator: IndexAllocator,
630 peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
633 pending_outbound: HashMap<(TransportId, u32), LinkId>,
636
637 msg1_rate_limiter: HandshakeRateLimiter,
640 icmp_rate_limiter: IcmpRateLimiter,
642 routing_error_rate_limiter: RoutingErrorRateLimiter,
644 coords_response_rate_limiter: RoutingErrorRateLimiter,
646 discovery_backoff: DiscoveryBackoff,
648 discovery_forward_limiter: DiscoveryForwardRateLimiter,
650
651 pending_connects: Vec<PendingConnect>,
657
658 retry_pending: HashMap<NodeAddr, retry::RetryState>,
664
665 nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
667 lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
672 nostr_discovery_started_at_ms: Option<u64>,
677 startup_open_discovery_sweep_done: bool,
681 bootstrap_transports: HashSet<TransportId>,
683 bootstrap_transport_npubs: HashMap<TransportId, String>,
690 discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,
693
694 last_parent_reeval: Option<crate::time::Instant>,
697
698 last_congestion_log: Option<std::time::Instant>,
701
702 estimated_mesh_size: Option<u64>,
705 last_mesh_size_log: Option<std::time::Instant>,
707
708 last_self_warn: Option<std::time::Instant>,
714
715 last_local_send_failure_at: Option<std::time::Instant>,
723
724 peer_aliases: HashMap<NodeAddr, String>,
728
729 peer_acl: acl::PeerAclReloader,
731
732 host_map: Arc<HostMap>,
736}
737
738impl Node {
739 pub fn new(config: Config) -> Result<Self, NodeError> {
741 config.validate()?;
742 let identity = config.create_identity()?;
743 let node_addr = *identity.node_addr();
744 let is_leaf_only = config.is_leaf_only();
745
746 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
747 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
748
749 let mut startup_epoch = [0u8; 8];
750 rand::rng().fill_bytes(&mut startup_epoch);
751
752 let mut bloom_state = if is_leaf_only {
753 BloomState::leaf_only(node_addr)
754 } else {
755 BloomState::new(node_addr)
756 };
757 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
758
759 let tun_state = if config.tun.enabled {
760 TunState::Configured
761 } else {
762 TunState::Disabled
763 };
764
765 let mut tree_state = TreeState::new(node_addr);
767 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
768 tree_state.set_hold_down(config.node.tree.hold_down_secs);
769 tree_state.set_flap_dampening(
770 config.node.tree.flap_threshold,
771 config.node.tree.flap_window_secs,
772 config.node.tree.flap_dampening_secs,
773 );
774 tree_state
775 .sign_declaration(&identity)
776 .expect("signing own declaration should never fail");
777
778 let coord_cache = CoordCache::new(
779 config.node.cache.coord_size,
780 config.node.cache.coord_ttl_secs * 1000,
781 );
782 let rl = &config.node.rate_limit;
783 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
784 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
785 config.node.limits.max_pending_inbound,
786 );
787
788 let max_connections = config.node.limits.max_connections;
789 let max_peers = config.node.limits.max_peers;
790 let max_links = config.node.limits.max_links;
791 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
792 let backoff_base_secs = config.node.discovery.backoff_base_secs;
793 let backoff_max_secs = config.node.discovery.backoff_max_secs;
794 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
795
796 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
797
798 Ok(Self {
799 identity,
800 startup_epoch,
801 started_at: std::time::Instant::now(),
802 config,
803 state: NodeState::Created,
804 is_leaf_only,
805 tree_state,
806 bloom_state,
807 coord_cache,
808 learned_routes: LearnedRouteTable::default(),
809 recent_requests: HashMap::new(),
810 transports: HashMap::new(),
811 transport_drops: HashMap::new(),
812 links: HashMap::new(),
813 addr_to_link: HashMap::new(),
814 packet_tx: None,
815 packet_rx: None,
816 connections: HashMap::new(),
817 peers: HashMap::new(),
818 sessions: HashMap::new(),
819 identity_cache: HashMap::new(),
820 pending_tun_packets: HashMap::new(),
821 pending_endpoint_data: HashMap::new(),
822 pending_lookups: HashMap::new(),
823 max_connections,
824 max_peers,
825 max_links,
826 next_link_id: 1,
827 next_transport_id: 1,
828 stats: stats::NodeStats::new(),
829 stats_history: stats_history::StatsHistory::new(),
830 tun_state,
831 tun_name: None,
832 tun_tx: None,
833 tun_outbound_rx: None,
834 external_packet_tx: None,
835 endpoint_command_rx: None,
836 endpoint_event_tx: None,
837 encrypt_workers: None,
838 decrypt_workers: None,
839 decrypt_registered_sessions: std::collections::HashSet::new(),
840 decrypt_fallback_tx,
841 decrypt_fallback_rx,
842 tun_reader_handle: None,
843 tun_writer_handle: None,
844 #[cfg(target_os = "macos")]
845 tun_shutdown_fd: None,
846 dns_identity_rx: None,
847 dns_task: None,
848 index_allocator: IndexAllocator::new(),
849 peers_by_index: HashMap::new(),
850 pending_outbound: HashMap::new(),
851 msg1_rate_limiter,
852 icmp_rate_limiter: IcmpRateLimiter::new(),
853 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
854 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
855 std::time::Duration::from_millis(coords_response_interval_ms),
856 ),
857 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
858 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
859 std::time::Duration::from_secs(forward_min_interval_secs),
860 ),
861 pending_connects: Vec::new(),
862 retry_pending: HashMap::new(),
863 nostr_discovery: None,
864 nostr_discovery_started_at_ms: None,
865 lan_discovery: None,
866 startup_open_discovery_sweep_done: false,
867 bootstrap_transports: HashSet::new(),
868 bootstrap_transport_npubs: HashMap::new(),
869 discovery_fallback_transit_blocked_peers: HashSet::new(),
870 last_parent_reeval: None,
871 last_congestion_log: None,
872 estimated_mesh_size: None,
873 last_mesh_size_log: None,
874 last_self_warn: None,
875 last_local_send_failure_at: None,
876 peer_aliases: HashMap::new(),
877 peer_acl,
878 host_map,
879 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
880 })
881 }
882
883 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
888 config.validate()?;
889 let node_addr = *identity.node_addr();
890
891 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
892 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
893
894 let mut startup_epoch = [0u8; 8];
895 rand::rng().fill_bytes(&mut startup_epoch);
896
897 let tun_state = if config.tun.enabled {
898 TunState::Configured
899 } else {
900 TunState::Disabled
901 };
902
903 let mut tree_state = TreeState::new(node_addr);
905 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
906 tree_state.set_hold_down(config.node.tree.hold_down_secs);
907 tree_state.set_flap_dampening(
908 config.node.tree.flap_threshold,
909 config.node.tree.flap_window_secs,
910 config.node.tree.flap_dampening_secs,
911 );
912 tree_state
913 .sign_declaration(&identity)
914 .expect("signing own declaration should never fail");
915
916 let mut bloom_state = BloomState::new(node_addr);
917 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
918
919 let coord_cache = CoordCache::new(
920 config.node.cache.coord_size,
921 config.node.cache.coord_ttl_secs * 1000,
922 );
923 let rl = &config.node.rate_limit;
924 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
925 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
926 config.node.limits.max_pending_inbound,
927 );
928
929 let max_connections = config.node.limits.max_connections;
930 let max_peers = config.node.limits.max_peers;
931 let max_links = config.node.limits.max_links;
932 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
933
934 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
935
936 Ok(Self {
937 identity,
938 startup_epoch,
939 started_at: std::time::Instant::now(),
940 config,
941 state: NodeState::Created,
942 is_leaf_only: false,
943 tree_state,
944 bloom_state,
945 coord_cache,
946 learned_routes: LearnedRouteTable::default(),
947 recent_requests: HashMap::new(),
948 transports: HashMap::new(),
949 transport_drops: HashMap::new(),
950 links: HashMap::new(),
951 addr_to_link: HashMap::new(),
952 packet_tx: None,
953 packet_rx: None,
954 connections: HashMap::new(),
955 peers: HashMap::new(),
956 sessions: HashMap::new(),
957 identity_cache: HashMap::new(),
958 pending_tun_packets: HashMap::new(),
959 pending_endpoint_data: HashMap::new(),
960 pending_lookups: HashMap::new(),
961 max_connections,
962 max_peers,
963 max_links,
964 next_link_id: 1,
965 next_transport_id: 1,
966 stats: stats::NodeStats::new(),
967 stats_history: stats_history::StatsHistory::new(),
968 tun_state,
969 tun_name: None,
970 tun_tx: None,
971 tun_outbound_rx: None,
972 external_packet_tx: None,
973 endpoint_command_rx: None,
974 endpoint_event_tx: None,
975 encrypt_workers: None,
976 decrypt_workers: None,
977 decrypt_registered_sessions: std::collections::HashSet::new(),
978 decrypt_fallback_tx,
979 decrypt_fallback_rx,
980 tun_reader_handle: None,
981 tun_writer_handle: None,
982 #[cfg(target_os = "macos")]
983 tun_shutdown_fd: None,
984 dns_identity_rx: None,
985 dns_task: None,
986 index_allocator: IndexAllocator::new(),
987 peers_by_index: HashMap::new(),
988 pending_outbound: HashMap::new(),
989 msg1_rate_limiter,
990 icmp_rate_limiter: IcmpRateLimiter::new(),
991 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
992 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
993 std::time::Duration::from_millis(coords_response_interval_ms),
994 ),
995 discovery_backoff: DiscoveryBackoff::new(),
996 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
997 pending_connects: Vec::new(),
998 retry_pending: HashMap::new(),
999 nostr_discovery: None,
1000 nostr_discovery_started_at_ms: None,
1001 lan_discovery: None,
1002 startup_open_discovery_sweep_done: false,
1003 bootstrap_transports: HashSet::new(),
1004 bootstrap_transport_npubs: HashMap::new(),
1005 discovery_fallback_transit_blocked_peers: HashSet::new(),
1006 last_parent_reeval: None,
1007 last_congestion_log: None,
1008 estimated_mesh_size: None,
1009 last_mesh_size_log: None,
1010 last_self_warn: None,
1011 last_local_send_failure_at: None,
1012 peer_aliases: HashMap::new(),
1013 peer_acl,
1014 host_map,
1015 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1016 })
1017 }
1018
1019 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1021 let mut node = Self::new(config)?;
1022 node.is_leaf_only = true;
1023 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1024 Ok(node)
1025 }
1026
1027 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1028 let base_host_map = HostMap::from_peer_configs(config.peers());
1029 if !config.node.system_files_enabled {
1030 return (
1031 Arc::new(base_host_map.clone()),
1032 acl::PeerAclReloader::memory_only(base_host_map),
1033 );
1034 }
1035
1036 let mut host_map = base_host_map.clone();
1037 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1038 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1039 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1040 ));
1041 host_map.merge(hosts_file);
1042 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1043 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1044 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1045 base_host_map,
1046 hosts_path,
1047 );
1048 (Arc::new(host_map), peer_acl)
1049 }
1050
1051 async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
1055 let mut transports = Vec::new();
1056
1057 let udp_instances: Vec<_> = self
1059 .config
1060 .transports
1061 .udp
1062 .iter()
1063 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1064 .collect();
1065
1066 for (name, udp_config) in udp_instances {
1068 let transport_id = self.allocate_transport_id();
1069 let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
1070 transports.push(TransportHandle::Udp(udp));
1071 }
1072
1073 #[cfg(feature = "sim-transport")]
1074 {
1075 let sim_instances: Vec<_> = self
1076 .config
1077 .transports
1078 .sim
1079 .iter()
1080 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1081 .collect();
1082
1083 for (name, sim_config) in sim_instances {
1084 let transport_id = self.allocate_transport_id();
1085 let sim = crate::transport::sim::SimTransport::new(
1086 transport_id,
1087 name,
1088 sim_config,
1089 packet_tx.clone(),
1090 );
1091 transports.push(TransportHandle::Sim(sim));
1092 }
1093 }
1094
1095 #[cfg(any(target_os = "linux", target_os = "macos"))]
1097 {
1098 let eth_instances: Vec<_> = self
1099 .config
1100 .transports
1101 .ethernet
1102 .iter()
1103 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1104 .collect();
1105 let xonly = self.identity.pubkey();
1106 for (name, eth_config) in eth_instances {
1107 let transport_id = self.allocate_transport_id();
1108 let mut eth =
1109 EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
1110 eth.set_local_pubkey(xonly);
1111 transports.push(TransportHandle::Ethernet(eth));
1112 }
1113 }
1114
1115 let tcp_instances: Vec<_> = self
1117 .config
1118 .transports
1119 .tcp
1120 .iter()
1121 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1122 .collect();
1123
1124 for (name, tcp_config) in tcp_instances {
1125 let transport_id = self.allocate_transport_id();
1126 let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
1127 transports.push(TransportHandle::Tcp(tcp));
1128 }
1129
1130 let tor_instances: Vec<_> = self
1132 .config
1133 .transports
1134 .tor
1135 .iter()
1136 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1137 .collect();
1138
1139 for (name, tor_config) in tor_instances {
1140 let transport_id = self.allocate_transport_id();
1141 let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
1142 transports.push(TransportHandle::Tor(tor));
1143 }
1144
1145 #[cfg(bluer_available)]
1147 {
1148 let ble_instances: Vec<_> = self
1149 .config
1150 .transports
1151 .ble
1152 .iter()
1153 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1154 .collect();
1155
1156 #[cfg(all(bluer_available, not(test)))]
1157 for (name, ble_config) in ble_instances {
1158 let transport_id = self.allocate_transport_id();
1159 let adapter = ble_config.adapter().to_string();
1160 let mtu = ble_config.mtu();
1161 match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
1162 Ok(io) => {
1163 let mut ble = crate::transport::ble::BleTransport::new(
1164 transport_id,
1165 name,
1166 ble_config,
1167 io,
1168 packet_tx.clone(),
1169 );
1170 ble.set_local_pubkey(self.identity.pubkey().serialize());
1171 transports.push(TransportHandle::Ble(ble));
1172 }
1173 Err(e) => {
1174 tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
1175 }
1176 }
1177 }
1178
1179 #[cfg(any(not(bluer_available), test))]
1180 if !ble_instances.is_empty() {
1181 #[cfg(not(test))]
1182 tracing::warn!("BLE transport configured but this build lacks BlueZ support");
1183 }
1184 }
1185
1186 transports
1187 }
1188
1189 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1199 self.transports
1200 .iter()
1201 .filter(|(id, handle)| {
1202 handle.transport_type().name == transport_type
1203 && handle.is_operational()
1204 && !self.bootstrap_transports.contains(id)
1205 })
1206 .min_by_key(|(id, _)| id.as_u32())
1207 .map(|(id, _)| *id)
1208 }
1209
1210 #[allow(unused_variables)]
1216 fn resolve_ethernet_addr(
1217 &self,
1218 addr_str: &str,
1219 ) -> Result<(TransportId, TransportAddr), NodeError> {
1220 #[cfg(any(target_os = "linux", target_os = "macos"))]
1221 {
1222 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1223 NodeError::NoTransportForType(format!(
1224 "invalid Ethernet address format '{}': expected 'interface/mac'",
1225 addr_str
1226 ))
1227 })?;
1228
1229 let transport_id = self
1231 .transports
1232 .iter()
1233 .find(|(_, handle)| {
1234 handle.transport_type().name == "ethernet"
1235 && handle.is_operational()
1236 && handle.interface_name() == Some(iface)
1237 })
1238 .map(|(id, _)| *id)
1239 .ok_or_else(|| {
1240 NodeError::NoTransportForType(format!(
1241 "no operational Ethernet transport for interface '{}'",
1242 iface
1243 ))
1244 })?;
1245
1246 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1247 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1248 })?;
1249
1250 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1251 }
1252 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1253 {
1254 Err(NodeError::NoTransportForType(
1255 "Ethernet transport is not supported on this platform".to_string(),
1256 ))
1257 }
1258 }
1259
1260 #[cfg(bluer_available)]
1264 fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1265 let ta = TransportAddr::from_string(addr_str);
1266 let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1267 NodeError::NoTransportForType(format!(
1268 "invalid BLE address format '{}': expected 'adapter/mac'",
1269 addr_str
1270 ))
1271 })?;
1272
1273 let transport_id = self
1275 .transports
1276 .iter()
1277 .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1278 .map(|(id, _)| *id)
1279 .ok_or_else(|| {
1280 NodeError::NoTransportForType(format!(
1281 "no operational BLE transport for adapter '{}'",
1282 adapter
1283 ))
1284 })?;
1285
1286 crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1288 NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1289 })?;
1290
1291 Ok((transport_id, TransportAddr::from_string(addr_str)))
1292 }
1293
1294 pub fn identity(&self) -> &Identity {
1298 &self.identity
1299 }
1300
1301 pub fn node_addr(&self) -> &NodeAddr {
1303 self.identity.node_addr()
1304 }
1305
1306 pub fn npub(&self) -> String {
1308 self.identity.npub()
1309 }
1310
1311 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1320 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1321 return hostname.to_string();
1322 }
1323 if let Some(name) = self.peer_aliases.get(addr) {
1324 return name.clone();
1325 }
1326 if let Some(peer) = self.peers.get(addr) {
1327 return peer.identity().short_npub();
1328 }
1329 if let Some(entry) = self.sessions.get(addr) {
1330 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1331 return PeerIdentity::from_pubkey(xonly).short_npub();
1332 }
1333 addr.short_hex()
1334 }
1335
1336 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1348 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1352 self.peers_by_index.remove(&cache_key);
1353 if self.decrypt_registered_sessions.remove(&cache_key)
1354 && let Some(workers) = self.decrypt_workers.as_ref()
1355 {
1356 workers.unregister_session(cache_key);
1357 }
1358 if let Some(peer_addr) = owning_peer {
1369 let peer_has_other_index = self
1370 .peers_by_index
1371 .values()
1372 .any(|other| *other == peer_addr);
1373 if !peer_has_other_index {
1374 self.clear_connected_udp_for_peer(&peer_addr);
1375 }
1376 }
1377 }
1378
1379 pub(in crate::node) fn ensure_current_session_index_registered(
1388 &mut self,
1389 node_addr: &NodeAddr,
1390 context: &'static str,
1391 ) -> bool {
1392 let Some(peer) = self.peers.get(node_addr) else {
1393 return false;
1394 };
1395 let Some(transport_id) = peer.transport_id() else {
1396 warn!(
1397 peer = %self.peer_display_name(node_addr),
1398 context,
1399 "Cannot register current session index without transport id"
1400 );
1401 return false;
1402 };
1403 let Some(our_index) = peer.our_index() else {
1404 warn!(
1405 peer = %self.peer_display_name(node_addr),
1406 context,
1407 "Cannot register current session index without local index"
1408 );
1409 return false;
1410 };
1411
1412 let cache_key = (transport_id, our_index.as_u32());
1413 match self.peers_by_index.get(&cache_key).copied() {
1414 Some(existing) if existing == *node_addr => true,
1415 Some(existing) => {
1416 warn!(
1417 peer = %self.peer_display_name(node_addr),
1418 previous_owner = %self.peer_display_name(&existing),
1419 transport_id = %transport_id,
1420 our_index = %our_index,
1421 context,
1422 "Repairing current session index with stale owner"
1423 );
1424 self.peers_by_index.insert(cache_key, *node_addr);
1425 true
1426 }
1427 None => {
1428 warn!(
1429 peer = %self.peer_display_name(node_addr),
1430 transport_id = %transport_id,
1431 our_index = %our_index,
1432 context,
1433 "Repairing missing current session index"
1434 );
1435 self.peers_by_index.insert(cache_key, *node_addr);
1436 true
1437 }
1438 }
1439 }
1440
1441 pub fn config(&self) -> &Config {
1445 &self.config
1446 }
1447
1448 pub fn effective_ipv6_mtu(&self) -> u16 {
1454 crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1455 }
1456
1457 pub fn transport_mtu(&self) -> u16 {
1474 let min_operational = self
1475 .transports
1476 .values()
1477 .filter(|h| h.is_operational())
1478 .map(|h| h.mtu())
1479 .min();
1480 if let Some(mtu) = min_operational {
1481 return mtu;
1482 }
1483 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1485 return cfg.mtu();
1486 }
1487 1280
1488 }
1489
1490 pub fn state(&self) -> NodeState {
1494 self.state
1495 }
1496
1497 pub fn uptime(&self) -> std::time::Duration {
1499 self.started_at.elapsed()
1500 }
1501
1502 pub fn is_running(&self) -> bool {
1504 self.state.is_operational()
1505 }
1506
1507 pub fn is_leaf_only(&self) -> bool {
1509 self.is_leaf_only
1510 }
1511
1512 pub fn tree_state(&self) -> &TreeState {
1516 &self.tree_state
1517 }
1518
1519 pub fn tree_state_mut(&mut self) -> &mut TreeState {
1521 &mut self.tree_state
1522 }
1523
1524 pub fn bloom_state(&self) -> &BloomState {
1528 &self.bloom_state
1529 }
1530
1531 pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1533 &mut self.bloom_state
1534 }
1535
1536 pub fn estimated_mesh_size(&self) -> Option<u64> {
1540 self.estimated_mesh_size
1541 }
1542
1543 pub(crate) fn compute_mesh_size(&mut self) {
1549 let my_addr = *self.tree_state.my_node_addr();
1550 let parent_id = *self.tree_state.my_declaration().parent_id();
1551 let is_root = self.tree_state.is_root();
1552
1553 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1554 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1556 let mut has_data = false;
1557
1558 if !is_root
1564 && let Some(parent) = self.peers.get(&parent_id)
1565 && let Some(filter) = parent.inbound_filter()
1566 {
1567 match filter.estimated_count(max_fpr) {
1568 Some(n) => {
1569 total += n;
1570 has_data = true;
1571 }
1572 None => {
1573 self.estimated_mesh_size = None;
1574 return;
1575 }
1576 }
1577 }
1578
1579 for (peer_addr, peer) in &self.peers {
1581 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1582 && *decl.parent_id() == my_addr
1583 {
1584 child_count += 1;
1585 if let Some(filter) = peer.inbound_filter() {
1586 match filter.estimated_count(max_fpr) {
1587 Some(n) => {
1588 total += n;
1589 has_data = true;
1590 }
1591 None => {
1592 self.estimated_mesh_size = None;
1593 return;
1594 }
1595 }
1596 }
1597 }
1598 }
1599
1600 if !has_data {
1601 self.estimated_mesh_size = None;
1602 return;
1603 }
1604
1605 let size = total.round() as u64;
1606 self.estimated_mesh_size = Some(size);
1607
1608 let now = std::time::Instant::now();
1610 let should_log = match self.last_mesh_size_log {
1611 None => true,
1612 Some(last) => {
1613 now.duration_since(last)
1614 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1615 }
1616 };
1617 if should_log {
1618 tracing::debug!(
1619 estimated_mesh_size = size,
1620 peers = self.peers.len(),
1621 children = child_count,
1622 "Mesh size estimate"
1623 );
1624 self.last_mesh_size_log = Some(now);
1625 }
1626 }
1627
1628 pub fn coord_cache(&self) -> &CoordCache {
1632 &self.coord_cache
1633 }
1634
1635 pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1637 &mut self.coord_cache
1638 }
1639
1640 pub fn stats(&self) -> &stats::NodeStats {
1644 &self.stats
1645 }
1646
1647 pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1649 &mut self.stats
1650 }
1651
1652 pub fn stats_history(&self) -> &stats_history::StatsHistory {
1654 &self.stats_history
1655 }
1656
1657 pub(crate) fn record_stats_history(&mut self) {
1660 let fwd = &self.stats.forwarding;
1661 let peers_with_mmp: Vec<f64> = self
1662 .peers
1663 .values()
1664 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1665 .collect();
1666 let loss_rate = if peers_with_mmp.is_empty() {
1667 0.0
1668 } else {
1669 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1670 };
1671
1672 let snap = stats_history::Snapshot {
1673 mesh_size: self.estimated_mesh_size,
1674 tree_depth: self.tree_state.my_coords().depth() as u32,
1675 peer_count: self.peers.len() as u64,
1676 parent_switches_total: self.stats.tree.parent_switches,
1677 bytes_in_total: fwd.received_bytes,
1678 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1679 packets_in_total: fwd.received_packets,
1680 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1681 loss_rate,
1682 active_sessions: self.sessions.len() as u64,
1683 };
1684
1685 let now = std::time::Instant::now();
1686 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1687 .peers
1688 .values()
1689 .map(|p| {
1690 let stats = p.link_stats();
1691 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1692 Some(m) => (
1693 m.metrics.srtt_ms(),
1694 Some(m.metrics.loss_rate()),
1695 m.receiver.ecn_ce_count() as u64,
1696 ),
1697 None => (None, None, 0),
1698 };
1699 stats_history::PeerSnapshot {
1700 node_addr: *p.node_addr(),
1701 last_seen: now,
1702 srtt_ms,
1703 loss_rate,
1704 bytes_in_total: stats.bytes_recv,
1705 bytes_out_total: stats.bytes_sent,
1706 packets_in_total: stats.packets_recv,
1707 packets_out_total: stats.packets_sent,
1708 ecn_ce_total: ecn_ce,
1709 }
1710 })
1711 .collect();
1712
1713 self.stats_history.tick(now, &snap, &peer_snaps);
1714 }
1715
1716 pub fn tun_state(&self) -> TunState {
1720 self.tun_state
1721 }
1722
1723 pub fn tun_name(&self) -> Option<&str> {
1725 self.tun_name.as_deref()
1726 }
1727
1728 pub fn set_max_connections(&mut self, max: usize) {
1732 self.max_connections = max;
1733 }
1734
1735 pub fn set_max_peers(&mut self, max: usize) {
1737 self.max_peers = max;
1738 }
1739
1740 pub fn set_max_links(&mut self, max: usize) {
1742 self.max_links = max;
1743 }
1744
1745 pub fn connection_count(&self) -> usize {
1749 self.connections.len()
1750 }
1751
1752 pub fn peer_count(&self) -> usize {
1754 self.peers.len()
1755 }
1756
1757 pub fn link_count(&self) -> usize {
1759 self.links.len()
1760 }
1761
1762 pub fn transport_count(&self) -> usize {
1764 self.transports.len()
1765 }
1766
1767 pub fn allocate_transport_id(&mut self) -> TransportId {
1771 let id = TransportId::new(self.next_transport_id);
1772 self.next_transport_id += 1;
1773 id
1774 }
1775
1776 pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
1778 self.transports.get(id)
1779 }
1780
1781 pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
1783 self.transports.get_mut(id)
1784 }
1785
1786 pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
1788 self.transports.keys()
1789 }
1790
1791 pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
1793 self.packet_rx.as_mut()
1794 }
1795
1796 pub fn allocate_link_id(&mut self) -> LinkId {
1800 let id = LinkId::new(self.next_link_id);
1801 self.next_link_id += 1;
1802 id
1803 }
1804
1805 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1807 if self.max_links > 0 && self.links.len() >= self.max_links {
1808 return Err(NodeError::MaxLinksExceeded {
1809 max: self.max_links,
1810 });
1811 }
1812 let link_id = link.link_id();
1813 let transport_id = link.transport_id();
1814 let remote_addr = link.remote_addr().clone();
1815
1816 self.links.insert(link_id, link);
1817 self.addr_to_link
1818 .insert((transport_id, remote_addr), link_id);
1819 Ok(())
1820 }
1821
1822 pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
1824 self.links.get(link_id)
1825 }
1826
1827 pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
1829 self.links.get_mut(link_id)
1830 }
1831
1832 pub fn find_link_by_addr(
1834 &self,
1835 transport_id: TransportId,
1836 addr: &TransportAddr,
1837 ) -> Option<LinkId> {
1838 self.addr_to_link
1839 .get(&(transport_id, addr.clone()))
1840 .copied()
1841 }
1842
1843 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
1849 if let Some(link) = self.links.remove(link_id) {
1850 let key = (link.transport_id(), link.remote_addr().clone());
1852 if self.addr_to_link.get(&key) == Some(link_id) {
1853 self.addr_to_link.remove(&key);
1854 }
1855 Some(link)
1856 } else {
1857 None
1858 }
1859 }
1860
1861 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
1862 if !self.bootstrap_transports.contains(&transport_id) {
1863 return;
1864 }
1865
1866 let transport_in_use = self
1867 .links
1868 .values()
1869 .any(|link| link.transport_id() == transport_id)
1870 || self
1871 .connections
1872 .values()
1873 .any(|conn| conn.transport_id() == Some(transport_id))
1874 || self
1875 .peers
1876 .values()
1877 .any(|peer| peer.transport_id() == Some(transport_id))
1878 || self
1879 .pending_connects
1880 .iter()
1881 .any(|pending| pending.transport_id == transport_id);
1882
1883 if transport_in_use {
1884 return;
1885 }
1886
1887 tracing::debug!(
1888 transport_id = %transport_id,
1889 "bootstrap transport has no remaining references; dropping"
1890 );
1891
1892 self.bootstrap_transports.remove(&transport_id);
1893 self.bootstrap_transport_npubs.remove(&transport_id);
1894 self.transport_drops.remove(&transport_id);
1895 self.transports.remove(&transport_id);
1896 }
1897
1898 pub fn links(&self) -> impl Iterator<Item = &Link> {
1900 self.links.values()
1901 }
1902
1903 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
1907 let link_id = connection.link_id();
1908
1909 if self.connections.contains_key(&link_id) {
1910 return Err(NodeError::ConnectionAlreadyExists(link_id));
1911 }
1912
1913 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
1914 return Err(NodeError::MaxConnectionsExceeded {
1915 max: self.max_connections,
1916 });
1917 }
1918
1919 self.connections.insert(link_id, connection);
1920 Ok(())
1921 }
1922
1923 pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
1925 self.connections.get(link_id)
1926 }
1927
1928 pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
1930 self.connections.get_mut(link_id)
1931 }
1932
1933 pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
1935 self.connections.remove(link_id)
1936 }
1937
1938 pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
1940 self.connections.values()
1941 }
1942
1943 pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
1947 self.peers.get(node_addr)
1948 }
1949
1950 pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
1952 self.peers.get_mut(node_addr)
1953 }
1954
1955 pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
1957 self.peers.remove(node_addr)
1958 }
1959
1960 pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
1962 self.peers.values()
1963 }
1964
1965 pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
1969 self.nostr_discovery.as_deref()
1970 }
1971
1972 pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
1974 self.peers.keys()
1975 }
1976
1977 pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
1979 self.peers.values().filter(|p| p.can_send())
1980 }
1981
1982 pub fn sendable_peer_count(&self) -> usize {
1984 self.peers.values().filter(|p| p.can_send()).count()
1985 }
1986
1987 pub(crate) fn set_discovery_fallback_transit_allowed(
1988 &mut self,
1989 peer_addr: NodeAddr,
1990 allowed: bool,
1991 ) {
1992 if allowed {
1993 self.discovery_fallback_transit_blocked_peers
1994 .remove(&peer_addr);
1995 } else {
1996 self.discovery_fallback_transit_blocked_peers
1997 .insert(peer_addr);
1998 }
1999 }
2000
2001 pub(crate) fn configured_discovery_fallback_transit(
2002 &self,
2003 peer_addr: &NodeAddr,
2004 ) -> Option<bool> {
2005 self.config.peers().iter().find_map(|peer| {
2006 PeerIdentity::from_npub(&peer.npub)
2007 .ok()
2008 .filter(|identity| identity.node_addr() == peer_addr)
2009 .map(|_| peer.discovery_fallback_transit)
2010 })
2011 }
2012
2013 pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2014 if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2015 return retry_state.peer_config.discovery_fallback_transit;
2016 }
2017
2018 if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2019 return allowed;
2020 }
2021
2022 self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2023 }
2024
2025 #[cfg(test)]
2030 pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2031 self.discovery_forward_limiter
2032 .set_interval(std::time::Duration::ZERO);
2033 }
2034
2035 #[cfg(test)]
2036 pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2037 self.sessions.get(remote)
2038 }
2039
2040 #[cfg(test)]
2042 pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2043 self.sessions.get_mut(remote)
2044 }
2045
2046 #[cfg(test)]
2048 pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2049 self.sessions.remove(remote)
2050 }
2051
2052 #[cfg(test)]
2054 pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2055 self.path_mtu_lookup
2056 .read()
2057 .ok()
2058 .and_then(|map| map.get(fips_addr).copied())
2059 }
2060
2061 #[cfg(test)]
2063 pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2064 if let Ok(mut map) = self.path_mtu_lookup.write() {
2065 map.insert(fips_addr, mtu);
2066 }
2067 }
2068
2069 pub fn session_count(&self) -> usize {
2071 self.sessions.len()
2072 }
2073
2074 pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2076 self.sessions.iter()
2077 }
2078
2079 pub(crate) fn register_identity(
2083 &mut self,
2084 node_addr: NodeAddr,
2085 pubkey: secp256k1::PublicKey,
2086 ) -> bool {
2087 let mut prefix = [0u8; 15];
2088 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2089 if let Some(entry) = self.identity_cache.get(&prefix)
2090 && entry.node_addr == node_addr
2091 && entry.pubkey == pubkey
2092 {
2093 return true;
2097 }
2098
2099 let (xonly, _) = pubkey.x_only_public_key();
2100 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2101 if derived_node_addr != node_addr {
2102 debug!(
2103 claimed_node_addr = %node_addr,
2104 derived_node_addr = %derived_node_addr,
2105 "Rejected identity cache entry with mismatched public key"
2106 );
2107 return false;
2108 }
2109
2110 let now_ms = Self::now_ms();
2111 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2112 && entry.node_addr == node_addr
2113 {
2114 entry.pubkey = pubkey;
2115 entry.last_seen_ms = now_ms;
2116 return true;
2117 }
2118
2119 let npub = encode_npub(&xonly);
2120 self.identity_cache.insert(
2121 prefix,
2122 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2123 );
2124 let max = self.config.node.cache.identity_size;
2126 if self.identity_cache.len() > max
2127 && let Some(oldest_key) = self
2128 .identity_cache
2129 .iter()
2130 .min_by_key(|(_, entry)| entry.last_seen_ms)
2131 .map(|(k, _)| *k)
2132 {
2133 self.identity_cache.remove(&oldest_key);
2134 }
2135 true
2136 }
2137
2138 pub(crate) fn lookup_by_fips_prefix(
2140 &mut self,
2141 prefix: &[u8; 15],
2142 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2143 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2144 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2146 } else {
2147 None
2148 }
2149 }
2150
2151 pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2153 let mut prefix = [0u8; 15];
2154 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2155 self.identity_cache.contains_key(&prefix)
2156 }
2157
2158 pub fn identity_cache_len(&self) -> usize {
2160 self.identity_cache.len()
2161 }
2162
2163 pub fn identity_cache_iter(
2168 &self,
2169 ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2170 self.identity_cache
2171 .values()
2172 .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2173 }
2174
2175 pub fn identity_cache_max(&self) -> usize {
2177 self.config.node.cache.identity_size
2178 }
2179
2180 pub fn pending_lookup_count(&self) -> usize {
2182 self.pending_lookups.len()
2183 }
2184
2185 pub fn pending_lookups_iter(
2187 &self,
2188 ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2189 self.pending_lookups.iter()
2190 }
2191
2192 pub fn recent_request_count(&self) -> usize {
2194 self.recent_requests.len()
2195 }
2196
2197 pub fn pending_tun_destinations(&self) -> usize {
2199 self.pending_tun_packets.len()
2200 }
2201
2202 pub fn pending_tun_total_packets(&self) -> usize {
2204 self.pending_tun_packets.values().map(|q| q.len()).sum()
2205 }
2206
2207 pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2209 self.retry_pending.iter()
2210 }
2211
2212 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2219 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2221 return true;
2222 }
2223 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2225 && decl.parent_id() == self.node_addr()
2226 {
2227 return true;
2228 }
2229 false
2230 }
2231
2232 pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
2255 if dest_node_addr == self.node_addr() {
2257 return None;
2258 }
2259
2260 let direct_peer_can_send = self
2264 .peers
2265 .get(dest_node_addr)
2266 .is_some_and(|peer| peer.can_send());
2267 if let Some(peer) = self.peers.get(dest_node_addr)
2268 && peer.is_healthy()
2269 {
2270 return Some(peer);
2271 }
2272
2273 let now_ms = Self::now_ms();
2274
2275 let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
2276 Some(
2277 self.peers
2278 .iter()
2279 .filter(|(_, peer)| peer.can_send())
2280 .map(|(addr, _)| *addr)
2281 .collect::<HashSet<_>>(),
2282 )
2283 } else {
2284 None
2285 };
2286
2287 let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
2294 self.learned_routes.should_explore_fallback(
2295 dest_node_addr,
2296 now_ms,
2297 self.config.node.routing.learned_fallback_explore_interval,
2298 |addr| sendable.contains(addr),
2299 )
2300 });
2301 if let Some(sendable) = &sendable_learned_peers
2302 && !explore_fallback
2303 && let Some(next_hop_addr) =
2304 self.learned_routes
2305 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2306 {
2307 return self.peers.get(&next_hop_addr);
2308 }
2309
2310 let Some(dest_coords) = self
2312 .coord_cache
2313 .get_and_touch(dest_node_addr, now_ms)
2314 .cloned()
2315 else {
2316 if let Some(sendable) = &sendable_learned_peers
2317 && let Some(next_hop_addr) =
2318 self.learned_routes
2319 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2320 {
2321 return self.peers.get(&next_hop_addr);
2322 }
2323 if direct_peer_can_send {
2324 return self.peers.get(dest_node_addr);
2325 }
2326 return None;
2327 };
2328
2329 let coordinate_route_addr = {
2332 let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
2333 if !candidates.is_empty() {
2334 self.select_best_candidate(&candidates, &dest_coords)
2335 .map(|peer| *peer.node_addr())
2336 } else {
2337 None
2338 }
2339 };
2340 if let Some(next_hop_addr) = coordinate_route_addr {
2341 return self.peers.get(&next_hop_addr);
2342 }
2343
2344 let tree_route_addr = self
2346 .tree_state
2347 .find_next_hop(&dest_coords)
2348 .filter(|next_hop_id| {
2349 self.peers
2350 .get(next_hop_id)
2351 .is_some_and(|peer| peer.can_send())
2352 });
2353 if let Some(next_hop_addr) = tree_route_addr {
2354 return self.peers.get(&next_hop_addr);
2355 }
2356 if explore_fallback {
2357 return sendable_learned_peers.as_ref().and_then(|sendable| {
2358 self.learned_routes
2359 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2360 .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
2361 });
2362 }
2363
2364 if let Some(sendable) = &sendable_learned_peers
2365 && let Some(next_hop_addr) =
2366 self.learned_routes
2367 .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2368 {
2369 return self.peers.get(&next_hop_addr);
2370 }
2371
2372 if direct_peer_can_send {
2373 return self.peers.get(dest_node_addr);
2374 }
2375
2376 None
2377 }
2378
2379 pub(in crate::node) fn learn_reverse_route(
2380 &mut self,
2381 destination: NodeAddr,
2382 next_hop: NodeAddr,
2383 ) {
2384 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2385 || destination == *self.node_addr()
2386 {
2387 return;
2388 }
2389 let now_ms = Self::now_ms();
2390 self.learned_routes.learn(
2391 destination,
2392 next_hop,
2393 now_ms,
2394 self.config.node.routing.learned_ttl_secs,
2395 self.config.node.routing.max_learned_routes_per_dest,
2396 );
2397 }
2398
2399 pub(in crate::node) fn record_route_failure(
2400 &mut self,
2401 destination: NodeAddr,
2402 next_hop: NodeAddr,
2403 ) {
2404 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2405 return;
2406 }
2407 self.learned_routes.record_failure(&destination, &next_hop);
2408 }
2409
2410 pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2411 self.learned_routes.snapshot(now_ms)
2412 }
2413
2414 pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2415 self.learned_routes.purge_expired(now_ms);
2416 }
2417
2418 fn select_best_candidate<'a>(
2427 &'a self,
2428 candidates: &[&'a ActivePeer],
2429 dest_coords: &crate::tree::TreeCoordinate,
2430 ) -> Option<&'a ActivePeer> {
2431 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2432
2433 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2434
2435 for &candidate in candidates {
2436 if !candidate.can_send() {
2437 continue;
2438 }
2439
2440 let cost = candidate.link_cost();
2441
2442 let dist = self
2443 .tree_state
2444 .peer_coords(candidate.node_addr())
2445 .map(|pc| pc.distance_to(dest_coords))
2446 .unwrap_or(usize::MAX);
2447
2448 if dist >= my_distance {
2451 continue;
2452 }
2453
2454 let dominated = match &best {
2455 None => true,
2456 Some((_, best_cost, best_dist)) => {
2457 cost < *best_cost
2458 || (cost == *best_cost && dist < *best_dist)
2459 || (cost == *best_cost
2460 && dist == *best_dist
2461 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2462 }
2463 };
2464
2465 if dominated {
2466 best = Some((candidate, cost, dist));
2467 }
2468 }
2469
2470 best.map(|(peer, _, _)| peer)
2471 }
2472
2473 pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2475 self.peers.values().filter(|p| p.may_reach(dest)).collect()
2476 }
2477
2478 pub fn tun_tx(&self) -> Option<&TunTx> {
2482 self.tun_tx.as_ref()
2483 }
2484
2485 pub fn attach_external_packet_io(
2492 &mut self,
2493 capacity: usize,
2494 ) -> Result<ExternalPacketIo, NodeError> {
2495 if self.state != NodeState::Created {
2496 return Err(NodeError::Config(ConfigError::Validation(
2497 "external packet I/O must be attached before node start".to_string(),
2498 )));
2499 }
2500 if self.config.tun.enabled {
2501 return Err(NodeError::Config(ConfigError::Validation(
2502 "external packet I/O requires tun.enabled=false".to_string(),
2503 )));
2504 }
2505
2506 let capacity = capacity.max(1);
2507 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2508 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2509 self.tun_outbound_rx = Some(outbound_rx);
2510 self.external_packet_tx = Some(inbound_tx);
2511
2512 Ok(ExternalPacketIo {
2513 outbound_tx,
2514 inbound_rx,
2515 })
2516 }
2517
2518 pub(crate) fn attach_endpoint_data_io(
2523 &mut self,
2524 capacity: usize,
2525 ) -> Result<EndpointDataIo, NodeError> {
2526 if self.state != NodeState::Created {
2527 return Err(NodeError::Config(ConfigError::Validation(
2528 "endpoint data I/O must be attached before node start".to_string(),
2529 )));
2530 }
2531
2532 let command_capacity = endpoint_data_command_capacity(capacity);
2533 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2534 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2539 self.endpoint_command_rx = Some(command_rx);
2540 self.endpoint_event_tx = Some(event_tx.clone());
2541
2542 Ok(EndpointDataIo {
2543 command_tx,
2544 event_rx,
2545 event_tx,
2546 })
2547 }
2548
2549 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2550 let mut prefix = [0u8; 15];
2551 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2552 self.identity_cache
2553 .get(&prefix)
2554 .filter(|entry| &entry.node_addr == addr)
2555 .map(|entry| entry.pubkey)
2556 }
2557
2558 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2559 let mut prefix = [0u8; 15];
2560 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2561 self.identity_cache
2562 .get(&prefix)
2563 .filter(|entry| &entry.node_addr == addr)
2564 .map(|entry| entry.npub.clone())
2565 }
2566
2567 pub(in crate::node) fn deliver_external_ipv6_packet(
2568 &self,
2569 src_addr: &NodeAddr,
2570 packet: Vec<u8>,
2571 ) {
2572 let Some(external_packet_tx) = &self.external_packet_tx else {
2573 return;
2574 };
2575 if packet.len() < 40 {
2576 return;
2577 }
2578 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2579 return;
2580 };
2581 let delivered = NodeDeliveredPacket {
2582 source_node_addr: *src_addr,
2583 source_npub: self.npub_for_node_addr(src_addr),
2584 destination,
2585 packet,
2586 };
2587 if let Err(error) = external_packet_tx.try_send(delivered) {
2588 debug!(error = %error, "Failed to deliver packet to external app sink");
2589 }
2590 }
2591
2592 pub(super) async fn send_encrypted_link_message(
2606 &mut self,
2607 node_addr: &NodeAddr,
2608 plaintext: &[u8],
2609 ) -> Result<(), NodeError> {
2610 self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2611 .await
2612 }
2613
2614 pub(in crate::node) fn note_local_send_outcome(
2620 &mut self,
2621 result: &Result<usize, TransportError>,
2622 ) {
2623 match result {
2624 Ok(_) => {
2625 if self.last_local_send_failure_at.is_some() {
2626 self.last_local_send_failure_at = None;
2627 }
2628 }
2629 Err(TransportError::Io(e))
2630 if matches!(
2631 e.kind(),
2632 std::io::ErrorKind::NetworkUnreachable
2633 | std::io::ErrorKind::HostUnreachable
2634 | std::io::ErrorKind::AddrNotAvailable
2635 ) =>
2636 {
2637 self.last_local_send_failure_at = Some(std::time::Instant::now());
2638 }
2639 Err(_) => {}
2640 }
2641 }
2642
2643 pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
2646 self.last_local_send_failure_at
2647 }
2648
2649 pub(super) async fn send_encrypted_link_message_with_ce(
2653 &mut self,
2654 node_addr: &NodeAddr,
2655 plaintext: &[u8],
2656 ce_flag: bool,
2657 ) -> Result<(), NodeError> {
2658 let peer = self
2659 .peers
2660 .get_mut(node_addr)
2661 .ok_or(NodeError::PeerNotFound(*node_addr))?;
2662
2663 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2664 node_addr: *node_addr,
2665 reason: "no their_index".into(),
2666 })?;
2667 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2668 node_addr: *node_addr,
2669 reason: "no transport_id".into(),
2670 })?;
2671 let remote_addr = peer
2672 .current_addr()
2673 .cloned()
2674 .ok_or_else(|| NodeError::SendFailed {
2675 node_addr: *node_addr,
2676 reason: "no current_addr".into(),
2677 })?;
2678 #[cfg(any(target_os = "linux", target_os = "macos"))]
2679 let connected_socket = peer.connected_udp();
2680
2681 let timestamp_ms = peer.session_elapsed_ms();
2683
2684 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2686 let mut flags = if sp_flag { FLAG_SP } else { 0 };
2687 if ce_flag {
2688 flags |= FLAG_CE;
2689 }
2690 if peer.current_k_bit() {
2691 flags |= FLAG_KEY_EPOCH;
2692 }
2693
2694 let session = peer
2695 .noise_session_mut()
2696 .ok_or_else(|| NodeError::SendFailed {
2697 node_addr: *node_addr,
2698 reason: "no noise session".into(),
2699 })?;
2700
2701 const INNER_TS_LEN: usize = 4;
2709 let counter = session.current_send_counter();
2710 let inner_len = INNER_TS_LEN + plaintext.len();
2711 let payload_len = inner_len as u16;
2712 let header = build_established_header(their_index, counter, flags, payload_len);
2713
2714 let transport_for_send = self
2733 .transports
2734 .get(&transport_id)
2735 .ok_or(NodeError::TransportNotFound(transport_id))?;
2736 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2737 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2738 && is_udp
2739 && let Some(cipher_clone) = session.send_cipher_clone()
2740 {
2741 {
2742 let reserved_counter =
2746 session
2747 .take_send_counter()
2748 .map_err(|e| NodeError::SendFailed {
2749 node_addr: *node_addr,
2750 reason: format!("counter reservation failed: {}", e),
2751 })?;
2752 debug_assert_eq!(reserved_counter, counter);
2753 let header =
2757 build_established_header(their_index, reserved_counter, flags, payload_len);
2758 let transport = transport_for_send;
2759 let send_target = {
2766 if let TransportHandle::Udp(udp) = transport {
2767 let socket_addr = {
2768 #[cfg(any(target_os = "linux", target_os = "macos"))]
2769 {
2770 match connected_socket.as_ref() {
2771 Some(socket) => Some(socket.peer_addr()),
2772 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2773 }
2774 }
2775 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2776 {
2777 udp.resolve_for_off_task(&remote_addr).await.ok()
2778 }
2779 };
2780 match (udp.async_socket(), socket_addr) {
2781 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
2782 _ => None,
2783 }
2784 } else {
2785 None
2786 }
2787 };
2788 if let Some((socket, socket_addr)) = send_target {
2789 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
2805 let mut wire_buf = Vec::with_capacity(wire_capacity);
2806 wire_buf.extend_from_slice(&header);
2807 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
2808 wire_buf.extend_from_slice(plaintext);
2809 let predicted_bytes = wire_capacity;
2810 if let Some(peer) = self.peers.get_mut(node_addr) {
2817 peer.link_stats_mut().record_sent(predicted_bytes);
2818 if let Some(mmp) = peer.mmp_mut() {
2819 mmp.sender
2820 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
2821 }
2822 }
2823 workers.dispatch(self::encrypt_worker::FmpSendJob {
2824 cipher: cipher_clone,
2825 counter: reserved_counter,
2826 wire_buf,
2827 fsp_seal: None,
2828 socket,
2829 dest_addr: socket_addr,
2830 #[cfg(any(target_os = "linux", target_os = "macos"))]
2831 connected_socket,
2832 drop_on_backpressure: plaintext
2833 .first()
2834 .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
2835 queued_at: crate::perf_profile::stamp(),
2836 });
2837 return Ok(());
2838 }
2839 }
2840 }
2841
2842 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
2847 let ciphertext = {
2849 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
2850 session
2851 .encrypt_with_aad(&inner_plaintext, &header)
2852 .map_err(|e| NodeError::SendFailed {
2853 node_addr: *node_addr,
2854 reason: format!("encryption failed: {}", e),
2855 })?
2856 };
2857
2858 let wire_packet = build_encrypted(&header, &ciphertext);
2859
2860 let send_result = {
2862 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
2863 let transport = self
2864 .transports
2865 .get(&transport_id)
2866 .ok_or(NodeError::TransportNotFound(transport_id))?;
2867 transport.send(&remote_addr, &wire_packet).await
2868 };
2869 self.note_local_send_outcome(&send_result);
2870 let bytes_sent = send_result.map_err(|e| match e {
2871 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
2872 node_addr: *node_addr,
2873 packet_size,
2874 mtu,
2875 },
2876 other => NodeError::SendFailed {
2877 node_addr: *node_addr,
2878 reason: format!("transport send: {}", other),
2879 },
2880 })?;
2881
2882 if let Some(peer) = self.peers.get_mut(node_addr) {
2884 peer.link_stats_mut().record_sent(bytes_sent);
2885 if let Some(mmp) = peer.mmp_mut() {
2887 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
2888 }
2889 }
2890
2891 Ok(())
2892 }
2893}
2894
2895impl fmt::Debug for Node {
2896 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2897 f.debug_struct("Node")
2898 .field("node_addr", self.node_addr())
2899 .field("state", &self.state)
2900 .field("is_leaf_only", &self.is_leaf_only)
2901 .field("connections", &self.connection_count())
2902 .field("peers", &self.peer_count())
2903 .field("links", &self.link_count())
2904 .field("transports", &self.transport_count())
2905 .finish()
2906 }
2907}