1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::RoutingMode;
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45use crate::transport::{
46 Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError, TransportHandle, TransportId,
47};
48use crate::tree::TreeState;
49use crate::upper::hosts::HostMap;
50use crate::upper::icmp_rate_limit::IcmpRateLimiter;
51use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
52use crate::utils::index::IndexAllocator;
53use crate::{
54 Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
55 SessionMessageType, encode_npub,
56};
57use rand::Rng;
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fmt;
60use std::sync::Arc;
61use std::thread::JoinHandle;
62use thiserror::Error;
63use tracing::{debug, warn};
64
/// Errors produced by node operations.
///
/// The `Config`, `Identity` and `Tun` variants wrap lower-layer error types and
/// carry `#[from]` conversions, so those errors convert automatically under `?`.
/// All other variants carry either identifiers or a free-form reason string.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The node has not been started.
    #[error("node not started")]
    NotStarted,

    /// The node has already been started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node has already been stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given ID.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No usable transport exists for the named transport type. Also used by
    /// the address resolvers in this file for malformed transport addresses.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link is registered under the given ID.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection exists for the given link.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// The addressed peer is unknown.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with this address is already present.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection already exists for the given link.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// The supplied peer npub could not be accepted; `reason` says why.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// The operation was denied.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit (`max`) would be exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit (`max`) would be exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit (`max`) would be exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available for the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion failed for the given link; `reason` says why.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed; `reason` says why.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet being forwarded to `node_addr` is larger than the MTU.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Wrapped configuration error.
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Wrapped identity error.
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// Wrapped TUN error.
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// An index could not be allocated.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// A handshake failed.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// A transport-level failure, carried as text.
    #[error("transport error: {0}")]
    TransportError(String),

    /// A bootstrap handoff failed.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
153
/// A packet handed out by the node over the external packet channel
/// (see `ExternalPacketIo::inbound_rx`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// Node address of the sender.
    pub source_node_addr: NodeAddr,
    /// The sender's npub, when available.
    pub source_npub: Option<String>,
    /// Destination address of the packet.
    pub destination: FipsAddress,
    /// Raw packet bytes.
    pub packet: Vec<u8>,
}
166
/// One entry of the node's identity cache (`Node::identity_cache`): the
/// address, public key and npub of an identity, plus a last-seen timestamp.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address of the cached identity.
    node_addr: NodeAddr,
    /// Public key of the cached identity.
    pubkey: secp256k1::PublicKey,
    /// npub string of the cached identity.
    npub: String,
    /// Last-seen timestamp in milliseconds (time base is chosen by the caller).
    last_seen_ms: u64,
}

impl IdentityCacheEntry {
    /// Build an entry from its four fields, stored unchanged.
    fn new(
        node_addr: NodeAddr,
        pubkey: secp256k1::PublicKey,
        npub: String,
        last_seen_ms: u64,
    ) -> Self {
        Self {
            node_addr,
            pubkey,
            npub,
            last_seen_ms,
        }
    }
}
190
/// Channel endpoints for exchanging packets with the node from outside.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Sender side of the TUN outbound channel type.
    /// NOTE(review): presumably used to inject outbound packets into the node — confirm.
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receiver of packets delivered by the node.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
199
/// Channel endpoints for the endpoint data path: a bounded command sender
/// towards the node and an unbounded event channel (both halves).
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Bounded sender for `NodeEndpointCommand`s.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Receiving half of the unbounded `NodeEndpointEvent` channel.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Sending half of the unbounded `NodeEndpointEvent` channel.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
229
/// Choose the capacity of the endpoint data command queue.
///
/// If the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable holds a positive
/// integer (surrounding whitespace is ignored), that value is returned as-is.
/// Otherwise the result is `requested` raised to a floor of 32,768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    const FLOOR: usize = 32_768;

    let env_override = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&value| value > 0);

    match env_override {
        Some(value) => value,
        // A floor of 32,768 already implies a floor of 1.
        None => requested.max(FLOOR),
    }
}
240
/// Commands sent to the node over the endpoint command channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send `payload` to `remote`; the outcome is reported on `response_tx`.
    Send {
        remote: PeerIdentity,
        payload: Vec<u8>,
        /// Optional instant recorded by the sender (presumably enqueue time).
        queued_at: Option<std::time::Instant>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Send `payload` to `remote` without a response channel.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        /// Optional instant recorded by the sender (presumably enqueue time).
        queued_at: Option<std::time::Instant>,
    },
    /// Request a list of `NodeEndpointPeer` records on `response_tx`.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
}
267
/// Events emitted by the node over the endpoint event channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A data payload received from a remote node.
    Data {
        /// Node address of the sender.
        source_node_addr: NodeAddr,
        /// The sender's npub, when available.
        source_npub: Option<String>,
        /// Payload bytes.
        payload: Vec<u8>,
        /// Optional instant attached by the producer (presumably enqueue time).
        queued_at: Option<std::time::Instant>,
    },
}
278
/// Per-peer record returned in answer to `NodeEndpointCommand::PeerSnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// The peer's npub.
    pub(crate) npub: String,
    /// Transport address as text, when known.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, when known.
    pub(crate) transport_type: Option<String>,
    /// Numeric link identifier.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, when available.
    pub(crate) srtt_ms: Option<u64>,
    /// Packet and byte counters for each direction.
    pub(crate) packets_sent: u64,
    pub(crate) packets_recv: u64,
    pub(crate) bytes_sent: u64,
    pub(crate) bytes_recv: u64,
}
292
/// Lifecycle phase of a node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed, never started.
    Created,
    /// Start in progress.
    Starting,
    /// Fully running.
    Running,
    /// Stop in progress.
    Stopping,
    /// Stopped.
    Stopped,
}

impl NodeState {
    /// `true` only in the `Running` state.
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// `true` when a start is permitted: from `Created` or `Stopped`.
    pub fn can_start(&self) -> bool {
        match self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// `true` when a stop is permitted: only from `Running`.
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase state name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        };
        f.write_str(label)
    }
}
337
338#[derive(Clone, Debug)]
345pub(crate) struct RecentRequest {
346 pub(crate) from_peer: NodeAddr,
348 pub(crate) timestamp_ms: u64,
350 pub(crate) response_forwarded: bool,
354}
355
356impl RecentRequest {
357 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
358 Self {
359 from_peer,
360 timestamp_ms,
361 response_forwarded: false,
362 }
363 }
364
365 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
367 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
368 }
369}
370
/// Key of `Node::addr_to_link`: a transport ID paired with an address on that transport.
type AddrKey = (TransportId, TransportAddr);
373
/// Per-transport drop-tracking state, stored in `Node::transport_drops`.
#[derive(Debug, Default)]
struct TransportDropState {
    /// NOTE(review): presumably the drop counter observed at the previous sample — confirm.
    prev_drops: u64,
    /// NOTE(review): presumably set while the transport is currently dropping — confirm.
    dropping: bool,
}
385
/// An entry of `Node::pending_connects`: a connect attempt identified by its
/// link, transport, remote address and the peer identity it targets.
struct PendingConnect {
    /// Link associated with the attempt.
    link_id: LinkId,
    /// Transport the attempt uses.
    transport_id: TransportId,
    /// Remote transport address.
    remote_addr: TransportAddr,
    /// Identity of the targeted peer.
    peer_identity: PeerIdentity,
}
401
/// A node: identity, configuration, lifecycle state and all runtime tables
/// (transports, links, connections, peers, sessions, caches, rate limiters,
/// channel endpoints and worker handles).
///
/// Constructed by `Node::new`, `Node::with_identity` or `Node::leaf_only`; the
/// constructors leave every table empty and every optional handle `None`.
pub struct Node {
    /// This node's identity.
    identity: Identity,

    /// Eight random bytes drawn once when the node is constructed.
    startup_epoch: [u8; 8],

    /// Instant of construction; `uptime()` is measured from here.
    started_at: std::time::Instant,

    /// Configuration the node was built from.
    config: Config,

    /// Lifecycle state; starts as `NodeState::Created`.
    state: NodeState,

    /// Whether the node operates in leaf-only mode.
    is_leaf_only: bool,

    /// Tree state, initialized with this node's address and a signed declaration.
    tree_state: TreeState,

    /// Bloom state (leaf-only variant when the node is leaf-only).
    bloom_state: BloomState,

    /// Coordinate cache, sized and TTL'd from `config.node.cache`.
    coord_cache: CoordCache,
    /// Learned route table; starts at its default value.
    learned_routes: LearnedRouteTable,
    /// Recent requests keyed by a `u64` identifier.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Shared, lock-protected map from address to a `u16` MTU value.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    /// Transport handles by transport ID.
    transports: HashMap<TransportId, TransportHandle>,
    /// Drop-tracking state by transport ID.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Links by link ID.
    links: HashMap<LinkId, Link>,
    /// Reverse lookup from (transport, address) to link ID.
    addr_to_link: HashMap<AddrKey, LinkId>,

    /// Packet channel sender, unset until assigned after construction.
    packet_tx: Option<PacketTx>,
    /// Packet channel receiver, unset until assigned after construction.
    packet_rx: Option<PacketRx>,

    /// Peer connections by link ID.
    connections: HashMap<LinkId, PeerConnection>,

    /// Active peers by node address.
    peers: HashMap<NodeAddr, ActivePeer>,

    /// Session entries by remote node address.
    sessions: HashMap<NodeAddr, SessionEntry>,

    /// Identity cache keyed by a 15-byte value.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    /// Queued TUN packets per destination node.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Queued endpoint data payloads per destination node.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Pending discovery lookups per target node.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    /// Limits copied from `config.node.limits` at construction.
    max_connections: usize,
    max_peers: usize,
    max_links: usize,

    /// Next link ID counter; starts at 1.
    next_link_id: u64,
    /// Next transport ID counter; starts at 1.
    next_transport_id: u32,

    /// Node statistics.
    stats: stats::NodeStats,

    /// Statistics history.
    stats_history: stats_history::StatsHistory,

    /// TUN state: `Configured` when `config.tun.enabled`, otherwise `Disabled`.
    tun_state: TunState,
    /// TUN interface name, once known.
    tun_name: Option<String>,
    /// TUN channel endpoints, unset at construction.
    tun_tx: Option<TunTx>,
    tun_outbound_rx: Option<TunOutboundRx>,
    /// Sender for packets delivered to an external consumer.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Node-side endpoints of the endpoint command/event channels.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Encrypt worker pool, when running.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Decrypt worker pool, when running.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// Session keys currently registered with the decrypt workers
    /// (see `deregister_session_index`).
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Receiving half of the decrypt fallback channel; created in the constructor.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptFallback>>,
    /// Sending half of the decrypt fallback channel.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptFallback>,
    /// Join handles of the TUN reader and writer threads.
    tun_reader_handle: Option<JoinHandle<()>>,
    tun_writer_handle: Option<JoinHandle<()>>,
    /// macOS only: raw file descriptor related to TUN shutdown.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    /// DNS identity receiver, unset at construction.
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    /// Handle of the DNS task, when spawned.
    dns_task: Option<tokio::task::JoinHandle<()>>,

    /// Allocator for session indices.
    index_allocator: IndexAllocator,
    /// Owner lookup from (transport, local index) to peer address.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Pending outbound handshakes from (transport, index) to link ID.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    /// Handshake rate limiter, parameterized from `config.node.rate_limit`
    /// and `config.node.limits.max_pending_inbound`.
    msg1_rate_limiter: HandshakeRateLimiter,
    /// ICMP rate limiter.
    icmp_rate_limiter: IcmpRateLimiter,
    /// Routing-error rate limiter.
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Rate limiter using `config.node.session.coords_response_interval_ms`.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    /// Discovery backoff state.
    discovery_backoff: DiscoveryBackoff,
    /// Discovery forward rate limiter.
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    /// Connect attempts in progress.
    pending_connects: Vec<PendingConnect>,

    /// Retry state per peer address.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    /// Nostr discovery handle, when enabled.
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    /// LAN discovery handle, when enabled.
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    /// Millisecond timestamp at which Nostr discovery was started, if it was.
    nostr_discovery_started_at_ms: Option<u64>,
    /// Starts `false`. NOTE(review): presumably set once the startup discovery sweep ran.
    startup_open_discovery_sweep_done: bool,
    /// Transports excluded from `find_transport_for_type`.
    bootstrap_transports: HashSet<TransportId>,
    /// npub string associated with each bootstrap transport.
    bootstrap_transport_npubs: HashMap<TransportId, String>,

    /// Time of the last parent re-evaluation, if any.
    last_parent_reeval: Option<crate::time::Instant>,

    /// Time of the last congestion log, if any.
    last_congestion_log: Option<std::time::Instant>,

    /// Latest mesh size estimate; `None` until one is computed.
    estimated_mesh_size: Option<u64>,
    /// Time of the last mesh-size log, if any.
    last_mesh_size_log: Option<std::time::Instant>,

    /// Time of the last self warning, if any.
    last_self_warn: Option<std::time::Instant>,

    /// Time of the last local send failure, if any.
    last_local_send_failure_at: Option<std::time::Instant>,

    /// Display aliases per peer address (consulted by `peer_display_name`).
    peer_aliases: HashMap<NodeAddr, String>,

    /// Peer ACL reloader built by `host_map_and_peer_acl`.
    peer_acl: acl::PeerAclReloader,

    /// Shared host map built by `host_map_and_peer_acl`.
    host_map: Arc<HostMap>,
}
690
691impl Node {
692 pub fn new(config: Config) -> Result<Self, NodeError> {
694 config.validate()?;
695 let identity = config.create_identity()?;
696 let node_addr = *identity.node_addr();
697 let is_leaf_only = config.is_leaf_only();
698
699 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
700 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
701
702 let mut startup_epoch = [0u8; 8];
703 rand::rng().fill_bytes(&mut startup_epoch);
704
705 let mut bloom_state = if is_leaf_only {
706 BloomState::leaf_only(node_addr)
707 } else {
708 BloomState::new(node_addr)
709 };
710 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
711
712 let tun_state = if config.tun.enabled {
713 TunState::Configured
714 } else {
715 TunState::Disabled
716 };
717
718 let mut tree_state = TreeState::new(node_addr);
720 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
721 tree_state.set_hold_down(config.node.tree.hold_down_secs);
722 tree_state.set_flap_dampening(
723 config.node.tree.flap_threshold,
724 config.node.tree.flap_window_secs,
725 config.node.tree.flap_dampening_secs,
726 );
727 tree_state
728 .sign_declaration(&identity)
729 .expect("signing own declaration should never fail");
730
731 let coord_cache = CoordCache::new(
732 config.node.cache.coord_size,
733 config.node.cache.coord_ttl_secs * 1000,
734 );
735 let rl = &config.node.rate_limit;
736 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
737 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
738 config.node.limits.max_pending_inbound,
739 );
740
741 let max_connections = config.node.limits.max_connections;
742 let max_peers = config.node.limits.max_peers;
743 let max_links = config.node.limits.max_links;
744 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
745 let backoff_base_secs = config.node.discovery.backoff_base_secs;
746 let backoff_max_secs = config.node.discovery.backoff_max_secs;
747 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
748
749 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
750
751 Ok(Self {
752 identity,
753 startup_epoch,
754 started_at: std::time::Instant::now(),
755 config,
756 state: NodeState::Created,
757 is_leaf_only,
758 tree_state,
759 bloom_state,
760 coord_cache,
761 learned_routes: LearnedRouteTable::default(),
762 recent_requests: HashMap::new(),
763 transports: HashMap::new(),
764 transport_drops: HashMap::new(),
765 links: HashMap::new(),
766 addr_to_link: HashMap::new(),
767 packet_tx: None,
768 packet_rx: None,
769 connections: HashMap::new(),
770 peers: HashMap::new(),
771 sessions: HashMap::new(),
772 identity_cache: HashMap::new(),
773 pending_tun_packets: HashMap::new(),
774 pending_endpoint_data: HashMap::new(),
775 pending_lookups: HashMap::new(),
776 max_connections,
777 max_peers,
778 max_links,
779 next_link_id: 1,
780 next_transport_id: 1,
781 stats: stats::NodeStats::new(),
782 stats_history: stats_history::StatsHistory::new(),
783 tun_state,
784 tun_name: None,
785 tun_tx: None,
786 tun_outbound_rx: None,
787 external_packet_tx: None,
788 endpoint_command_rx: None,
789 endpoint_event_tx: None,
790 encrypt_workers: None,
791 decrypt_workers: None,
792 decrypt_registered_sessions: std::collections::HashSet::new(),
793 decrypt_fallback_tx,
794 decrypt_fallback_rx,
795 tun_reader_handle: None,
796 tun_writer_handle: None,
797 #[cfg(target_os = "macos")]
798 tun_shutdown_fd: None,
799 dns_identity_rx: None,
800 dns_task: None,
801 index_allocator: IndexAllocator::new(),
802 peers_by_index: HashMap::new(),
803 pending_outbound: HashMap::new(),
804 msg1_rate_limiter,
805 icmp_rate_limiter: IcmpRateLimiter::new(),
806 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
807 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
808 std::time::Duration::from_millis(coords_response_interval_ms),
809 ),
810 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
811 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
812 std::time::Duration::from_secs(forward_min_interval_secs),
813 ),
814 pending_connects: Vec::new(),
815 retry_pending: HashMap::new(),
816 nostr_discovery: None,
817 nostr_discovery_started_at_ms: None,
818 lan_discovery: None,
819 startup_open_discovery_sweep_done: false,
820 bootstrap_transports: HashSet::new(),
821 bootstrap_transport_npubs: HashMap::new(),
822 last_parent_reeval: None,
823 last_congestion_log: None,
824 estimated_mesh_size: None,
825 last_mesh_size_log: None,
826 last_self_warn: None,
827 last_local_send_failure_at: None,
828 peer_aliases: HashMap::new(),
829 peer_acl,
830 host_map,
831 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
832 })
833 }
834
835 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
840 config.validate()?;
841 let node_addr = *identity.node_addr();
842
843 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
844 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
845
846 let mut startup_epoch = [0u8; 8];
847 rand::rng().fill_bytes(&mut startup_epoch);
848
849 let tun_state = if config.tun.enabled {
850 TunState::Configured
851 } else {
852 TunState::Disabled
853 };
854
855 let mut tree_state = TreeState::new(node_addr);
857 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
858 tree_state.set_hold_down(config.node.tree.hold_down_secs);
859 tree_state.set_flap_dampening(
860 config.node.tree.flap_threshold,
861 config.node.tree.flap_window_secs,
862 config.node.tree.flap_dampening_secs,
863 );
864 tree_state
865 .sign_declaration(&identity)
866 .expect("signing own declaration should never fail");
867
868 let mut bloom_state = BloomState::new(node_addr);
869 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
870
871 let coord_cache = CoordCache::new(
872 config.node.cache.coord_size,
873 config.node.cache.coord_ttl_secs * 1000,
874 );
875 let rl = &config.node.rate_limit;
876 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
877 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
878 config.node.limits.max_pending_inbound,
879 );
880
881 let max_connections = config.node.limits.max_connections;
882 let max_peers = config.node.limits.max_peers;
883 let max_links = config.node.limits.max_links;
884 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
885
886 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
887
888 Ok(Self {
889 identity,
890 startup_epoch,
891 started_at: std::time::Instant::now(),
892 config,
893 state: NodeState::Created,
894 is_leaf_only: false,
895 tree_state,
896 bloom_state,
897 coord_cache,
898 learned_routes: LearnedRouteTable::default(),
899 recent_requests: HashMap::new(),
900 transports: HashMap::new(),
901 transport_drops: HashMap::new(),
902 links: HashMap::new(),
903 addr_to_link: HashMap::new(),
904 packet_tx: None,
905 packet_rx: None,
906 connections: HashMap::new(),
907 peers: HashMap::new(),
908 sessions: HashMap::new(),
909 identity_cache: HashMap::new(),
910 pending_tun_packets: HashMap::new(),
911 pending_endpoint_data: HashMap::new(),
912 pending_lookups: HashMap::new(),
913 max_connections,
914 max_peers,
915 max_links,
916 next_link_id: 1,
917 next_transport_id: 1,
918 stats: stats::NodeStats::new(),
919 stats_history: stats_history::StatsHistory::new(),
920 tun_state,
921 tun_name: None,
922 tun_tx: None,
923 tun_outbound_rx: None,
924 external_packet_tx: None,
925 endpoint_command_rx: None,
926 endpoint_event_tx: None,
927 encrypt_workers: None,
928 decrypt_workers: None,
929 decrypt_registered_sessions: std::collections::HashSet::new(),
930 decrypt_fallback_tx,
931 decrypt_fallback_rx,
932 tun_reader_handle: None,
933 tun_writer_handle: None,
934 #[cfg(target_os = "macos")]
935 tun_shutdown_fd: None,
936 dns_identity_rx: None,
937 dns_task: None,
938 index_allocator: IndexAllocator::new(),
939 peers_by_index: HashMap::new(),
940 pending_outbound: HashMap::new(),
941 msg1_rate_limiter,
942 icmp_rate_limiter: IcmpRateLimiter::new(),
943 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
944 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
945 std::time::Duration::from_millis(coords_response_interval_ms),
946 ),
947 discovery_backoff: DiscoveryBackoff::new(),
948 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
949 pending_connects: Vec::new(),
950 retry_pending: HashMap::new(),
951 nostr_discovery: None,
952 nostr_discovery_started_at_ms: None,
953 lan_discovery: None,
954 startup_open_discovery_sweep_done: false,
955 bootstrap_transports: HashSet::new(),
956 bootstrap_transport_npubs: HashMap::new(),
957 last_parent_reeval: None,
958 last_congestion_log: None,
959 estimated_mesh_size: None,
960 last_mesh_size_log: None,
961 last_self_warn: None,
962 last_local_send_failure_at: None,
963 peer_aliases: HashMap::new(),
964 peer_acl,
965 host_map,
966 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
967 })
968 }
969
970 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
972 let mut node = Self::new(config)?;
973 node.is_leaf_only = true;
974 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
975 Ok(node)
976 }
977
978 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
979 let base_host_map = HostMap::from_peer_configs(config.peers());
980 if !config.node.system_files_enabled {
981 return (
982 Arc::new(base_host_map.clone()),
983 acl::PeerAclReloader::memory_only(base_host_map),
984 );
985 }
986
987 let mut host_map = base_host_map.clone();
988 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
989 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
990 crate::upper::hosts::DEFAULT_HOSTS_PATH,
991 ));
992 host_map.merge(hosts_file);
993 let peer_acl = acl::PeerAclReloader::with_alias_sources(
994 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
995 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
996 base_host_map,
997 hosts_path,
998 );
999 (Arc::new(host_map), peer_acl)
1000 }
1001
    /// Instantiate one transport handle per configured transport instance.
    ///
    /// Transport IDs are allocated in this order: UDP, simulated (only with the
    /// `sim-transport` feature), Ethernet (Linux/macOS only), TCP, Tor, then BLE
    /// (only when `bluer_available` is set). Each transport receives a clone of
    /// `packet_tx`. The handles are returned; this function does not insert
    /// them into `self.transports`.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Instance lists are collected into owned vectors first so that the
        // borrow of `self.config` ends before `allocate_transport_id` needs
        // `&mut self`.
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            // Each Ethernet transport is given this node's public key.
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Real BLE transports are only created outside of test builds. An
            // adapter that fails to initialize is logged and skipped; its
            // transport ID has already been allocated at that point.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // Inside this `bluer_available` block the condition below is only
            // true in test builds, where the warning itself is compiled out.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1139
1140 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1150 self.transports
1151 .iter()
1152 .filter(|(id, handle)| {
1153 handle.transport_type().name == transport_type
1154 && handle.is_operational()
1155 && !self.bootstrap_transports.contains(id)
1156 })
1157 .min_by_key(|(id, _)| id.as_u32())
1158 .map(|(id, _)| *id)
1159 }
1160
1161 #[allow(unused_variables)]
1167 fn resolve_ethernet_addr(
1168 &self,
1169 addr_str: &str,
1170 ) -> Result<(TransportId, TransportAddr), NodeError> {
1171 #[cfg(any(target_os = "linux", target_os = "macos"))]
1172 {
1173 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1174 NodeError::NoTransportForType(format!(
1175 "invalid Ethernet address format '{}': expected 'interface/mac'",
1176 addr_str
1177 ))
1178 })?;
1179
1180 let transport_id = self
1182 .transports
1183 .iter()
1184 .find(|(_, handle)| {
1185 handle.transport_type().name == "ethernet"
1186 && handle.is_operational()
1187 && handle.interface_name() == Some(iface)
1188 })
1189 .map(|(id, _)| *id)
1190 .ok_or_else(|| {
1191 NodeError::NoTransportForType(format!(
1192 "no operational Ethernet transport for interface '{}'",
1193 iface
1194 ))
1195 })?;
1196
1197 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1198 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1199 })?;
1200
1201 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1202 }
1203 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1204 {
1205 Err(NodeError::NoTransportForType(
1206 "Ethernet transport is not supported on this platform".to_string(),
1207 ))
1208 }
1209 }
1210
1211 #[cfg(bluer_available)]
1215 fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1216 let ta = TransportAddr::from_string(addr_str);
1217 let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1218 NodeError::NoTransportForType(format!(
1219 "invalid BLE address format '{}': expected 'adapter/mac'",
1220 addr_str
1221 ))
1222 })?;
1223
1224 let transport_id = self
1226 .transports
1227 .iter()
1228 .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1229 .map(|(id, _)| *id)
1230 .ok_or_else(|| {
1231 NodeError::NoTransportForType(format!(
1232 "no operational BLE transport for adapter '{}'",
1233 adapter
1234 ))
1235 })?;
1236
1237 crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1239 NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1240 })?;
1241
1242 Ok((transport_id, TransportAddr::from_string(addr_str)))
1243 }
1244
    /// Borrow this node's identity.
    pub fn identity(&self) -> &Identity {
        &self.identity
    }
1251
    /// Borrow this node's address, as reported by its identity.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }
1256
    /// Return this node's npub string, as reported by its identity.
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
1261
1262 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1271 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1272 return hostname.to_string();
1273 }
1274 if let Some(name) = self.peer_aliases.get(addr) {
1275 return name.clone();
1276 }
1277 if let Some(peer) = self.peers.get(addr) {
1278 return peer.identity().short_npub();
1279 }
1280 if let Some(entry) = self.sessions.get(addr) {
1281 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1282 return PeerIdentity::from_pubkey(xonly).short_npub();
1283 }
1284 addr.short_hex()
1285 }
1286
1287 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1299 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1303 self.peers_by_index.remove(&cache_key);
1304 if self.decrypt_registered_sessions.remove(&cache_key)
1305 && let Some(workers) = self.decrypt_workers.as_ref()
1306 {
1307 workers.unregister_session(cache_key);
1308 }
1309 if let Some(peer_addr) = owning_peer {
1320 let peer_has_other_index = self
1321 .peers_by_index
1322 .values()
1323 .any(|other| *other == peer_addr);
1324 if !peer_has_other_index {
1325 self.clear_connected_udp_for_peer(&peer_addr);
1326 }
1327 }
1328 }
1329
1330 pub(in crate::node) fn ensure_current_session_index_registered(
1339 &mut self,
1340 node_addr: &NodeAddr,
1341 context: &'static str,
1342 ) -> bool {
1343 let Some(peer) = self.peers.get(node_addr) else {
1344 return false;
1345 };
1346 let Some(transport_id) = peer.transport_id() else {
1347 warn!(
1348 peer = %self.peer_display_name(node_addr),
1349 context,
1350 "Cannot register current session index without transport id"
1351 );
1352 return false;
1353 };
1354 let Some(our_index) = peer.our_index() else {
1355 warn!(
1356 peer = %self.peer_display_name(node_addr),
1357 context,
1358 "Cannot register current session index without local index"
1359 );
1360 return false;
1361 };
1362
1363 let cache_key = (transport_id, our_index.as_u32());
1364 match self.peers_by_index.get(&cache_key).copied() {
1365 Some(existing) if existing == *node_addr => true,
1366 Some(existing) => {
1367 warn!(
1368 peer = %self.peer_display_name(node_addr),
1369 previous_owner = %self.peer_display_name(&existing),
1370 transport_id = %transport_id,
1371 our_index = %our_index,
1372 context,
1373 "Repairing current session index with stale owner"
1374 );
1375 self.peers_by_index.insert(cache_key, *node_addr);
1376 true
1377 }
1378 None => {
1379 warn!(
1380 peer = %self.peer_display_name(node_addr),
1381 transport_id = %transport_id,
1382 our_index = %our_index,
1383 context,
1384 "Repairing missing current session index"
1385 );
1386 self.peers_by_index.insert(cache_key, *node_addr);
1387 true
1388 }
1389 }
1390 }
1391
    /// Borrow the configuration this node was built with.
    pub fn config(&self) -> &Config {
        &self.config
    }
1398
    /// Returns the effective IPv6 MTU, as computed by
    /// `crate::upper::icmp::effective_ipv6_mtu` from [`Self::transport_mtu`].
    pub fn effective_ipv6_mtu(&self) -> u16 {
        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
    }
1407
1408 pub fn transport_mtu(&self) -> u16 {
1425 let min_operational = self
1426 .transports
1427 .values()
1428 .filter(|h| h.is_operational())
1429 .map(|h| h.mtu())
1430 .min();
1431 if let Some(mtu) = min_operational {
1432 return mtu;
1433 }
1434 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1436 return cfg.mtu();
1437 }
1438 1280
1439 }
1440
    /// Returns the node's current [`NodeState`].
    pub fn state(&self) -> NodeState {
        self.state
    }
1447
    /// Returns the time elapsed since the instant stored in `started_at`.
    pub fn uptime(&self) -> std::time::Duration {
        self.started_at.elapsed()
    }
1452
    /// Returns `true` when the current state reports itself as operational.
    pub fn is_running(&self) -> bool {
        self.state.is_operational()
    }
1457
    /// Returns the node's `is_leaf_only` flag.
    pub fn is_leaf_only(&self) -> bool {
        self.is_leaf_only
    }
1462
    /// Returns a shared reference to the node's [`TreeState`].
    pub fn tree_state(&self) -> &TreeState {
        &self.tree_state
    }
1469
    /// Returns a mutable reference to the node's [`TreeState`].
    pub fn tree_state_mut(&mut self) -> &mut TreeState {
        &mut self.tree_state
    }
1474
    /// Returns a shared reference to the node's [`BloomState`].
    pub fn bloom_state(&self) -> &BloomState {
        &self.bloom_state
    }
1481
    /// Returns a mutable reference to the node's [`BloomState`].
    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
        &mut self.bloom_state
    }
1486
    /// Returns the most recently stored mesh size estimate, or `None` when
    /// no estimate is currently available (see [`Self::compute_mesh_size`]).
    pub fn estimated_mesh_size(&self) -> Option<u64> {
        self.estimated_mesh_size
    }
1493
1494 pub(crate) fn compute_mesh_size(&mut self) {
1500 let my_addr = *self.tree_state.my_node_addr();
1501 let parent_id = *self.tree_state.my_declaration().parent_id();
1502 let is_root = self.tree_state.is_root();
1503
1504 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1505 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1507 let mut has_data = false;
1508
1509 if !is_root
1515 && let Some(parent) = self.peers.get(&parent_id)
1516 && let Some(filter) = parent.inbound_filter()
1517 {
1518 match filter.estimated_count(max_fpr) {
1519 Some(n) => {
1520 total += n;
1521 has_data = true;
1522 }
1523 None => {
1524 self.estimated_mesh_size = None;
1525 return;
1526 }
1527 }
1528 }
1529
1530 for (peer_addr, peer) in &self.peers {
1532 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1533 && *decl.parent_id() == my_addr
1534 {
1535 child_count += 1;
1536 if let Some(filter) = peer.inbound_filter() {
1537 match filter.estimated_count(max_fpr) {
1538 Some(n) => {
1539 total += n;
1540 has_data = true;
1541 }
1542 None => {
1543 self.estimated_mesh_size = None;
1544 return;
1545 }
1546 }
1547 }
1548 }
1549 }
1550
1551 if !has_data {
1552 self.estimated_mesh_size = None;
1553 return;
1554 }
1555
1556 let size = total.round() as u64;
1557 self.estimated_mesh_size = Some(size);
1558
1559 let now = std::time::Instant::now();
1561 let should_log = match self.last_mesh_size_log {
1562 None => true,
1563 Some(last) => {
1564 now.duration_since(last)
1565 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1566 }
1567 };
1568 if should_log {
1569 tracing::debug!(
1570 estimated_mesh_size = size,
1571 peers = self.peers.len(),
1572 children = child_count,
1573 "Mesh size estimate"
1574 );
1575 self.last_mesh_size_log = Some(now);
1576 }
1577 }
1578
    /// Returns a shared reference to the node's [`CoordCache`].
    pub fn coord_cache(&self) -> &CoordCache {
        &self.coord_cache
    }
1585
    /// Returns a mutable reference to the node's [`CoordCache`].
    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
        &mut self.coord_cache
    }
1590
    /// Returns a shared reference to the node's statistics.
    pub fn stats(&self) -> &stats::NodeStats {
        &self.stats
    }
1597
    /// Returns a mutable reference to the node's statistics.
    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
        &mut self.stats
    }
1602
    /// Returns a shared reference to the recorded statistics history.
    pub fn stats_history(&self) -> &stats_history::StatsHistory {
        &self.stats_history
    }
1607
1608 pub(crate) fn record_stats_history(&mut self) {
1611 let fwd = &self.stats.forwarding;
1612 let peers_with_mmp: Vec<f64> = self
1613 .peers
1614 .values()
1615 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1616 .collect();
1617 let loss_rate = if peers_with_mmp.is_empty() {
1618 0.0
1619 } else {
1620 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1621 };
1622
1623 let snap = stats_history::Snapshot {
1624 mesh_size: self.estimated_mesh_size,
1625 tree_depth: self.tree_state.my_coords().depth() as u32,
1626 peer_count: self.peers.len() as u64,
1627 parent_switches_total: self.stats.tree.parent_switches,
1628 bytes_in_total: fwd.received_bytes,
1629 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1630 packets_in_total: fwd.received_packets,
1631 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1632 loss_rate,
1633 active_sessions: self.sessions.len() as u64,
1634 };
1635
1636 let now = std::time::Instant::now();
1637 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1638 .peers
1639 .values()
1640 .map(|p| {
1641 let stats = p.link_stats();
1642 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1643 Some(m) => (
1644 m.metrics.srtt_ms(),
1645 Some(m.metrics.loss_rate()),
1646 m.receiver.ecn_ce_count() as u64,
1647 ),
1648 None => (None, None, 0),
1649 };
1650 stats_history::PeerSnapshot {
1651 node_addr: *p.node_addr(),
1652 last_seen: now,
1653 srtt_ms,
1654 loss_rate,
1655 bytes_in_total: stats.bytes_recv,
1656 bytes_out_total: stats.bytes_sent,
1657 packets_in_total: stats.packets_recv,
1658 packets_out_total: stats.packets_sent,
1659 ecn_ce_total: ecn_ce,
1660 }
1661 })
1662 .collect();
1663
1664 self.stats_history.tick(now, &snap, &peer_snaps);
1665 }
1666
    /// Returns the current [`TunState`].
    pub fn tun_state(&self) -> TunState {
        self.tun_state
    }
1673
    /// Returns the stored TUN name as a string slice, or `None` when no name
    /// is set.
    pub fn tun_name(&self) -> Option<&str> {
        self.tun_name.as_deref()
    }
1678
    /// Sets the connection limit. [`Self::add_connection`] only enforces the
    /// limit when it is greater than zero.
    pub fn set_max_connections(&mut self, max: usize) {
        self.max_connections = max;
    }
1685
    /// Sets the `max_peers` limit.
    // NOTE(review): enforcement of this limit is not visible in this part of
    // the file — presumably 0 means "unlimited" like the other limits; confirm.
    pub fn set_max_peers(&mut self, max: usize) {
        self.max_peers = max;
    }
1690
    /// Sets the link limit. [`Self::add_link`] only enforces the limit when
    /// it is greater than zero.
    pub fn set_max_links(&mut self, max: usize) {
        self.max_links = max;
    }
1695
    /// Returns the number of entries in the connection table.
    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }
1702
    /// Returns the number of entries in the peer table.
    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }
1707
    /// Returns the number of entries in the link table.
    pub fn link_count(&self) -> usize {
        self.links.len()
    }
1712
    /// Returns the number of registered transports.
    pub fn transport_count(&self) -> usize {
        self.transports.len()
    }
1717
1718 pub fn allocate_transport_id(&mut self) -> TransportId {
1722 let id = TransportId::new(self.next_transport_id);
1723 self.next_transport_id += 1;
1724 id
1725 }
1726
    /// Looks up a transport handle by id; `None` when the id is not
    /// registered.
    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
        self.transports.get(id)
    }
1731
    /// Looks up a transport handle by id for mutation; `None` when the id is
    /// not registered.
    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
        self.transports.get_mut(id)
    }
1736
    /// Iterates over the ids of all registered transports.
    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
        self.transports.keys()
    }
1741
    /// Returns a mutable reference to the packet receiver, or `None` when
    /// none is currently held.
    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
        self.packet_rx.as_mut()
    }
1746
1747 pub fn allocate_link_id(&mut self) -> LinkId {
1751 let id = LinkId::new(self.next_link_id);
1752 self.next_link_id += 1;
1753 id
1754 }
1755
1756 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1758 if self.max_links > 0 && self.links.len() >= self.max_links {
1759 return Err(NodeError::MaxLinksExceeded {
1760 max: self.max_links,
1761 });
1762 }
1763 let link_id = link.link_id();
1764 let transport_id = link.transport_id();
1765 let remote_addr = link.remote_addr().clone();
1766
1767 self.links.insert(link_id, link);
1768 self.addr_to_link
1769 .insert((transport_id, remote_addr), link_id);
1770 Ok(())
1771 }
1772
    /// Looks up a link by id; `None` when the id is unknown.
    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
        self.links.get(link_id)
    }
1777
    /// Looks up a link by id for mutation; `None` when the id is unknown.
    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
        self.links.get_mut(link_id)
    }
1782
1783 pub fn find_link_by_addr(
1785 &self,
1786 transport_id: TransportId,
1787 addr: &TransportAddr,
1788 ) -> Option<LinkId> {
1789 self.addr_to_link
1790 .get(&(transport_id, addr.clone()))
1791 .copied()
1792 }
1793
1794 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
1800 if let Some(link) = self.links.remove(link_id) {
1801 let key = (link.transport_id(), link.remote_addr().clone());
1803 if self.addr_to_link.get(&key) == Some(link_id) {
1804 self.addr_to_link.remove(&key);
1805 }
1806 Some(link)
1807 } else {
1808 None
1809 }
1810 }
1811
1812 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
1813 if !self.bootstrap_transports.contains(&transport_id) {
1814 return;
1815 }
1816
1817 let transport_in_use = self
1818 .links
1819 .values()
1820 .any(|link| link.transport_id() == transport_id)
1821 || self
1822 .connections
1823 .values()
1824 .any(|conn| conn.transport_id() == Some(transport_id))
1825 || self
1826 .peers
1827 .values()
1828 .any(|peer| peer.transport_id() == Some(transport_id))
1829 || self
1830 .pending_connects
1831 .iter()
1832 .any(|pending| pending.transport_id == transport_id);
1833
1834 if transport_in_use {
1835 return;
1836 }
1837
1838 tracing::debug!(
1839 transport_id = %transport_id,
1840 "bootstrap transport has no remaining references; dropping"
1841 );
1842
1843 self.bootstrap_transports.remove(&transport_id);
1844 self.bootstrap_transport_npubs.remove(&transport_id);
1845 self.transport_drops.remove(&transport_id);
1846 self.transports.remove(&transport_id);
1847 }
1848
    /// Iterates over all registered links.
    pub fn links(&self) -> impl Iterator<Item = &Link> {
        self.links.values()
    }
1853
1854 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
1858 let link_id = connection.link_id();
1859
1860 if self.connections.contains_key(&link_id) {
1861 return Err(NodeError::ConnectionAlreadyExists(link_id));
1862 }
1863
1864 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
1865 return Err(NodeError::MaxConnectionsExceeded {
1866 max: self.max_connections,
1867 });
1868 }
1869
1870 self.connections.insert(link_id, connection);
1871 Ok(())
1872 }
1873
    /// Looks up the connection stored for `link_id`, if any.
    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
        self.connections.get(link_id)
    }
1878
    /// Looks up the connection stored for `link_id` for mutation, if any.
    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
        self.connections.get_mut(link_id)
    }
1883
    /// Removes and returns the connection stored for `link_id`, if any.
    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
        self.connections.remove(link_id)
    }
1888
    /// Iterates over all stored connections.
    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
        self.connections.values()
    }
1893
    /// Looks up the active peer with the given node address, if any.
    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
        self.peers.get(node_addr)
    }
1900
    /// Looks up the active peer with the given node address for mutation,
    /// if any.
    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
        self.peers.get_mut(node_addr)
    }
1905
    /// Removes the peer with the given node address from the peer table and
    /// returns it, if present. Only the `peers` map is touched here.
    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
        self.peers.remove(node_addr)
    }
1910
    /// Iterates over all active peers.
    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values()
    }
1915
    /// Returns a reference to the Nostr discovery instance, or `None` when
    /// none is set.
    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
        self.nostr_discovery.as_deref()
    }
1922
    /// Iterates over the node addresses of all active peers.
    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
        self.peers.keys()
    }
1927
    /// Iterates over the active peers for which `can_send()` is true.
    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values().filter(|p| p.can_send())
    }
1932
1933 pub fn sendable_peer_count(&self) -> usize {
1935 self.peers.values().filter(|p| p.can_send()).count()
1936 }
1937
    /// Test helper: sets the discovery forward limiter's interval to zero.
    #[cfg(test)]
    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
        self.discovery_forward_limiter
            .set_interval(std::time::Duration::ZERO);
    }
1947
    /// Test helper: looks up the session entry for `remote`, if any.
    #[cfg(test)]
    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
        self.sessions.get(remote)
    }
1952
    /// Test helper: looks up the session entry for `remote` for mutation,
    /// if any.
    #[cfg(test)]
    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
        self.sessions.get_mut(remote)
    }
1958
    /// Test helper: removes and returns the session entry for `remote`,
    /// if any.
    #[cfg(test)]
    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
        self.sessions.remove(remote)
    }
1964
1965 #[cfg(test)]
1967 pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
1968 self.path_mtu_lookup
1969 .read()
1970 .ok()
1971 .and_then(|map| map.get(fips_addr).copied())
1972 }
1973
    /// Test helper: records `mtu` as the path MTU for `fips_addr`.
    ///
    /// Silently does nothing when the write lock cannot be acquired.
    #[cfg(test)]
    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
        if let Ok(mut map) = self.path_mtu_lookup.write() {
            map.insert(fips_addr, mtu);
        }
    }
1981
    /// Returns the number of entries in the session table.
    pub fn session_count(&self) -> usize {
        self.sessions.len()
    }
1986
    /// Iterates over `(remote address, session entry)` pairs of the session
    /// table.
    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
        self.sessions.iter()
    }
1991
1992 pub(crate) fn register_identity(
1996 &mut self,
1997 node_addr: NodeAddr,
1998 pubkey: secp256k1::PublicKey,
1999 ) -> bool {
2000 let mut prefix = [0u8; 15];
2001 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2002 if let Some(entry) = self.identity_cache.get(&prefix)
2003 && entry.node_addr == node_addr
2004 && entry.pubkey == pubkey
2005 {
2006 return true;
2010 }
2011
2012 let (xonly, _) = pubkey.x_only_public_key();
2013 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2014 if derived_node_addr != node_addr {
2015 debug!(
2016 claimed_node_addr = %node_addr,
2017 derived_node_addr = %derived_node_addr,
2018 "Rejected identity cache entry with mismatched public key"
2019 );
2020 return false;
2021 }
2022
2023 let now_ms = Self::now_ms();
2024 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2025 && entry.node_addr == node_addr
2026 {
2027 entry.pubkey = pubkey;
2028 entry.last_seen_ms = now_ms;
2029 return true;
2030 }
2031
2032 let npub = encode_npub(&xonly);
2033 self.identity_cache.insert(
2034 prefix,
2035 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2036 );
2037 let max = self.config.node.cache.identity_size;
2039 if self.identity_cache.len() > max
2040 && let Some(oldest_key) = self
2041 .identity_cache
2042 .iter()
2043 .min_by_key(|(_, entry)| entry.last_seen_ms)
2044 .map(|(k, _)| *k)
2045 {
2046 self.identity_cache.remove(&oldest_key);
2047 }
2048 true
2049 }
2050
2051 pub(crate) fn lookup_by_fips_prefix(
2053 &mut self,
2054 prefix: &[u8; 15],
2055 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2056 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2057 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2059 } else {
2060 None
2061 }
2062 }
2063
    /// Returns `true` when the identity cache holds an entry under the first
    /// 15 bytes of `addr`.
    // NOTE(review): only the 15-byte prefix is compared here, whereas
    // `pubkey_for_node_addr` additionally checks the full node address —
    // confirm the looser match is intentional.
    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
        let mut prefix = [0u8; 15];
        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
        self.identity_cache.contains_key(&prefix)
    }
2070
    /// Returns the number of entries in the identity cache.
    pub fn identity_cache_len(&self) -> usize {
        self.identity_cache.len()
    }
2075
    /// Iterates over the identity cache, yielding each entry's node address,
    /// public key and `last_seen_ms` timestamp.
    pub fn identity_cache_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
        self.identity_cache
            .values()
            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
    }
2087
    /// Returns the configured identity cache size
    /// (`config.node.cache.identity_size`).
    pub fn identity_cache_max(&self) -> usize {
        self.config.node.cache.identity_size
    }
2092
    /// Returns the number of entries in `pending_lookups`.
    pub fn pending_lookup_count(&self) -> usize {
        self.pending_lookups.len()
    }
2097
    /// Iterates over `(node address, pending lookup)` pairs in
    /// `pending_lookups`.
    pub fn pending_lookups_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
        self.pending_lookups.iter()
    }
2104
    /// Returns the number of entries in `recent_requests`.
    pub fn recent_request_count(&self) -> usize {
        self.recent_requests.len()
    }
2109
    /// Returns the number of keys in `pending_tun_packets`, i.e. how many
    /// distinct queues currently exist.
    pub fn pending_tun_destinations(&self) -> usize {
        self.pending_tun_packets.len()
    }
2114
    /// Returns the combined length of all queues in `pending_tun_packets`.
    pub fn pending_tun_total_packets(&self) -> usize {
        self.pending_tun_packets.values().map(|q| q.len()).sum()
    }
2119
    /// Iterates over `(node address, retry state)` pairs in `retry_pending`.
    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
        self.retry_pending.iter()
    }
2124
2125 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2132 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2134 return true;
2135 }
2136 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2138 && decl.parent_id() == self.node_addr()
2139 {
2140 return true;
2141 }
2142 false
2143 }
2144
    /// Picks the peer to forward a packet for `dest_node_addr` to.
    ///
    /// Returns `None` when the destination is this node or when no strategy
    /// yields a peer. Strategies are tried in this order:
    ///
    /// 1. The destination itself, when it is a peer that can send.
    /// 2. In `RoutingMode::ReplyLearned` only: a learned route, unless the
    ///    learned-route table asks for a fallback exploration this time.
    /// 3. With cached coordinates for the destination: the best candidate
    ///    among peers that `may_reach` it, then the tree next hop if that
    ///    peer can send.
    /// 4. In `RoutingMode::ReplyLearned` only: a learned route as the last
    ///    resort (also used when no coordinates are cached).
    ///
    /// Takes `&mut self` because the coordinate cache lookup touches the
    /// entry (`get_and_touch`).
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        // Never route to ourselves.
        if dest_node_addr == self.node_addr() {
            return None;
        }

        // Direct peer that is able to send.
        if let Some(peer) = self.peers.get(dest_node_addr)
            && peer.can_send()
        {
            return Some(peer);
        }

        let now_ms = Self::now_ms();

        // Only populated in reply-learned mode; `None` disables every
        // learned-route branch below.
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(_, peer)| peer.can_send())
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // Ask the learned-route table whether this lookup should skip the
        // learned route and try the coordinate/tree path first.
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        // Preferred path in reply-learned mode when not exploring.
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // Coordinate-based routing needs cached coordinates; without them
        // only a learned route can still help.
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            if let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            return None;
        };

        // Best candidate among peers that may reach the destination. Only the
        // address is kept so the borrow of `self.peers` ends with this block.
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        if let Some(next_hop_addr) = coordinate_route_addr {
            return self.peers.get(&next_hop_addr);
        }

        // Tree routing, accepted only if the chosen hop is a sendable peer.
        let tree_route_addr = self
            .tree_state
            .find_next_hop(&dest_coords)
            .filter(|next_hop_id| {
                self.peers
                    .get(next_hop_id)
                    .is_some_and(|peer| peer.can_send())
            });
        if let Some(next_hop_addr) = tree_route_addr {
            return self.peers.get(&next_hop_addr);
        }
        // Exploration found nothing: fall back to the learned route (this
        // returns `None` if there is none).
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        // Last resort in reply-learned mode.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        None
    }
2278
2279 pub(in crate::node) fn learn_reverse_route(
2280 &mut self,
2281 destination: NodeAddr,
2282 next_hop: NodeAddr,
2283 ) {
2284 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2285 || destination == *self.node_addr()
2286 {
2287 return;
2288 }
2289 let now_ms = Self::now_ms();
2290 self.learned_routes.learn(
2291 destination,
2292 next_hop,
2293 now_ms,
2294 self.config.node.routing.learned_ttl_secs,
2295 self.config.node.routing.max_learned_routes_per_dest,
2296 );
2297 }
2298
2299 pub(in crate::node) fn record_route_failure(
2300 &mut self,
2301 destination: NodeAddr,
2302 next_hop: NodeAddr,
2303 ) {
2304 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2305 return;
2306 }
2307 self.learned_routes.record_failure(&destination, &next_hop);
2308 }
2309
    /// Returns a snapshot of the learned-route table taken at `now_ms`.
    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
        self.learned_routes.snapshot(now_ms)
    }
2313
    /// Asks the learned-route table to purge entries expired as of `now_ms`.
    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
        self.learned_routes.purge_expired(now_ms);
    }
2317
2318 fn select_best_candidate<'a>(
2327 &'a self,
2328 candidates: &[&'a ActivePeer],
2329 dest_coords: &crate::tree::TreeCoordinate,
2330 ) -> Option<&'a ActivePeer> {
2331 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2332
2333 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2334
2335 for &candidate in candidates {
2336 if !candidate.can_send() {
2337 continue;
2338 }
2339
2340 let cost = candidate.link_cost();
2341
2342 let dist = self
2343 .tree_state
2344 .peer_coords(candidate.node_addr())
2345 .map(|pc| pc.distance_to(dest_coords))
2346 .unwrap_or(usize::MAX);
2347
2348 if dist >= my_distance {
2351 continue;
2352 }
2353
2354 let dominated = match &best {
2355 None => true,
2356 Some((_, best_cost, best_dist)) => {
2357 cost < *best_cost
2358 || (cost == *best_cost && dist < *best_dist)
2359 || (cost == *best_cost
2360 && dist == *best_dist
2361 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2362 }
2363 };
2364
2365 if dominated {
2366 best = Some((candidate, cost, dist));
2367 }
2368 }
2369
2370 best.map(|(peer, _, _)| peer)
2371 }
2372
    /// Collects the peers for which `may_reach(dest)` is true.
    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
        self.peers.values().filter(|p| p.may_reach(dest)).collect()
    }
2377
    /// Returns a reference to the TUN sender, or `None` when none is set.
    pub fn tun_tx(&self) -> Option<&TunTx> {
        self.tun_tx.as_ref()
    }
2384
2385 pub fn attach_external_packet_io(
2392 &mut self,
2393 capacity: usize,
2394 ) -> Result<ExternalPacketIo, NodeError> {
2395 if self.state != NodeState::Created {
2396 return Err(NodeError::Config(ConfigError::Validation(
2397 "external packet I/O must be attached before node start".to_string(),
2398 )));
2399 }
2400 if self.config.tun.enabled {
2401 return Err(NodeError::Config(ConfigError::Validation(
2402 "external packet I/O requires tun.enabled=false".to_string(),
2403 )));
2404 }
2405
2406 let capacity = capacity.max(1);
2407 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2408 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2409 self.tun_outbound_rx = Some(outbound_rx);
2410 self.external_packet_tx = Some(inbound_tx);
2411
2412 Ok(ExternalPacketIo {
2413 outbound_tx,
2414 inbound_rx,
2415 })
2416 }
2417
2418 pub(crate) fn attach_endpoint_data_io(
2423 &mut self,
2424 capacity: usize,
2425 ) -> Result<EndpointDataIo, NodeError> {
2426 if self.state != NodeState::Created {
2427 return Err(NodeError::Config(ConfigError::Validation(
2428 "endpoint data I/O must be attached before node start".to_string(),
2429 )));
2430 }
2431
2432 let command_capacity = endpoint_data_command_capacity(capacity);
2433 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2434 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2439 self.endpoint_command_rx = Some(command_rx);
2440 self.endpoint_event_tx = Some(event_tx.clone());
2441
2442 Ok(EndpointDataIo {
2443 command_tx,
2444 event_rx,
2445 event_tx,
2446 })
2447 }
2448
2449 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2450 let mut prefix = [0u8; 15];
2451 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2452 self.identity_cache
2453 .get(&prefix)
2454 .filter(|entry| &entry.node_addr == addr)
2455 .map(|entry| entry.pubkey)
2456 }
2457
2458 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2459 let mut prefix = [0u8; 15];
2460 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2461 self.identity_cache
2462 .get(&prefix)
2463 .filter(|entry| &entry.node_addr == addr)
2464 .map(|entry| entry.npub.clone())
2465 }
2466
2467 pub(in crate::node) fn deliver_external_ipv6_packet(
2468 &self,
2469 src_addr: &NodeAddr,
2470 packet: Vec<u8>,
2471 ) {
2472 let Some(external_packet_tx) = &self.external_packet_tx else {
2473 return;
2474 };
2475 if packet.len() < 40 {
2476 return;
2477 }
2478 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2479 return;
2480 };
2481 let delivered = NodeDeliveredPacket {
2482 source_node_addr: *src_addr,
2483 source_npub: self.npub_for_node_addr(src_addr),
2484 destination,
2485 packet,
2486 };
2487 if let Err(error) = external_packet_tx.try_send(delivered) {
2488 debug!(error = %error, "Failed to deliver packet to external app sink");
2489 }
2490 }
2491
    /// Encrypts `plaintext` and sends it to the peer `node_addr` without the
    /// CE flag set.
    ///
    /// Thin wrapper around [`Self::send_encrypted_link_message_with_ce`] with
    /// `ce_flag = false`; errors are passed through unchanged.
    pub(super) async fn send_encrypted_link_message(
        &mut self,
        node_addr: &NodeAddr,
        plaintext: &[u8],
    ) -> Result<(), NodeError> {
        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
            .await
    }
2513
2514 pub(in crate::node) fn note_local_send_outcome(
2520 &mut self,
2521 result: &Result<usize, TransportError>,
2522 ) {
2523 match result {
2524 Ok(_) => {
2525 if self.last_local_send_failure_at.is_some() {
2526 self.last_local_send_failure_at = None;
2527 }
2528 }
2529 Err(TransportError::Io(e))
2530 if matches!(
2531 e.kind(),
2532 std::io::ErrorKind::NetworkUnreachable
2533 | std::io::ErrorKind::HostUnreachable
2534 | std::io::ErrorKind::AddrNotAvailable
2535 ) =>
2536 {
2537 self.last_local_send_failure_at = Some(std::time::Instant::now());
2538 }
2539 Err(_) => {}
2540 }
2541 }
2542
    /// Returns the instant stored by [`Self::note_local_send_outcome`] for
    /// the most recent matching send failure, or `None` when the marker is
    /// clear.
    pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
        self.last_local_send_failure_at
    }
2548
2549 pub(super) async fn send_encrypted_link_message_with_ce(
2553 &mut self,
2554 node_addr: &NodeAddr,
2555 plaintext: &[u8],
2556 ce_flag: bool,
2557 ) -> Result<(), NodeError> {
2558 let peer = self
2559 .peers
2560 .get_mut(node_addr)
2561 .ok_or(NodeError::PeerNotFound(*node_addr))?;
2562
2563 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2564 node_addr: *node_addr,
2565 reason: "no their_index".into(),
2566 })?;
2567 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2568 node_addr: *node_addr,
2569 reason: "no transport_id".into(),
2570 })?;
2571 let remote_addr = peer
2572 .current_addr()
2573 .cloned()
2574 .ok_or_else(|| NodeError::SendFailed {
2575 node_addr: *node_addr,
2576 reason: "no current_addr".into(),
2577 })?;
2578 #[cfg(any(target_os = "linux", target_os = "macos"))]
2579 let connected_socket = peer.connected_udp();
2580
2581 let timestamp_ms = peer.session_elapsed_ms();
2583
2584 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2586 let mut flags = if sp_flag { FLAG_SP } else { 0 };
2587 if ce_flag {
2588 flags |= FLAG_CE;
2589 }
2590 if peer.current_k_bit() {
2591 flags |= FLAG_KEY_EPOCH;
2592 }
2593
2594 let session = peer
2595 .noise_session_mut()
2596 .ok_or_else(|| NodeError::SendFailed {
2597 node_addr: *node_addr,
2598 reason: "no noise session".into(),
2599 })?;
2600
2601 const INNER_TS_LEN: usize = 4;
2609 let counter = session.current_send_counter();
2610 let inner_len = INNER_TS_LEN + plaintext.len();
2611 let payload_len = inner_len as u16;
2612 let header = build_established_header(their_index, counter, flags, payload_len);
2613
2614 let transport_for_send = self
2633 .transports
2634 .get(&transport_id)
2635 .ok_or(NodeError::TransportNotFound(transport_id))?;
2636 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2637 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2638 && is_udp
2639 && let Some(cipher_clone) = session.send_cipher_clone()
2640 {
2641 {
2642 let reserved_counter =
2646 session
2647 .take_send_counter()
2648 .map_err(|e| NodeError::SendFailed {
2649 node_addr: *node_addr,
2650 reason: format!("counter reservation failed: {}", e),
2651 })?;
2652 debug_assert_eq!(reserved_counter, counter);
2653 let header =
2657 build_established_header(their_index, reserved_counter, flags, payload_len);
2658 let transport = transport_for_send;
2659 let send_target = {
2666 if let TransportHandle::Udp(udp) = transport {
2667 let socket_addr = {
2668 #[cfg(any(target_os = "linux", target_os = "macos"))]
2669 {
2670 match connected_socket.as_ref() {
2671 Some(socket) => Some(socket.peer_addr()),
2672 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2673 }
2674 }
2675 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2676 {
2677 udp.resolve_for_off_task(&remote_addr).await.ok()
2678 }
2679 };
2680 match (udp.async_socket(), socket_addr) {
2681 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
2682 _ => None,
2683 }
2684 } else {
2685 None
2686 }
2687 };
2688 if let Some((socket, socket_addr)) = send_target {
2689 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
2705 let mut wire_buf = Vec::with_capacity(wire_capacity);
2706 wire_buf.extend_from_slice(&header);
2707 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
2708 wire_buf.extend_from_slice(plaintext);
2709 let predicted_bytes = wire_capacity;
2710 if let Some(peer) = self.peers.get_mut(node_addr) {
2717 peer.link_stats_mut().record_sent(predicted_bytes);
2718 if let Some(mmp) = peer.mmp_mut() {
2719 mmp.sender
2720 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
2721 }
2722 }
2723 workers.dispatch(self::encrypt_worker::FmpSendJob {
2724 cipher: cipher_clone,
2725 counter: reserved_counter,
2726 wire_buf,
2727 fsp_seal: None,
2728 socket,
2729 dest_addr: socket_addr,
2730 #[cfg(any(target_os = "linux", target_os = "macos"))]
2731 connected_socket,
2732 drop_on_backpressure: plaintext
2733 .first()
2734 .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
2735 queued_at: crate::perf_profile::stamp(),
2736 });
2737 return Ok(());
2738 }
2739 }
2740 }
2741
2742 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
2747 let ciphertext = {
2749 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
2750 session
2751 .encrypt_with_aad(&inner_plaintext, &header)
2752 .map_err(|e| NodeError::SendFailed {
2753 node_addr: *node_addr,
2754 reason: format!("encryption failed: {}", e),
2755 })?
2756 };
2757
2758 let wire_packet = build_encrypted(&header, &ciphertext);
2759
2760 let send_result = {
2762 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
2763 let transport = self
2764 .transports
2765 .get(&transport_id)
2766 .ok_or(NodeError::TransportNotFound(transport_id))?;
2767 transport.send(&remote_addr, &wire_packet).await
2768 };
2769 self.note_local_send_outcome(&send_result);
2770 let bytes_sent = send_result.map_err(|e| match e {
2771 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
2772 node_addr: *node_addr,
2773 packet_size,
2774 mtu,
2775 },
2776 other => NodeError::SendFailed {
2777 node_addr: *node_addr,
2778 reason: format!("transport send: {}", other),
2779 },
2780 })?;
2781
2782 if let Some(peer) = self.peers.get_mut(node_addr) {
2784 peer.link_stats_mut().record_sent(bytes_sent);
2785 if let Some(mmp) = peer.mmp_mut() {
2787 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
2788 }
2789 }
2790
2791 Ok(())
2792 }
2793}
2794
2795impl fmt::Debug for Node {
2796 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2797 f.debug_struct("Node")
2798 .field("node_addr", self.node_addr())
2799 .field("state", &self.state)
2800 .field("is_leaf_only", &self.is_leaf_only)
2801 .field("connections", &self.connection_count())
2802 .field("peers", &self.peer_count())
2803 .field("links", &self.link_count())
2804 .field("transports", &self.transport_count())
2805 .finish()
2806 }
2807}