1mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32 ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33 build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::RoutingMode;
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45use crate::transport::{
46 Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError, TransportHandle, TransportId,
47};
48use crate::tree::TreeState;
49use crate::upper::hosts::HostMap;
50use crate::upper::icmp_rate_limit::IcmpRateLimiter;
51use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
52use crate::utils::index::IndexAllocator;
53use crate::{
54 Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
55 SessionMessageType, encode_npub,
56};
57use rand::Rng;
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fmt;
60use std::sync::Arc;
61use std::thread::JoinHandle;
62use thiserror::Error;
63use tracing::{debug, warn};
64
/// Jitter bound, in seconds, related to rekeying.
// NOTE(review): how this bound is applied (symmetric +/- or additive only) is
// not visible in this part of the file — confirm at the use site.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
72
/// Errors produced by node-level operations.
///
/// `Config`, `Identity` and `Tun` wrap lower-level errors and are created
/// automatically by `?` through their `#[from]` conversions.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The node has not been started.
    #[error("node not started")]
    NotStarted,

    /// The node has already been started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node has already been stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport with the given id was found.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No usable transport for the requested transport type; the payload is a
    /// human-readable description.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link with the given id was found.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection is associated with the given link id.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// No peer with the given node address was found.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with the given node address already exists.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection already exists for the given link id.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A peer npub string was rejected; `reason` explains why.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// Access was denied; the payload is a human-readable description.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` was exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` was exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` was exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available for the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion failed for `link_id`; `reason` explains why.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed; `reason` explains why.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet of `packet_size` bytes exceeded `mtu` while forwarding to
    /// `node_addr`.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Wrapped configuration error.
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Wrapped identity error.
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// Wrapped TUN error.
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Index allocation failed; the payload is a human-readable description.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// A handshake failed; the payload is a human-readable description.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// A transport-level error, carried as text.
    #[error("transport error: {0}")]
    TransportError(String),

    /// A bootstrap handoff failed; the payload is a human-readable description.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
161
/// A packet handed out of the node together with information about its sender.
// NOTE(review): field meanings below are read off names and types; confirm the
// exact contents of `packet` (headers included or not) at the producer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// Node address of the sender.
    pub source_node_addr: NodeAddr,
    /// npub of the sender, when available.
    pub source_npub: Option<String>,
    /// Destination address of the packet.
    pub destination: FipsAddress,
    /// Raw packet bytes.
    pub packet: Vec<u8>,
}
174
175#[derive(Debug, Clone)]
176struct IdentityCacheEntry {
177 node_addr: NodeAddr,
178 pubkey: secp256k1::PublicKey,
179 npub: String,
180 last_seen_ms: u64,
181}
182
183impl IdentityCacheEntry {
184 fn new(
185 node_addr: NodeAddr,
186 pubkey: secp256k1::PublicKey,
187 npub: String,
188 last_seen_ms: u64,
189 ) -> Self {
190 Self {
191 node_addr,
192 pubkey,
193 npub,
194 last_seen_ms,
195 }
196 }
197}
198
/// Channel endpoints for exchanging packets with the node from outside.
// NOTE(review): direction is inferred from the field names (`outbound_tx`
// feeds the node, `inbound_rx` yields delivered packets) — confirm at the
// construction site.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Sender half of the TUN-outbound channel type.
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receiver of [`NodeDeliveredPacket`] values.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
207
/// Channel endpoints used by the endpoint-data API.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Bounded sender for [`NodeEndpointCommand`]s.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Unbounded receiver of [`NodeEndpointEvent`]s.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Unbounded sender for [`NodeEndpointEvent`]s.
    // NOTE(review): presumably the sending side of the channel `event_rx`
    // reads from — confirm where this struct is built.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
237
/// Picks the capacity for the endpoint-data command queue.
///
/// If the environment variable `FIPS_ENDPOINT_DATA_QUEUE_CAP` is set to a
/// value that, after trimming whitespace, parses as a `usize` greater than
/// zero, that value is returned as-is. Otherwise the result is `requested`
/// raised to a floor of 32 768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    let env_override = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&cap| cap > 0);

    match env_override {
        Some(cap) => cap,
        // The floor is far above 1, so it also covers `requested == 0`.
        None => requested.max(32_768),
    }
}
248
/// Commands sent to the node over the endpoint command channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send `payload` to `remote` and report the outcome on `response_tx`.
    Send {
        /// Identity of the destination peer.
        remote: PeerIdentity,
        /// Bytes to send.
        payload: Vec<u8>,
        /// Optional instant attached by the sender.
        // NOTE(review): presumably the enqueue time, for latency accounting.
        queued_at: Option<std::time::Instant>,
        /// One-shot channel carrying the send result.
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Send `payload` to `remote`; this variant carries no response channel.
    SendOneway {
        /// Identity of the destination peer.
        remote: PeerIdentity,
        /// Bytes to send.
        payload: Vec<u8>,
        /// Optional instant attached by the sender.
        // NOTE(review): presumably the enqueue time, for latency accounting.
        queued_at: Option<std::time::Instant>,
    },
    /// Request a list of [`NodeEndpointPeer`] records on `response_tx`.
    PeerSnapshot {
        /// One-shot channel carrying the snapshot.
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
}
275
/// Events emitted by the node on the endpoint event channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A payload received from another node.
    Data {
        /// Node address of the sender.
        source_node_addr: NodeAddr,
        /// npub of the sender, when available.
        source_npub: Option<String>,
        /// Received bytes.
        payload: Vec<u8>,
        /// Optional instant attached by the producer.
        // NOTE(review): presumably the time the event was queued.
        queued_at: Option<std::time::Instant>,
    },
}
286
/// Per-peer record returned in response to
/// [`NodeEndpointCommand::PeerSnapshot`].
// NOTE(review): units and exact counting rules of the counters are not
// visible in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// npub of the peer.
    pub(crate) npub: String,
    /// Transport address of the peer as text, when available.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, when available.
    pub(crate) transport_type: Option<String>,
    /// Numeric id of the link to the peer.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, when available.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent to the peer.
    pub(crate) packets_sent: u64,
    /// Packets received from the peer.
    pub(crate) packets_recv: u64,
    /// Bytes sent to the peer.
    pub(crate) bytes_sent: u64,
    /// Bytes received from the peer.
    pub(crate) bytes_recv: u64,
}
300
/// Lifecycle phase of a node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed but never started.
    Created,
    /// Start-up in progress.
    Starting,
    /// Fully running.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// Shut down.
    Stopped,
}

impl NodeState {
    /// `true` only for [`NodeState::Running`].
    pub fn is_operational(&self) -> bool {
        *self == Self::Running
    }

    /// `true` for [`NodeState::Created`] and [`NodeState::Stopped`], the two
    /// states from which a start is permitted.
    pub fn can_start(&self) -> bool {
        match self {
            Self::Created | Self::Stopped => true,
            Self::Starting | Self::Running | Self::Stopping => false,
        }
    }

    /// `true` only for [`NodeState::Running`].
    pub fn can_stop(&self) -> bool {
        *self == Self::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase state name. Width/fill flags on the outer
    /// formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Created => "created",
            Self::Starting => "starting",
            Self::Running => "running",
            Self::Stopping => "stopping",
            Self::Stopped => "stopped",
        })
    }
}
345
346#[derive(Clone, Debug)]
353pub(crate) struct RecentRequest {
354 pub(crate) from_peer: NodeAddr,
356 pub(crate) timestamp_ms: u64,
358 pub(crate) response_forwarded: bool,
362}
363
364impl RecentRequest {
365 pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
366 Self {
367 from_peer,
368 timestamp_ms,
369 response_forwarded: false,
370 }
371 }
372
373 pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
375 current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
376 }
377}
378
/// Key type of the `addr_to_link` map: a transport id paired with a remote
/// address on that transport.
type AddrKey = (TransportId, TransportAddr);
381
/// Per-transport drop tracking state (stored in `Node::transport_drops`).
// NOTE(review): how these fields are updated is not visible in this part of
// the file; descriptions below are read off the names.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Previously observed drop counter value.
    prev_drops: u64,
    /// Whether the transport is currently considered to be dropping.
    dropping: bool,
}
393
/// An entry of `Node::pending_connects`: the link, transport, remote address
/// and peer identity of a connect that is still pending.
// NOTE(review): when entries are added/removed is not visible in this part of
// the file.
struct PendingConnect {
    /// Link id of the pending connect.
    link_id: LinkId,
    /// Transport the connect uses.
    transport_id: TransportId,
    /// Remote address being connected to.
    remote_addr: TransportAddr,
    /// Identity of the peer being connected to.
    peer_identity: PeerIdentity,
}
409
/// A node instance: identity, configuration, routing state, transports,
/// links, peers, sessions and the I/O plumbing that connects them.
///
/// Field descriptions are derived from the field types and from the
/// constructors in this file; behavior implemented in other modules is
/// hedged with `NOTE(review)`.
pub struct Node {
    // --- Identity and lifecycle -------------------------------------------
    /// Identity this node runs as.
    identity: Identity,

    /// Eight random bytes generated when the node value is constructed.
    startup_epoch: [u8; 8],

    /// Instant at which the node value was constructed; basis for `uptime()`.
    started_at: std::time::Instant,

    /// Configuration the node was built from.
    config: Config,

    /// Current lifecycle state; constructors set `NodeState::Created`.
    state: NodeState,

    /// Whether the node operates in leaf-only mode.
    is_leaf_only: bool,

    // --- Routing state ----------------------------------------------------
    /// Spanning-tree state, initialised with this node's signed declaration.
    tree_state: TreeState,

    /// Bloom-filter state for this node.
    bloom_state: BloomState,

    /// Coordinate cache, sized and TTL-configured from `config.node.cache`.
    coord_cache: CoordCache,
    /// Learned route table.
    learned_routes: LearnedRouteTable,
    /// Recently seen requests, keyed by a `u64` id.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Shared map from address to a `u16` MTU value.
    // NOTE(review): presumably the discovered path MTU per destination, shared
    // with another component through the `Arc` — confirm.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // --- Transports and links ---------------------------------------------
    /// Transport handles by id.
    transports: HashMap<TransportId, TransportHandle>,
    /// Drop-tracking state per transport.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Links by id.
    links: HashMap<LinkId, Link>,
    /// Reverse index from (transport, remote address) to link id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    /// Packet sender handed to transports when they are created.
    packet_tx: Option<PacketTx>,
    /// Receiving side of the packet channel.
    packet_rx: Option<PacketRx>,

    // --- Connections, peers, sessions -------------------------------------
    /// Peer connections keyed by link id.
    connections: HashMap<LinkId, PeerConnection>,

    /// Active peers keyed by node address.
    peers: HashMap<NodeAddr, ActivePeer>,

    /// Session entries keyed by node address.
    sessions: HashMap<NodeAddr, SessionEntry>,

    /// Identity cache keyed by a 15-byte value.
    // NOTE(review): presumably an address prefix — confirm key derivation.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    /// Queued TUN packets per destination node.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Queued endpoint-data payloads per destination node.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// In-flight discovery lookups per target node.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // --- Limits and id counters -------------------------------------------
    /// Connection limit, copied from `config.node.limits`.
    max_connections: usize,
    /// Peer limit, copied from `config.node.limits`.
    max_peers: usize,
    /// Link limit, copied from `config.node.limits`.
    max_links: usize,

    /// Next link id to hand out; starts at 1.
    next_link_id: u64,
    /// Next transport id to hand out; starts at 1.
    next_transport_id: u32,

    // --- Statistics -------------------------------------------------------
    /// Node statistics.
    stats: stats::NodeStats,

    /// Statistics history.
    stats_history: stats_history::StatsHistory,

    // --- TUN, endpoint and worker plumbing --------------------------------
    /// TUN state; `Configured` when `config.tun.enabled`, else `Disabled`.
    tun_state: TunState,
    /// Name of the TUN device, once known.
    tun_name: Option<String>,
    /// Sender towards the TUN device.
    tun_tx: Option<TunTx>,
    /// Receiver of outbound packets read from the TUN device.
    tun_outbound_rx: Option<TunOutboundRx>,
    /// Sender for packets delivered to an external consumer.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Receiver of endpoint commands.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Sender of endpoint events.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Encrypt worker pool, when present.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Decrypt worker pool, when present.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// (transport, index) keys tracked as registered with the decrypt workers.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Receiving half of the decrypt fallback channel created in the
    /// constructors.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sending half of the decrypt fallback channel.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// Join handle of the TUN reader thread.
    tun_reader_handle: Option<JoinHandle<()>>,
    /// Join handle of the TUN writer thread.
    tun_writer_handle: Option<JoinHandle<()>>,
    /// Raw file descriptor related to TUN shutdown (macOS only).
    // NOTE(review): exact shutdown mechanism is not visible here.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // --- DNS --------------------------------------------------------------
    /// Receiver of identities from the DNS component.
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    /// Handle of the DNS task.
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // --- Session index bookkeeping ----------------------------------------
    /// Allocator for session indices.
    index_allocator: IndexAllocator,
    /// Owner peer for each (transport, local session index).
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Link for each (transport, index) of a pending outbound handshake.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // --- Rate limiters ----------------------------------------------------
    /// Handshake rate limiter built from `config.node.rate_limit`.
    msg1_rate_limiter: HandshakeRateLimiter,
    /// ICMP rate limiter.
    icmp_rate_limiter: IcmpRateLimiter,
    /// Routing-error rate limiter (default interval).
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Rate limiter with interval `config.node.session.coords_response_interval_ms`.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    /// Discovery backoff state.
    discovery_backoff: DiscoveryBackoff,
    /// Rate limiter for forwarded discovery traffic.
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // --- Connect / retry state --------------------------------------------
    /// Connects that are still pending.
    pending_connects: Vec<PendingConnect>,

    /// Retry state per peer.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    // --- Discovery --------------------------------------------------------
    /// Nostr discovery service, when present.
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    /// LAN discovery service, when present.
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    /// Millisecond timestamp associated with Nostr discovery start.
    nostr_discovery_started_at_ms: Option<u64>,
    /// Starts `false`; presumably set once the startup discovery sweep ran.
    startup_open_discovery_sweep_done: bool,
    /// Transports excluded from `find_transport_for_type`.
    bootstrap_transports: HashSet<TransportId>,
    /// npub string associated with each bootstrap transport.
    bootstrap_transport_npubs: HashMap<TransportId, String>,

    // --- Throttling timestamps and estimates ------------------------------
    /// Time of the last parent re-evaluation.
    last_parent_reeval: Option<crate::time::Instant>,

    /// Time of the last congestion log line.
    last_congestion_log: Option<std::time::Instant>,

    /// Latest mesh-size estimate, if one is available.
    estimated_mesh_size: Option<u64>,
    /// Time of the last mesh-size log line.
    last_mesh_size_log: Option<std::time::Instant>,

    /// Time of the last self-related warning.
    // NOTE(review): which warning this throttles is not visible here.
    last_self_warn: Option<std::time::Instant>,

    /// Time of the most recent local send failure.
    last_local_send_failure_at: Option<std::time::Instant>,

    // --- Naming and access control ----------------------------------------
    /// Alias per peer; consulted by `peer_display_name`.
    peer_aliases: HashMap<NodeAddr, String>,

    /// Peer ACL reloader built by `host_map_and_peer_acl`.
    peer_acl: acl::PeerAclReloader,

    /// Host map built by `host_map_and_peer_acl`; consulted first by
    /// `peer_display_name`.
    host_map: Arc<HostMap>,
}
698
699impl Node {
700 pub fn new(config: Config) -> Result<Self, NodeError> {
702 config.validate()?;
703 let identity = config.create_identity()?;
704 let node_addr = *identity.node_addr();
705 let is_leaf_only = config.is_leaf_only();
706
707 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
708 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
709
710 let mut startup_epoch = [0u8; 8];
711 rand::rng().fill_bytes(&mut startup_epoch);
712
713 let mut bloom_state = if is_leaf_only {
714 BloomState::leaf_only(node_addr)
715 } else {
716 BloomState::new(node_addr)
717 };
718 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
719
720 let tun_state = if config.tun.enabled {
721 TunState::Configured
722 } else {
723 TunState::Disabled
724 };
725
726 let mut tree_state = TreeState::new(node_addr);
728 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
729 tree_state.set_hold_down(config.node.tree.hold_down_secs);
730 tree_state.set_flap_dampening(
731 config.node.tree.flap_threshold,
732 config.node.tree.flap_window_secs,
733 config.node.tree.flap_dampening_secs,
734 );
735 tree_state
736 .sign_declaration(&identity)
737 .expect("signing own declaration should never fail");
738
739 let coord_cache = CoordCache::new(
740 config.node.cache.coord_size,
741 config.node.cache.coord_ttl_secs * 1000,
742 );
743 let rl = &config.node.rate_limit;
744 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
745 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
746 config.node.limits.max_pending_inbound,
747 );
748
749 let max_connections = config.node.limits.max_connections;
750 let max_peers = config.node.limits.max_peers;
751 let max_links = config.node.limits.max_links;
752 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
753 let backoff_base_secs = config.node.discovery.backoff_base_secs;
754 let backoff_max_secs = config.node.discovery.backoff_max_secs;
755 let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
756
757 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
758
759 Ok(Self {
760 identity,
761 startup_epoch,
762 started_at: std::time::Instant::now(),
763 config,
764 state: NodeState::Created,
765 is_leaf_only,
766 tree_state,
767 bloom_state,
768 coord_cache,
769 learned_routes: LearnedRouteTable::default(),
770 recent_requests: HashMap::new(),
771 transports: HashMap::new(),
772 transport_drops: HashMap::new(),
773 links: HashMap::new(),
774 addr_to_link: HashMap::new(),
775 packet_tx: None,
776 packet_rx: None,
777 connections: HashMap::new(),
778 peers: HashMap::new(),
779 sessions: HashMap::new(),
780 identity_cache: HashMap::new(),
781 pending_tun_packets: HashMap::new(),
782 pending_endpoint_data: HashMap::new(),
783 pending_lookups: HashMap::new(),
784 max_connections,
785 max_peers,
786 max_links,
787 next_link_id: 1,
788 next_transport_id: 1,
789 stats: stats::NodeStats::new(),
790 stats_history: stats_history::StatsHistory::new(),
791 tun_state,
792 tun_name: None,
793 tun_tx: None,
794 tun_outbound_rx: None,
795 external_packet_tx: None,
796 endpoint_command_rx: None,
797 endpoint_event_tx: None,
798 encrypt_workers: None,
799 decrypt_workers: None,
800 decrypt_registered_sessions: std::collections::HashSet::new(),
801 decrypt_fallback_tx,
802 decrypt_fallback_rx,
803 tun_reader_handle: None,
804 tun_writer_handle: None,
805 #[cfg(target_os = "macos")]
806 tun_shutdown_fd: None,
807 dns_identity_rx: None,
808 dns_task: None,
809 index_allocator: IndexAllocator::new(),
810 peers_by_index: HashMap::new(),
811 pending_outbound: HashMap::new(),
812 msg1_rate_limiter,
813 icmp_rate_limiter: IcmpRateLimiter::new(),
814 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
815 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
816 std::time::Duration::from_millis(coords_response_interval_ms),
817 ),
818 discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
819 discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
820 std::time::Duration::from_secs(forward_min_interval_secs),
821 ),
822 pending_connects: Vec::new(),
823 retry_pending: HashMap::new(),
824 nostr_discovery: None,
825 nostr_discovery_started_at_ms: None,
826 lan_discovery: None,
827 startup_open_discovery_sweep_done: false,
828 bootstrap_transports: HashSet::new(),
829 bootstrap_transport_npubs: HashMap::new(),
830 last_parent_reeval: None,
831 last_congestion_log: None,
832 estimated_mesh_size: None,
833 last_mesh_size_log: None,
834 last_self_warn: None,
835 last_local_send_failure_at: None,
836 peer_aliases: HashMap::new(),
837 peer_acl,
838 host_map,
839 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
840 })
841 }
842
843 pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
848 config.validate()?;
849 let node_addr = *identity.node_addr();
850
851 let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
852 let decrypt_fallback_rx = Some(decrypt_fallback_rx);
853
854 let mut startup_epoch = [0u8; 8];
855 rand::rng().fill_bytes(&mut startup_epoch);
856
857 let tun_state = if config.tun.enabled {
858 TunState::Configured
859 } else {
860 TunState::Disabled
861 };
862
863 let mut tree_state = TreeState::new(node_addr);
865 tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
866 tree_state.set_hold_down(config.node.tree.hold_down_secs);
867 tree_state.set_flap_dampening(
868 config.node.tree.flap_threshold,
869 config.node.tree.flap_window_secs,
870 config.node.tree.flap_dampening_secs,
871 );
872 tree_state
873 .sign_declaration(&identity)
874 .expect("signing own declaration should never fail");
875
876 let mut bloom_state = BloomState::new(node_addr);
877 bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
878
879 let coord_cache = CoordCache::new(
880 config.node.cache.coord_size,
881 config.node.cache.coord_ttl_secs * 1000,
882 );
883 let rl = &config.node.rate_limit;
884 let msg1_rate_limiter = HandshakeRateLimiter::with_params(
885 rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
886 config.node.limits.max_pending_inbound,
887 );
888
889 let max_connections = config.node.limits.max_connections;
890 let max_peers = config.node.limits.max_peers;
891 let max_links = config.node.limits.max_links;
892 let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
893
894 let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
895
896 Ok(Self {
897 identity,
898 startup_epoch,
899 started_at: std::time::Instant::now(),
900 config,
901 state: NodeState::Created,
902 is_leaf_only: false,
903 tree_state,
904 bloom_state,
905 coord_cache,
906 learned_routes: LearnedRouteTable::default(),
907 recent_requests: HashMap::new(),
908 transports: HashMap::new(),
909 transport_drops: HashMap::new(),
910 links: HashMap::new(),
911 addr_to_link: HashMap::new(),
912 packet_tx: None,
913 packet_rx: None,
914 connections: HashMap::new(),
915 peers: HashMap::new(),
916 sessions: HashMap::new(),
917 identity_cache: HashMap::new(),
918 pending_tun_packets: HashMap::new(),
919 pending_endpoint_data: HashMap::new(),
920 pending_lookups: HashMap::new(),
921 max_connections,
922 max_peers,
923 max_links,
924 next_link_id: 1,
925 next_transport_id: 1,
926 stats: stats::NodeStats::new(),
927 stats_history: stats_history::StatsHistory::new(),
928 tun_state,
929 tun_name: None,
930 tun_tx: None,
931 tun_outbound_rx: None,
932 external_packet_tx: None,
933 endpoint_command_rx: None,
934 endpoint_event_tx: None,
935 encrypt_workers: None,
936 decrypt_workers: None,
937 decrypt_registered_sessions: std::collections::HashSet::new(),
938 decrypt_fallback_tx,
939 decrypt_fallback_rx,
940 tun_reader_handle: None,
941 tun_writer_handle: None,
942 #[cfg(target_os = "macos")]
943 tun_shutdown_fd: None,
944 dns_identity_rx: None,
945 dns_task: None,
946 index_allocator: IndexAllocator::new(),
947 peers_by_index: HashMap::new(),
948 pending_outbound: HashMap::new(),
949 msg1_rate_limiter,
950 icmp_rate_limiter: IcmpRateLimiter::new(),
951 routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
952 coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
953 std::time::Duration::from_millis(coords_response_interval_ms),
954 ),
955 discovery_backoff: DiscoveryBackoff::new(),
956 discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
957 pending_connects: Vec::new(),
958 retry_pending: HashMap::new(),
959 nostr_discovery: None,
960 nostr_discovery_started_at_ms: None,
961 lan_discovery: None,
962 startup_open_discovery_sweep_done: false,
963 bootstrap_transports: HashSet::new(),
964 bootstrap_transport_npubs: HashMap::new(),
965 last_parent_reeval: None,
966 last_congestion_log: None,
967 estimated_mesh_size: None,
968 last_mesh_size_log: None,
969 last_self_warn: None,
970 last_local_send_failure_at: None,
971 peer_aliases: HashMap::new(),
972 peer_acl,
973 host_map,
974 path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
975 })
976 }
977
978 pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
980 let mut node = Self::new(config)?;
981 node.is_leaf_only = true;
982 node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
983 Ok(node)
984 }
985
986 fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
987 let base_host_map = HostMap::from_peer_configs(config.peers());
988 if !config.node.system_files_enabled {
989 return (
990 Arc::new(base_host_map.clone()),
991 acl::PeerAclReloader::memory_only(base_host_map),
992 );
993 }
994
995 let mut host_map = base_host_map.clone();
996 let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
997 let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
998 crate::upper::hosts::DEFAULT_HOSTS_PATH,
999 ));
1000 host_map.merge(hosts_file);
1001 let peer_acl = acl::PeerAclReloader::with_alias_sources(
1002 std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1003 std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1004 base_host_map,
1005 hosts_path,
1006 );
1007 (Arc::new(host_map), peer_acl)
1008 }
1009
1010 async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
1014 let mut transports = Vec::new();
1015
1016 let udp_instances: Vec<_> = self
1018 .config
1019 .transports
1020 .udp
1021 .iter()
1022 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1023 .collect();
1024
1025 for (name, udp_config) in udp_instances {
1027 let transport_id = self.allocate_transport_id();
1028 let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
1029 transports.push(TransportHandle::Udp(udp));
1030 }
1031
1032 #[cfg(feature = "sim-transport")]
1033 {
1034 let sim_instances: Vec<_> = self
1035 .config
1036 .transports
1037 .sim
1038 .iter()
1039 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1040 .collect();
1041
1042 for (name, sim_config) in sim_instances {
1043 let transport_id = self.allocate_transport_id();
1044 let sim = crate::transport::sim::SimTransport::new(
1045 transport_id,
1046 name,
1047 sim_config,
1048 packet_tx.clone(),
1049 );
1050 transports.push(TransportHandle::Sim(sim));
1051 }
1052 }
1053
1054 #[cfg(any(target_os = "linux", target_os = "macos"))]
1056 {
1057 let eth_instances: Vec<_> = self
1058 .config
1059 .transports
1060 .ethernet
1061 .iter()
1062 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1063 .collect();
1064 let xonly = self.identity.pubkey();
1065 for (name, eth_config) in eth_instances {
1066 let transport_id = self.allocate_transport_id();
1067 let mut eth =
1068 EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
1069 eth.set_local_pubkey(xonly);
1070 transports.push(TransportHandle::Ethernet(eth));
1071 }
1072 }
1073
1074 let tcp_instances: Vec<_> = self
1076 .config
1077 .transports
1078 .tcp
1079 .iter()
1080 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1081 .collect();
1082
1083 for (name, tcp_config) in tcp_instances {
1084 let transport_id = self.allocate_transport_id();
1085 let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
1086 transports.push(TransportHandle::Tcp(tcp));
1087 }
1088
1089 let tor_instances: Vec<_> = self
1091 .config
1092 .transports
1093 .tor
1094 .iter()
1095 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1096 .collect();
1097
1098 for (name, tor_config) in tor_instances {
1099 let transport_id = self.allocate_transport_id();
1100 let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
1101 transports.push(TransportHandle::Tor(tor));
1102 }
1103
1104 #[cfg(bluer_available)]
1106 {
1107 let ble_instances: Vec<_> = self
1108 .config
1109 .transports
1110 .ble
1111 .iter()
1112 .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1113 .collect();
1114
1115 #[cfg(all(bluer_available, not(test)))]
1116 for (name, ble_config) in ble_instances {
1117 let transport_id = self.allocate_transport_id();
1118 let adapter = ble_config.adapter().to_string();
1119 let mtu = ble_config.mtu();
1120 match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
1121 Ok(io) => {
1122 let mut ble = crate::transport::ble::BleTransport::new(
1123 transport_id,
1124 name,
1125 ble_config,
1126 io,
1127 packet_tx.clone(),
1128 );
1129 ble.set_local_pubkey(self.identity.pubkey().serialize());
1130 transports.push(TransportHandle::Ble(ble));
1131 }
1132 Err(e) => {
1133 tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
1134 }
1135 }
1136 }
1137
1138 #[cfg(any(not(bluer_available), test))]
1139 if !ble_instances.is_empty() {
1140 #[cfg(not(test))]
1141 tracing::warn!("BLE transport configured but this build lacks BlueZ support");
1142 }
1143 }
1144
1145 transports
1146 }
1147
1148 fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1158 self.transports
1159 .iter()
1160 .filter(|(id, handle)| {
1161 handle.transport_type().name == transport_type
1162 && handle.is_operational()
1163 && !self.bootstrap_transports.contains(id)
1164 })
1165 .min_by_key(|(id, _)| id.as_u32())
1166 .map(|(id, _)| *id)
1167 }
1168
1169 #[allow(unused_variables)]
1175 fn resolve_ethernet_addr(
1176 &self,
1177 addr_str: &str,
1178 ) -> Result<(TransportId, TransportAddr), NodeError> {
1179 #[cfg(any(target_os = "linux", target_os = "macos"))]
1180 {
1181 let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1182 NodeError::NoTransportForType(format!(
1183 "invalid Ethernet address format '{}': expected 'interface/mac'",
1184 addr_str
1185 ))
1186 })?;
1187
1188 let transport_id = self
1190 .transports
1191 .iter()
1192 .find(|(_, handle)| {
1193 handle.transport_type().name == "ethernet"
1194 && handle.is_operational()
1195 && handle.interface_name() == Some(iface)
1196 })
1197 .map(|(id, _)| *id)
1198 .ok_or_else(|| {
1199 NodeError::NoTransportForType(format!(
1200 "no operational Ethernet transport for interface '{}'",
1201 iface
1202 ))
1203 })?;
1204
1205 let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1206 NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1207 })?;
1208
1209 Ok((transport_id, TransportAddr::from_bytes(&mac)))
1210 }
1211 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1212 {
1213 Err(NodeError::NoTransportForType(
1214 "Ethernet transport is not supported on this platform".to_string(),
1215 ))
1216 }
1217 }
1218
1219 #[cfg(bluer_available)]
1223 fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1224 let ta = TransportAddr::from_string(addr_str);
1225 let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1226 NodeError::NoTransportForType(format!(
1227 "invalid BLE address format '{}': expected 'adapter/mac'",
1228 addr_str
1229 ))
1230 })?;
1231
1232 let transport_id = self
1234 .transports
1235 .iter()
1236 .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1237 .map(|(id, _)| *id)
1238 .ok_or_else(|| {
1239 NodeError::NoTransportForType(format!(
1240 "no operational BLE transport for adapter '{}'",
1241 adapter
1242 ))
1243 })?;
1244
1245 crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1247 NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1248 })?;
1249
1250 Ok((transport_id, TransportAddr::from_string(addr_str)))
1251 }
1252
    /// Returns the identity this node runs as.
    pub fn identity(&self) -> &Identity {
        &self.identity
    }
1259
    /// Returns this node's address, as reported by its identity.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }
1264
    /// Returns this node's npub string, as reported by its identity.
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
1269
1270 pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1279 if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1280 return hostname.to_string();
1281 }
1282 if let Some(name) = self.peer_aliases.get(addr) {
1283 return name.clone();
1284 }
1285 if let Some(peer) = self.peers.get(addr) {
1286 return peer.identity().short_npub();
1287 }
1288 if let Some(entry) = self.sessions.get(addr) {
1289 let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1290 return PeerIdentity::from_pubkey(xonly).short_npub();
1291 }
1292 addr.short_hex()
1293 }
1294
1295 pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1307 let owning_peer = self.peers_by_index.get(&cache_key).copied();
1311 self.peers_by_index.remove(&cache_key);
1312 if self.decrypt_registered_sessions.remove(&cache_key)
1313 && let Some(workers) = self.decrypt_workers.as_ref()
1314 {
1315 workers.unregister_session(cache_key);
1316 }
1317 if let Some(peer_addr) = owning_peer {
1328 let peer_has_other_index = self
1329 .peers_by_index
1330 .values()
1331 .any(|other| *other == peer_addr);
1332 if !peer_has_other_index {
1333 self.clear_connected_udp_for_peer(&peer_addr);
1334 }
1335 }
1336 }
1337
1338 pub(in crate::node) fn ensure_current_session_index_registered(
1347 &mut self,
1348 node_addr: &NodeAddr,
1349 context: &'static str,
1350 ) -> bool {
1351 let Some(peer) = self.peers.get(node_addr) else {
1352 return false;
1353 };
1354 let Some(transport_id) = peer.transport_id() else {
1355 warn!(
1356 peer = %self.peer_display_name(node_addr),
1357 context,
1358 "Cannot register current session index without transport id"
1359 );
1360 return false;
1361 };
1362 let Some(our_index) = peer.our_index() else {
1363 warn!(
1364 peer = %self.peer_display_name(node_addr),
1365 context,
1366 "Cannot register current session index without local index"
1367 );
1368 return false;
1369 };
1370
1371 let cache_key = (transport_id, our_index.as_u32());
1372 match self.peers_by_index.get(&cache_key).copied() {
1373 Some(existing) if existing == *node_addr => true,
1374 Some(existing) => {
1375 warn!(
1376 peer = %self.peer_display_name(node_addr),
1377 previous_owner = %self.peer_display_name(&existing),
1378 transport_id = %transport_id,
1379 our_index = %our_index,
1380 context,
1381 "Repairing current session index with stale owner"
1382 );
1383 self.peers_by_index.insert(cache_key, *node_addr);
1384 true
1385 }
1386 None => {
1387 warn!(
1388 peer = %self.peer_display_name(node_addr),
1389 transport_id = %transport_id,
1390 our_index = %our_index,
1391 context,
1392 "Repairing missing current session index"
1393 );
1394 self.peers_by_index.insert(cache_key, *node_addr);
1395 true
1396 }
1397 }
1398 }
1399
    /// Returns a shared reference to this node's configuration.
    pub fn config(&self) -> &Config {
        &self.config
    }
1406
    /// Returns the IPv6 MTU that `crate::upper::icmp::effective_ipv6_mtu`
    /// derives from this node's [`transport_mtu`](Self::transport_mtu).
    pub fn effective_ipv6_mtu(&self) -> u16 {
        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
    }
1415
1416 pub fn transport_mtu(&self) -> u16 {
1433 let min_operational = self
1434 .transports
1435 .values()
1436 .filter(|h| h.is_operational())
1437 .map(|h| h.mtu())
1438 .min();
1439 if let Some(mtu) = min_operational {
1440 return mtu;
1441 }
1442 if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1444 return cfg.mtu();
1445 }
1446 1280
1447 }
1448
    /// Returns the node's current lifecycle state.
    pub fn state(&self) -> NodeState {
        self.state
    }
1455
    /// Returns the time elapsed since the instant stored in `started_at`.
    pub fn uptime(&self) -> std::time::Duration {
        self.started_at.elapsed()
    }
1460
    /// Returns `true` when the current state reports itself as operational.
    pub fn is_running(&self) -> bool {
        self.state.is_operational()
    }
1465
    /// Returns the node's `is_leaf_only` flag.
    pub fn is_leaf_only(&self) -> bool {
        self.is_leaf_only
    }
1470
    /// Returns a shared reference to the spanning-tree state.
    pub fn tree_state(&self) -> &TreeState {
        &self.tree_state
    }
1477
    /// Returns a mutable reference to the spanning-tree state.
    pub fn tree_state_mut(&mut self) -> &mut TreeState {
        &mut self.tree_state
    }
1482
    /// Returns a shared reference to the bloom-filter state.
    pub fn bloom_state(&self) -> &BloomState {
        &self.bloom_state
    }
1489
    /// Returns a mutable reference to the bloom-filter state.
    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
        &mut self.bloom_state
    }
1494
    /// Returns the most recently stored mesh-size estimate, or `None` when no
    /// estimate is currently stored (see `compute_mesh_size`).
    pub fn estimated_mesh_size(&self) -> Option<u64> {
        self.estimated_mesh_size
    }
1501
1502 pub(crate) fn compute_mesh_size(&mut self) {
1508 let my_addr = *self.tree_state.my_node_addr();
1509 let parent_id = *self.tree_state.my_declaration().parent_id();
1510 let is_root = self.tree_state.is_root();
1511
1512 let max_fpr = self.config.node.bloom.max_inbound_fpr;
1513 let mut total: f64 = 1.0; let mut child_count: u32 = 0;
1515 let mut has_data = false;
1516
1517 if !is_root
1523 && let Some(parent) = self.peers.get(&parent_id)
1524 && let Some(filter) = parent.inbound_filter()
1525 {
1526 match filter.estimated_count(max_fpr) {
1527 Some(n) => {
1528 total += n;
1529 has_data = true;
1530 }
1531 None => {
1532 self.estimated_mesh_size = None;
1533 return;
1534 }
1535 }
1536 }
1537
1538 for (peer_addr, peer) in &self.peers {
1540 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1541 && *decl.parent_id() == my_addr
1542 {
1543 child_count += 1;
1544 if let Some(filter) = peer.inbound_filter() {
1545 match filter.estimated_count(max_fpr) {
1546 Some(n) => {
1547 total += n;
1548 has_data = true;
1549 }
1550 None => {
1551 self.estimated_mesh_size = None;
1552 return;
1553 }
1554 }
1555 }
1556 }
1557 }
1558
1559 if !has_data {
1560 self.estimated_mesh_size = None;
1561 return;
1562 }
1563
1564 let size = total.round() as u64;
1565 self.estimated_mesh_size = Some(size);
1566
1567 let now = std::time::Instant::now();
1569 let should_log = match self.last_mesh_size_log {
1570 None => true,
1571 Some(last) => {
1572 now.duration_since(last)
1573 >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1574 }
1575 };
1576 if should_log {
1577 tracing::debug!(
1578 estimated_mesh_size = size,
1579 peers = self.peers.len(),
1580 children = child_count,
1581 "Mesh size estimate"
1582 );
1583 self.last_mesh_size_log = Some(now);
1584 }
1585 }
1586
    /// Returns a shared reference to the coordinate cache.
    pub fn coord_cache(&self) -> &CoordCache {
        &self.coord_cache
    }
1593
    /// Returns a mutable reference to the coordinate cache.
    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
        &mut self.coord_cache
    }
1598
    /// Returns a shared reference to the node's statistics counters.
    pub fn stats(&self) -> &stats::NodeStats {
        &self.stats
    }
1605
    /// Returns a mutable reference to the node's statistics counters.
    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
        &mut self.stats
    }
1610
    /// Returns a shared reference to the recorded statistics history.
    pub fn stats_history(&self) -> &stats_history::StatsHistory {
        &self.stats_history
    }
1615
1616 pub(crate) fn record_stats_history(&mut self) {
1619 let fwd = &self.stats.forwarding;
1620 let peers_with_mmp: Vec<f64> = self
1621 .peers
1622 .values()
1623 .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1624 .collect();
1625 let loss_rate = if peers_with_mmp.is_empty() {
1626 0.0
1627 } else {
1628 peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1629 };
1630
1631 let snap = stats_history::Snapshot {
1632 mesh_size: self.estimated_mesh_size,
1633 tree_depth: self.tree_state.my_coords().depth() as u32,
1634 peer_count: self.peers.len() as u64,
1635 parent_switches_total: self.stats.tree.parent_switches,
1636 bytes_in_total: fwd.received_bytes,
1637 bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1638 packets_in_total: fwd.received_packets,
1639 packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1640 loss_rate,
1641 active_sessions: self.sessions.len() as u64,
1642 };
1643
1644 let now = std::time::Instant::now();
1645 let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1646 .peers
1647 .values()
1648 .map(|p| {
1649 let stats = p.link_stats();
1650 let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1651 Some(m) => (
1652 m.metrics.srtt_ms(),
1653 Some(m.metrics.loss_rate()),
1654 m.receiver.ecn_ce_count() as u64,
1655 ),
1656 None => (None, None, 0),
1657 };
1658 stats_history::PeerSnapshot {
1659 node_addr: *p.node_addr(),
1660 last_seen: now,
1661 srtt_ms,
1662 loss_rate,
1663 bytes_in_total: stats.bytes_recv,
1664 bytes_out_total: stats.bytes_sent,
1665 packets_in_total: stats.packets_recv,
1666 packets_out_total: stats.packets_sent,
1667 ecn_ce_total: ecn_ce,
1668 }
1669 })
1670 .collect();
1671
1672 self.stats_history.tick(now, &snap, &peer_snaps);
1673 }
1674
    /// Returns the current TUN device state.
    pub fn tun_state(&self) -> TunState {
        self.tun_state
    }
1681
    /// Returns the stored TUN interface name, or `None` when none is set.
    pub fn tun_name(&self) -> Option<&str> {
        self.tun_name.as_deref()
    }
1686
    /// Sets the connection limit enforced by `add_connection`.
    ///
    /// `add_connection` only checks the limit when it is greater than zero.
    pub fn set_max_connections(&mut self, max: usize) {
        self.max_connections = max;
    }
1693
    /// Sets the `max_peers` limit.
    pub fn set_max_peers(&mut self, max: usize) {
        self.max_peers = max;
    }
1698
    /// Sets the link limit enforced by `add_link`.
    ///
    /// `add_link` only checks the limit when it is greater than zero.
    pub fn set_max_links(&mut self, max: usize) {
        self.max_links = max;
    }
1703
    /// Returns the number of entries in the `connections` map.
    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }
1710
    /// Returns the number of entries in the `peers` map.
    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }
1715
    /// Returns the number of entries in the `links` map.
    pub fn link_count(&self) -> usize {
        self.links.len()
    }
1720
    /// Returns the number of registered transports.
    pub fn transport_count(&self) -> usize {
        self.transports.len()
    }
1725
1726 pub fn allocate_transport_id(&mut self) -> TransportId {
1730 let id = TransportId::new(self.next_transport_id);
1731 self.next_transport_id += 1;
1732 id
1733 }
1734
    /// Looks up a transport handle by id.
    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
        self.transports.get(id)
    }
1739
    /// Looks up a transport handle by id for mutation.
    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
        self.transports.get_mut(id)
    }
1744
    /// Iterates over the ids of all registered transports.
    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
        self.transports.keys()
    }
1749
    /// Returns mutable access to the packet receiver, or `None` when the
    /// `packet_rx` slot is empty.
    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
        self.packet_rx.as_mut()
    }
1754
1755 pub fn allocate_link_id(&mut self) -> LinkId {
1759 let id = LinkId::new(self.next_link_id);
1760 self.next_link_id += 1;
1761 id
1762 }
1763
1764 pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1766 if self.max_links > 0 && self.links.len() >= self.max_links {
1767 return Err(NodeError::MaxLinksExceeded {
1768 max: self.max_links,
1769 });
1770 }
1771 let link_id = link.link_id();
1772 let transport_id = link.transport_id();
1773 let remote_addr = link.remote_addr().clone();
1774
1775 self.links.insert(link_id, link);
1776 self.addr_to_link
1777 .insert((transport_id, remote_addr), link_id);
1778 Ok(())
1779 }
1780
    /// Looks up a link by id.
    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
        self.links.get(link_id)
    }
1785
    /// Looks up a link by id for mutation.
    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
        self.links.get_mut(link_id)
    }
1790
1791 pub fn find_link_by_addr(
1793 &self,
1794 transport_id: TransportId,
1795 addr: &TransportAddr,
1796 ) -> Option<LinkId> {
1797 self.addr_to_link
1798 .get(&(transport_id, addr.clone()))
1799 .copied()
1800 }
1801
1802 pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
1808 if let Some(link) = self.links.remove(link_id) {
1809 let key = (link.transport_id(), link.remote_addr().clone());
1811 if self.addr_to_link.get(&key) == Some(link_id) {
1812 self.addr_to_link.remove(&key);
1813 }
1814 Some(link)
1815 } else {
1816 None
1817 }
1818 }
1819
1820 pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
1821 if !self.bootstrap_transports.contains(&transport_id) {
1822 return;
1823 }
1824
1825 let transport_in_use = self
1826 .links
1827 .values()
1828 .any(|link| link.transport_id() == transport_id)
1829 || self
1830 .connections
1831 .values()
1832 .any(|conn| conn.transport_id() == Some(transport_id))
1833 || self
1834 .peers
1835 .values()
1836 .any(|peer| peer.transport_id() == Some(transport_id))
1837 || self
1838 .pending_connects
1839 .iter()
1840 .any(|pending| pending.transport_id == transport_id);
1841
1842 if transport_in_use {
1843 return;
1844 }
1845
1846 tracing::debug!(
1847 transport_id = %transport_id,
1848 "bootstrap transport has no remaining references; dropping"
1849 );
1850
1851 self.bootstrap_transports.remove(&transport_id);
1852 self.bootstrap_transport_npubs.remove(&transport_id);
1853 self.transport_drops.remove(&transport_id);
1854 self.transports.remove(&transport_id);
1855 }
1856
    /// Iterates over all stored links.
    pub fn links(&self) -> impl Iterator<Item = &Link> {
        self.links.values()
    }
1861
1862 pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
1866 let link_id = connection.link_id();
1867
1868 if self.connections.contains_key(&link_id) {
1869 return Err(NodeError::ConnectionAlreadyExists(link_id));
1870 }
1871
1872 if self.max_connections > 0 && self.connections.len() >= self.max_connections {
1873 return Err(NodeError::MaxConnectionsExceeded {
1874 max: self.max_connections,
1875 });
1876 }
1877
1878 self.connections.insert(link_id, connection);
1879 Ok(())
1880 }
1881
    /// Looks up the connection stored for `link_id`.
    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
        self.connections.get(link_id)
    }
1886
    /// Looks up the connection stored for `link_id` for mutation.
    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
        self.connections.get_mut(link_id)
    }
1891
    /// Removes and returns the connection stored for `link_id`, if any.
    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
        self.connections.remove(link_id)
    }
1896
    /// Iterates over all stored connections.
    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
        self.connections.values()
    }
1901
    /// Looks up the active peer with the given node address.
    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
        self.peers.get(node_addr)
    }
1908
    /// Looks up the active peer with the given node address for mutation.
    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
        self.peers.get_mut(node_addr)
    }
1913
    /// Removes and returns the peer stored under `node_addr`, if any.
    ///
    /// Only the `peers` map is touched by this method.
    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
        self.peers.remove(node_addr)
    }
1918
    /// Iterates over all active peers.
    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values()
    }
1923
    /// Borrows the Nostr discovery component, or returns `None` when the
    /// `nostr_discovery` slot is empty.
    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
        self.nostr_discovery.as_deref()
    }
1930
    /// Iterates over the node addresses of all active peers.
    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
        self.peers.keys()
    }
1935
    /// Iterates over the active peers for which `can_send()` is true.
    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values().filter(|p| p.can_send())
    }
1940
1941 pub fn sendable_peer_count(&self) -> usize {
1943 self.peers.values().filter(|p| p.can_send()).count()
1944 }
1945
    /// Test helper: sets the discovery-forward limiter's interval to zero.
    #[cfg(test)]
    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
        self.discovery_forward_limiter
            .set_interval(std::time::Duration::ZERO);
    }
1955
    /// Test helper: looks up the session entry stored for `remote`.
    #[cfg(test)]
    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
        self.sessions.get(remote)
    }
1960
    /// Test helper: looks up the session entry stored for `remote` for mutation.
    #[cfg(test)]
    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
        self.sessions.get_mut(remote)
    }
1966
    /// Test helper: removes and returns the session entry stored for `remote`.
    #[cfg(test)]
    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
        self.sessions.remove(remote)
    }
1972
1973 #[cfg(test)]
1975 pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
1976 self.path_mtu_lookup
1977 .read()
1978 .ok()
1979 .and_then(|map| map.get(fips_addr).copied())
1980 }
1981
    /// Test helper: records `mtu` as the path MTU for `fips_addr`.
    ///
    /// Silently does nothing when the write lock cannot be acquired.
    #[cfg(test)]
    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
        if let Ok(mut map) = self.path_mtu_lookup.write() {
            map.insert(fips_addr, mtu);
        }
    }
1989
    /// Returns the number of entries in the `sessions` map.
    pub fn session_count(&self) -> usize {
        self.sessions.len()
    }
1994
    /// Iterates over `(remote address, session entry)` pairs.
    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
        self.sessions.iter()
    }
1999
2000 pub(crate) fn register_identity(
2004 &mut self,
2005 node_addr: NodeAddr,
2006 pubkey: secp256k1::PublicKey,
2007 ) -> bool {
2008 let mut prefix = [0u8; 15];
2009 prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2010 if let Some(entry) = self.identity_cache.get(&prefix)
2011 && entry.node_addr == node_addr
2012 && entry.pubkey == pubkey
2013 {
2014 return true;
2018 }
2019
2020 let (xonly, _) = pubkey.x_only_public_key();
2021 let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2022 if derived_node_addr != node_addr {
2023 debug!(
2024 claimed_node_addr = %node_addr,
2025 derived_node_addr = %derived_node_addr,
2026 "Rejected identity cache entry with mismatched public key"
2027 );
2028 return false;
2029 }
2030
2031 let now_ms = Self::now_ms();
2032 if let Some(entry) = self.identity_cache.get_mut(&prefix)
2033 && entry.node_addr == node_addr
2034 {
2035 entry.pubkey = pubkey;
2036 entry.last_seen_ms = now_ms;
2037 return true;
2038 }
2039
2040 let npub = encode_npub(&xonly);
2041 self.identity_cache.insert(
2042 prefix,
2043 IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2044 );
2045 let max = self.config.node.cache.identity_size;
2047 if self.identity_cache.len() > max
2048 && let Some(oldest_key) = self
2049 .identity_cache
2050 .iter()
2051 .min_by_key(|(_, entry)| entry.last_seen_ms)
2052 .map(|(k, _)| *k)
2053 {
2054 self.identity_cache.remove(&oldest_key);
2055 }
2056 true
2057 }
2058
2059 pub(crate) fn lookup_by_fips_prefix(
2061 &mut self,
2062 prefix: &[u8; 15],
2063 ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2064 if let Some(entry) = self.identity_cache.get_mut(prefix) {
2065 entry.last_seen_ms = Self::now_ms(); Some((entry.node_addr, entry.pubkey))
2067 } else {
2068 None
2069 }
2070 }
2071
    /// Returns whether the identity cache holds an entry under the first 15
    /// bytes of `addr`.
    ///
    /// Only the prefix key is checked; the stored entry's full `node_addr` is
    /// not compared against `addr`.
    // NOTE(review): `pubkey_for_node_addr` additionally filters on the full
    // address — confirm the prefix-only match here is intended.
    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
        let mut prefix = [0u8; 15];
        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
        self.identity_cache.contains_key(&prefix)
    }
2078
    /// Returns the number of entries in the identity cache.
    pub fn identity_cache_len(&self) -> usize {
        self.identity_cache.len()
    }
2083
    /// Iterates over the identity cache as
    /// `(node address, public key, last_seen_ms)` triples.
    pub fn identity_cache_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
        self.identity_cache
            .values()
            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
    }
2095
    /// Returns the configured identity cache size
    /// (`config.node.cache.identity_size`).
    pub fn identity_cache_max(&self) -> usize {
        self.config.node.cache.identity_size
    }
2100
    /// Returns the number of entries in `pending_lookups`.
    pub fn pending_lookup_count(&self) -> usize {
        self.pending_lookups.len()
    }
2105
    /// Iterates over `(target address, pending lookup)` pairs.
    pub fn pending_lookups_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
        self.pending_lookups.iter()
    }
2112
    /// Returns the number of entries in `recent_requests`.
    pub fn recent_request_count(&self) -> usize {
        self.recent_requests.len()
    }
2117
    /// Returns the number of keys in `pending_tun_packets`, i.e. how many
    /// distinct queues currently exist.
    pub fn pending_tun_destinations(&self) -> usize {
        self.pending_tun_packets.len()
    }
2122
    /// Returns the combined length of all queues in `pending_tun_packets`.
    pub fn pending_tun_total_packets(&self) -> usize {
        self.pending_tun_packets.values().map(|q| q.len()).sum()
    }
2127
    /// Iterates over `(node address, retry state)` pairs from `retry_pending`.
    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
        self.retry_pending.iter()
    }
2132
2133 pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2140 if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2142 return true;
2143 }
2144 if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2146 && decl.parent_id() == self.node_addr()
2147 {
2148 return true;
2149 }
2150 false
2151 }
2152
    /// Picks the directly connected peer that traffic for `dest_node_addr`
    /// should be handed to next, or `None` when no usable hop is known.
    ///
    /// Strategies are tried in order and the first hit is returned:
    /// 1. `None` immediately when the destination is this node.
    /// 2. The destination itself, when it is a peer that `can_send()`.
    /// 3. In `RoutingMode::ReplyLearned` only: a learned route, unless
    ///    `should_explore_fallback` asks for the other strategies to go first.
    /// 4. When no coordinates are cached for the destination: a learned route
    ///    (ReplyLearned only), otherwise `None`.
    /// 5. The best peer from `destination_in_filters`, ranked by
    ///    `select_best_candidate`.
    /// 6. The hop chosen by `tree_state.find_next_hop`, if that peer can send.
    /// 7. A learned route as the last resort (ReplyLearned only).
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        // Traffic addressed to ourselves has no next hop.
        if dest_node_addr == self.node_addr() {
            return None;
        }

        // Destination is a directly connected, sendable peer.
        if let Some(peer) = self.peers.get(dest_node_addr)
            && peer.can_send()
        {
            return Some(peer);
        }

        let now_ms = Self::now_ms();

        // `Some(set of sendable peer addresses)` only in ReplyLearned mode; the
        // `None` case disables every learned-route branch below.
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(_, peer)| peer.can_send())
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // When true, learned routes are skipped here and only consulted after
        // the coordinate and tree strategies have had their turn.
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // Coordinate- and tree-based routing both need the destination's
        // coordinates; without them only a learned route can help.
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            if let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            return None;
        };

        // Resolve to an address inside a scope so the candidate borrows end
        // before the final `self.peers.get`.
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        if let Some(next_hop_addr) = coordinate_route_addr {
            return self.peers.get(&next_hop_addr);
        }

        // Tree routing, accepted only when the chosen hop is a sendable peer.
        let tree_route_addr = self
            .tree_state
            .find_next_hop(&dest_coords)
            .filter(|next_hop_id| {
                self.peers
                    .get(next_hop_id)
                    .is_some_and(|peer| peer.can_send())
            });
        if let Some(next_hop_addr) = tree_route_addr {
            return self.peers.get(&next_hop_addr);
        }
        // Exploration found nothing better: fall back to the learned route
        // that was deliberately skipped above (may still be `None`).
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        // NOTE(review): with `explore_fallback == false` this repeats the
        // learned-route query already made before the coordinate lookup;
        // whether it can now succeed depends on `select_next_hop` — confirm.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        None
    }
2286
2287 pub(in crate::node) fn learn_reverse_route(
2288 &mut self,
2289 destination: NodeAddr,
2290 next_hop: NodeAddr,
2291 ) {
2292 if self.config.node.routing.mode != RoutingMode::ReplyLearned
2293 || destination == *self.node_addr()
2294 {
2295 return;
2296 }
2297 let now_ms = Self::now_ms();
2298 self.learned_routes.learn(
2299 destination,
2300 next_hop,
2301 now_ms,
2302 self.config.node.routing.learned_ttl_secs,
2303 self.config.node.routing.max_learned_routes_per_dest,
2304 );
2305 }
2306
2307 pub(in crate::node) fn record_route_failure(
2308 &mut self,
2309 destination: NodeAddr,
2310 next_hop: NodeAddr,
2311 ) {
2312 if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2313 return;
2314 }
2315 self.learned_routes.record_failure(&destination, &next_hop);
2316 }
2317
    /// Returns a snapshot of the learned-route table taken at `now_ms`.
    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
        self.learned_routes.snapshot(now_ms)
    }
2321
    /// Asks the learned-route table to drop entries expired as of `now_ms`.
    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
        self.learned_routes.purge_expired(now_ms);
    }
2325
2326 fn select_best_candidate<'a>(
2335 &'a self,
2336 candidates: &[&'a ActivePeer],
2337 dest_coords: &crate::tree::TreeCoordinate,
2338 ) -> Option<&'a ActivePeer> {
2339 let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2340
2341 let mut best: Option<(&ActivePeer, f64, usize)> = None;
2342
2343 for &candidate in candidates {
2344 if !candidate.can_send() {
2345 continue;
2346 }
2347
2348 let cost = candidate.link_cost();
2349
2350 let dist = self
2351 .tree_state
2352 .peer_coords(candidate.node_addr())
2353 .map(|pc| pc.distance_to(dest_coords))
2354 .unwrap_or(usize::MAX);
2355
2356 if dist >= my_distance {
2359 continue;
2360 }
2361
2362 let dominated = match &best {
2363 None => true,
2364 Some((_, best_cost, best_dist)) => {
2365 cost < *best_cost
2366 || (cost == *best_cost && dist < *best_dist)
2367 || (cost == *best_cost
2368 && dist == *best_dist
2369 && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2370 }
2371 };
2372
2373 if dominated {
2374 best = Some((candidate, cost, dist));
2375 }
2376 }
2377
2378 best.map(|(peer, _, _)| peer)
2379 }
2380
    /// Collects every active peer for which `may_reach(dest)` is true.
    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
        self.peers.values().filter(|p| p.may_reach(dest)).collect()
    }
2385
    /// Borrows the TUN sender, or returns `None` when the `tun_tx` slot is
    /// empty.
    pub fn tun_tx(&self) -> Option<&TunTx> {
        self.tun_tx.as_ref()
    }
2392
2393 pub fn attach_external_packet_io(
2400 &mut self,
2401 capacity: usize,
2402 ) -> Result<ExternalPacketIo, NodeError> {
2403 if self.state != NodeState::Created {
2404 return Err(NodeError::Config(ConfigError::Validation(
2405 "external packet I/O must be attached before node start".to_string(),
2406 )));
2407 }
2408 if self.config.tun.enabled {
2409 return Err(NodeError::Config(ConfigError::Validation(
2410 "external packet I/O requires tun.enabled=false".to_string(),
2411 )));
2412 }
2413
2414 let capacity = capacity.max(1);
2415 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2416 let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2417 self.tun_outbound_rx = Some(outbound_rx);
2418 self.external_packet_tx = Some(inbound_tx);
2419
2420 Ok(ExternalPacketIo {
2421 outbound_tx,
2422 inbound_rx,
2423 })
2424 }
2425
2426 pub(crate) fn attach_endpoint_data_io(
2431 &mut self,
2432 capacity: usize,
2433 ) -> Result<EndpointDataIo, NodeError> {
2434 if self.state != NodeState::Created {
2435 return Err(NodeError::Config(ConfigError::Validation(
2436 "endpoint data I/O must be attached before node start".to_string(),
2437 )));
2438 }
2439
2440 let command_capacity = endpoint_data_command_capacity(capacity);
2441 let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2442 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2447 self.endpoint_command_rx = Some(command_rx);
2448 self.endpoint_event_tx = Some(event_tx.clone());
2449
2450 Ok(EndpointDataIo {
2451 command_tx,
2452 event_rx,
2453 event_tx,
2454 })
2455 }
2456
2457 pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2458 let mut prefix = [0u8; 15];
2459 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2460 self.identity_cache
2461 .get(&prefix)
2462 .filter(|entry| &entry.node_addr == addr)
2463 .map(|entry| entry.pubkey)
2464 }
2465
2466 pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2467 let mut prefix = [0u8; 15];
2468 prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2469 self.identity_cache
2470 .get(&prefix)
2471 .filter(|entry| &entry.node_addr == addr)
2472 .map(|entry| entry.npub.clone())
2473 }
2474
2475 pub(in crate::node) fn deliver_external_ipv6_packet(
2476 &self,
2477 src_addr: &NodeAddr,
2478 packet: Vec<u8>,
2479 ) {
2480 let Some(external_packet_tx) = &self.external_packet_tx else {
2481 return;
2482 };
2483 if packet.len() < 40 {
2484 return;
2485 }
2486 let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2487 return;
2488 };
2489 let delivered = NodeDeliveredPacket {
2490 source_node_addr: *src_addr,
2491 source_npub: self.npub_for_node_addr(src_addr),
2492 destination,
2493 packet,
2494 };
2495 if let Err(error) = external_packet_tx.try_send(delivered) {
2496 debug!(error = %error, "Failed to deliver packet to external app sink");
2497 }
2498 }
2499
    /// Encrypts `plaintext` and sends it to the peer `node_addr` with the CE
    /// flag cleared.
    ///
    /// Delegates to `send_encrypted_link_message_with_ce` with
    /// `ce_flag = false`; errors are passed through unchanged.
    pub(super) async fn send_encrypted_link_message(
        &mut self,
        node_addr: &NodeAddr,
        plaintext: &[u8],
    ) -> Result<(), NodeError> {
        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
            .await
    }
2521
2522 pub(in crate::node) fn note_local_send_outcome(
2528 &mut self,
2529 result: &Result<usize, TransportError>,
2530 ) {
2531 match result {
2532 Ok(_) => {
2533 if self.last_local_send_failure_at.is_some() {
2534 self.last_local_send_failure_at = None;
2535 }
2536 }
2537 Err(TransportError::Io(e))
2538 if matches!(
2539 e.kind(),
2540 std::io::ErrorKind::NetworkUnreachable
2541 | std::io::ErrorKind::HostUnreachable
2542 | std::io::ErrorKind::AddrNotAvailable
2543 ) =>
2544 {
2545 self.last_local_send_failure_at = Some(std::time::Instant::now());
2546 }
2547 Err(_) => {}
2548 }
2549 }
2550
    /// Returns the instant recorded by `note_local_send_outcome` for the most
    /// recent qualifying send failure, or `None` when the marker is clear.
    pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
        self.last_local_send_failure_at
    }
2556
2557 pub(super) async fn send_encrypted_link_message_with_ce(
2561 &mut self,
2562 node_addr: &NodeAddr,
2563 plaintext: &[u8],
2564 ce_flag: bool,
2565 ) -> Result<(), NodeError> {
2566 let peer = self
2567 .peers
2568 .get_mut(node_addr)
2569 .ok_or(NodeError::PeerNotFound(*node_addr))?;
2570
2571 let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2572 node_addr: *node_addr,
2573 reason: "no their_index".into(),
2574 })?;
2575 let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2576 node_addr: *node_addr,
2577 reason: "no transport_id".into(),
2578 })?;
2579 let remote_addr = peer
2580 .current_addr()
2581 .cloned()
2582 .ok_or_else(|| NodeError::SendFailed {
2583 node_addr: *node_addr,
2584 reason: "no current_addr".into(),
2585 })?;
2586 #[cfg(any(target_os = "linux", target_os = "macos"))]
2587 let connected_socket = peer.connected_udp();
2588
2589 let timestamp_ms = peer.session_elapsed_ms();
2591
2592 let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2594 let mut flags = if sp_flag { FLAG_SP } else { 0 };
2595 if ce_flag {
2596 flags |= FLAG_CE;
2597 }
2598 if peer.current_k_bit() {
2599 flags |= FLAG_KEY_EPOCH;
2600 }
2601
2602 let session = peer
2603 .noise_session_mut()
2604 .ok_or_else(|| NodeError::SendFailed {
2605 node_addr: *node_addr,
2606 reason: "no noise session".into(),
2607 })?;
2608
2609 const INNER_TS_LEN: usize = 4;
2617 let counter = session.current_send_counter();
2618 let inner_len = INNER_TS_LEN + plaintext.len();
2619 let payload_len = inner_len as u16;
2620 let header = build_established_header(their_index, counter, flags, payload_len);
2621
2622 let transport_for_send = self
2641 .transports
2642 .get(&transport_id)
2643 .ok_or(NodeError::TransportNotFound(transport_id))?;
2644 let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2645 if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2646 && is_udp
2647 && let Some(cipher_clone) = session.send_cipher_clone()
2648 {
2649 {
2650 let reserved_counter =
2654 session
2655 .take_send_counter()
2656 .map_err(|e| NodeError::SendFailed {
2657 node_addr: *node_addr,
2658 reason: format!("counter reservation failed: {}", e),
2659 })?;
2660 debug_assert_eq!(reserved_counter, counter);
2661 let header =
2665 build_established_header(their_index, reserved_counter, flags, payload_len);
2666 let transport = transport_for_send;
2667 let send_target = {
2674 if let TransportHandle::Udp(udp) = transport {
2675 let socket_addr = {
2676 #[cfg(any(target_os = "linux", target_os = "macos"))]
2677 {
2678 match connected_socket.as_ref() {
2679 Some(socket) => Some(socket.peer_addr()),
2680 None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2681 }
2682 }
2683 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2684 {
2685 udp.resolve_for_off_task(&remote_addr).await.ok()
2686 }
2687 };
2688 match (udp.async_socket(), socket_addr) {
2689 (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
2690 _ => None,
2691 }
2692 } else {
2693 None
2694 }
2695 };
2696 if let Some((socket, socket_addr)) = send_target {
2697 let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
2713 let mut wire_buf = Vec::with_capacity(wire_capacity);
2714 wire_buf.extend_from_slice(&header);
2715 wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
2716 wire_buf.extend_from_slice(plaintext);
2717 let predicted_bytes = wire_capacity;
2718 if let Some(peer) = self.peers.get_mut(node_addr) {
2725 peer.link_stats_mut().record_sent(predicted_bytes);
2726 if let Some(mmp) = peer.mmp_mut() {
2727 mmp.sender
2728 .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
2729 }
2730 }
2731 workers.dispatch(self::encrypt_worker::FmpSendJob {
2732 cipher: cipher_clone,
2733 counter: reserved_counter,
2734 wire_buf,
2735 fsp_seal: None,
2736 socket,
2737 dest_addr: socket_addr,
2738 #[cfg(any(target_os = "linux", target_os = "macos"))]
2739 connected_socket,
2740 drop_on_backpressure: plaintext
2741 .first()
2742 .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
2743 queued_at: crate::perf_profile::stamp(),
2744 });
2745 return Ok(());
2746 }
2747 }
2748 }
2749
2750 let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
2755 let ciphertext = {
2757 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
2758 session
2759 .encrypt_with_aad(&inner_plaintext, &header)
2760 .map_err(|e| NodeError::SendFailed {
2761 node_addr: *node_addr,
2762 reason: format!("encryption failed: {}", e),
2763 })?
2764 };
2765
2766 let wire_packet = build_encrypted(&header, &ciphertext);
2767
2768 let send_result = {
2770 let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
2771 let transport = self
2772 .transports
2773 .get(&transport_id)
2774 .ok_or(NodeError::TransportNotFound(transport_id))?;
2775 transport.send(&remote_addr, &wire_packet).await
2776 };
2777 self.note_local_send_outcome(&send_result);
2778 let bytes_sent = send_result.map_err(|e| match e {
2779 TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
2780 node_addr: *node_addr,
2781 packet_size,
2782 mtu,
2783 },
2784 other => NodeError::SendFailed {
2785 node_addr: *node_addr,
2786 reason: format!("transport send: {}", other),
2787 },
2788 })?;
2789
2790 if let Some(peer) = self.peers.get_mut(node_addr) {
2792 peer.link_stats_mut().record_sent(bytes_sent);
2793 if let Some(mmp) = peer.mmp_mut() {
2795 mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
2796 }
2797 }
2798
2799 Ok(())
2800 }
2801}
2802
2803impl fmt::Debug for Node {
2804 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2805 f.debug_struct("Node")
2806 .field("node_addr", self.node_addr())
2807 .field("state", &self.state)
2808 .field("is_leaf_only", &self.is_leaf_only)
2809 .field("connections", &self.connection_count())
2810 .field("peers", &self.peer_count())
2811 .field("links", &self.link_count())
2812 .field("transports", &self.transport_count())
2813 .finish()
2814 }
2815}