1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, MeshTraversalSignal, NostrDiscovery,
7 OverlayAdvert, OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason, SessionMessageType};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use secp256k1::PublicKey;
18use std::collections::{HashMap, HashSet};
19use std::net::{IpAddr, SocketAddr};
20use std::thread;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
/// Three-way decision about what to do with a mesh signal for a session.
///
/// NOTE(review): the code that produces and consumes this enum is not in the
/// visible part of the file; the per-variant notes are inferred from the
/// variant names only and should be confirmed at the use site.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MeshSignalSessionAction {
    /// Presumably: deliver the signal now.
    Send,
    /// Presumably: hold the signal back for a later attempt.
    Defer,
    /// Presumably: discard the signal.
    Drop,
}
30
/// Appends one timestamped line to `nvpn-fips-endpoint-debug.log` in the
/// system temp directory (debug builds only).
///
/// Logging is best-effort: failures to open or write the file are ignored.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let mut open_options = std::fs::OpenOptions::new();
    open_options.create(true).append(true);

    // Nothing to do if the log file cannot be opened.
    let Ok(mut log_file) = open_options.open(log_path) else {
        return;
    };

    let timestamp = std::time::SystemTime::now();
    let _ = writeln!(log_file, "{:?} {}", timestamp, message.as_ref());
}

/// Release-build counterpart of the debug logger: does nothing.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
51
/// Reports whether `ip` lies in a range this module treats as unroutable
/// for advert purposes.
///
/// IPv4: private, loopback, link-local, unspecified, multicast, broadcast,
/// documentation, and the `100.64.0.0/10` shared address space.
///
/// IPv6: loopback, unspecified, unique-local (`fc00::/7`), multicast and
/// link-local (`fe80::/10`). IPv4-mapped addresses (`::ffff:a.b.c.d`) are
/// classified by their embedded IPv4 address, so a mapped private or
/// loopback address is rejected exactly like its plain IPv4 form.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    /// IPv4 half of the classification, shared with IPv4-mapped IPv6.
    fn is_unroutable_v4(v4: std::net::Ipv4Addr) -> bool {
        let octets = v4.octets();
        v4.is_private()
            || v4.is_loopback()
            || v4.is_link_local()
            || v4.is_unspecified()
            || v4.is_multicast()
            || v4.is_broadcast()
            || v4.is_documentation()
            // 100.64.0.0/10: first octet 100, top two bits of the second are 01.
            || (octets[0] == 100 && (octets[1] & 0xc0) == 64)
    }

    match ip {
        IpAddr::V4(v4) => is_unroutable_v4(v4),
        IpAddr::V6(v6) => {
            // Without this, `::ffff:192.168.1.1` would pass every IPv6 check
            // below and be treated as routable.
            if let Some(mapped) = v6.to_ipv4_mapped() {
                return is_unroutable_v4(mapped);
            }

            let first_segment = v6.segments()[0];
            v6.is_loopback()
                || v6.is_unspecified()
                // fc00::/7 unique-local
                || (first_segment & 0xfe00) == 0xfc00
                || v6.is_multicast()
                // fe80::/10 link-local
                || (first_segment & 0xffc0) == 0xfe80
        }
    }
}
83
/// Returns `true` when both socket addresses belong to the same IP family
/// (both IPv4 or both IPv6).
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    // `SocketAddr` has exactly two variants, so agreeing on "is IPv4" means
    // the families match.
    local.is_ipv4() == remote.is_ipv4()
}
90
// NOTE(review): not referenced in the visible part of this file; presumably a
// multiplier applied to a retry lifetime for open-discovery peers — confirm
// at the use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
/// Per-peer cap on in-flight path candidates (handshaking connections plus
/// pending transport connects); consumed by `path_candidate_attempt_budget`.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
/// Upper bound on the value returned by `graph_session_warmup_budget`.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
/// Upper bound on the value returned by `discovery_connect_budget`.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
95
96impl Node {
97 pub(super) async fn update_peers(
109 &mut self,
110 new_peers: Vec<crate::config::PeerConfig>,
111 ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
112 use std::collections::{HashMap, HashSet};
113
114 let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
115 HashMap::with_capacity(new_peers.len());
116 let mut new_order = Vec::with_capacity(new_peers.len());
117 for peer in new_peers {
118 let identity = match PeerIdentity::from_npub(&peer.npub) {
119 Ok(id) => id,
120 Err(e) => {
121 return Err(crate::node::NodeError::InvalidPeerNpub {
122 npub: peer.npub.clone(),
123 reason: e.to_string(),
124 });
125 }
126 };
127 let node_addr = *identity.node_addr();
131 if !new_by_addr.contains_key(&node_addr) {
132 new_order.push(node_addr);
133 }
134 new_by_addr.insert(node_addr, peer);
135 }
136
137 let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
138 .config
139 .peers()
140 .iter()
141 .filter_map(|pc| {
142 PeerIdentity::from_npub(&pc.npub)
143 .ok()
144 .map(|id| (*id.node_addr(), pc.clone()))
145 })
146 .collect();
147
148 let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
149 let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
150
151 let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
152 let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
153 let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
154
155 let mut outcome = crate::node::UpdatePeersOutcome::default();
156
157 for node_addr in &removed {
158 if self.retry_pending.remove(node_addr).is_some() {
159 debug!(
160 peer = %self.peer_display_name(node_addr),
161 "Dropping retry entry for peer removed from runtime peer list"
162 );
163 }
164 self.peer_aliases.remove(node_addr);
165 self.set_discovery_fallback_transit_allowed(*node_addr, false);
166 outcome.removed += 1;
167 }
168
169 let mut auto_connect_refresh_configs = Vec::new();
170 for node_addr in &kept {
171 let new_pc = &new_by_addr[node_addr];
172 let current_pc = ¤t_by_addr[node_addr];
173 if new_pc.addresses != current_pc.addresses
174 || new_pc.alias != current_pc.alias
175 || new_pc.connect_policy != current_pc.connect_policy
176 || new_pc.auto_reconnect != current_pc.auto_reconnect
177 || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
178 {
179 outcome.updated += 1;
180 self.set_discovery_fallback_transit_allowed(
181 *node_addr,
182 new_pc.discovery_fallback_transit,
183 );
184 if let Some(state) = self.retry_pending.get_mut(node_addr) {
185 state.peer_config = new_pc.clone();
186 state.retry_after_ms = Self::now_ms();
187 }
188 if let Some(alias) = new_pc.alias.clone() {
189 self.peer_aliases.insert(*node_addr, alias);
190 }
191 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
192 auto_connect_refresh_configs.push(new_pc.clone());
193 }
194 } else {
195 outcome.unchanged += 1;
196 self.set_discovery_fallback_transit_allowed(
197 *node_addr,
198 new_pc.discovery_fallback_transit,
199 );
200 if let Some(state) = self.retry_pending.get_mut(node_addr) {
201 state.peer_config = new_pc.clone();
202 }
203 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
204 auto_connect_refresh_configs.push(new_pc.clone());
205 }
206 }
207 }
208
209 let added_configs: Vec<crate::config::PeerConfig> = new_order
210 .iter()
211 .filter(|addr| added.contains(addr))
212 .map(|addr| new_by_addr[addr].clone())
213 .collect();
214
215 self.config.peers = new_order
219 .iter()
220 .filter_map(|addr| new_by_addr.get(addr).cloned())
221 .collect();
222 self.configured_peer_send_weights = Self::configured_peer_send_weights(&self.config);
223
224 for peer_config in added_configs {
225 outcome.added += 1;
226 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
227 continue;
228 };
229 let name = peer_config
230 .alias
231 .clone()
232 .unwrap_or_else(|| identity.short_npub());
233 self.peer_aliases.insert(*identity.node_addr(), name);
234 self.set_discovery_fallback_transit_allowed(
235 *identity.node_addr(),
236 peer_config.discovery_fallback_transit,
237 );
238 self.register_identity(*identity.node_addr(), identity.pubkey_full());
239
240 if !peer_config.is_auto_connect() {
241 continue;
242 }
243
244 match self
245 .try_auto_connect_graph_session(&peer_config, identity)
246 .await
247 {
248 Ok(true) => continue,
249 Ok(false) => {}
250 Err(err) => {
251 debug!(
252 npub = %peer_config.npub,
253 error = %err,
254 "Existing FIPS graph did not warm newly added peer"
255 );
256 }
257 }
258
259 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
260 warn!(
261 npub = %peer_config.npub,
262 error = %e,
263 "Failed to initiate connection for newly added peer"
264 );
265 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
266 self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
267 }
268 if matches!(e, crate::node::NodeError::NoTransportForType(_))
269 && let Some(bootstrap) = self.nostr_discovery.clone()
270 {
271 bootstrap
272 .request_advert_stale_check(peer_config.npub.clone())
273 .await;
274 }
275 }
276 }
277
278 for peer_config in auto_connect_refresh_configs {
279 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
280 continue;
281 };
282 let node_addr = *peer_identity.node_addr();
283
284 if self.peers.contains_key(&node_addr) {
285 match self
286 .initiate_active_peer_alternative_connection(&peer_config)
287 .await
288 {
289 Ok(attempted) => {
290 if attempted {
291 debug!(
292 peer = %self.peer_display_name(&node_addr),
293 "Started non-disruptive alternate-path handshake for active peer"
294 );
295 }
296 }
297 Err(e) => {
298 debug!(
299 npub = %peer_config.npub,
300 error = %e,
301 "Active peer alternate-path refresh did not start"
302 );
303 }
304 }
305 continue;
306 }
307
308 match self
309 .try_auto_connect_graph_session(&peer_config, peer_identity)
310 .await
311 {
312 Ok(true) => continue,
313 Ok(false) => {}
314 Err(err) => {
315 debug!(
316 npub = %peer_config.npub,
317 error = %err,
318 "Existing FIPS graph did not warm refreshed peer"
319 );
320 }
321 }
322
323 match self.initiate_peer_connection(&peer_config).await {
324 Ok(()) => {
325 let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
326 if let Some(state) = self.retry_pending.get_mut(&node_addr) {
327 state.peer_config = peer_config;
328 state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
329 }
330 }
331 Err(e) => {
332 debug!(
333 npub = %peer_config.npub,
334 error = %e,
335 "Refreshed peer addresses did not initiate a direct connection"
336 );
337 self.schedule_retry_after_error(node_addr, Self::now_ms(), &e);
338 }
339 }
340 }
341
342 self.warm_auto_connect_graph_sessions().await;
343
344 Ok(outcome)
345 }
346
    /// Registers every configured peer and starts connections to the
    /// auto-connect ones.
    ///
    /// All peers with a parsable npub get an alias (configured alias, or the
    /// short npub) and have their identity registered. Each auto-connect peer
    /// is then first offered to `try_auto_connect_graph_session`; if that does
    /// not report success, a direct connection is initiated. Failures are
    /// logged and scheduled for retry; nothing is returned to the caller.
    pub(super) async fn initiate_peer_connections(&mut self) {
        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
            .config
            .peers()
            .iter()
            .filter_map(|pc| {
                PeerIdentity::from_npub(&pc.npub)
                    .ok()
                    .map(|id| (id, pc.alias.clone()))
            })
            .collect();

        for (identity, alias) in peer_identities {
            let name = alias.unwrap_or_else(|| identity.short_npub());
            self.peer_aliases.insert(*identity.node_addr(), name);
            self.register_identity(*identity.node_addr(), identity.pubkey_full());
        }

        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();

        if peer_configs.is_empty() {
            debug!("No static peers configured");
            return;
        }

        debug!(
            count = peer_configs.len(),
            "Initiating static peer connections"
        );

        for peer_config in peer_configs {
            let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                Ok(identity) => identity,
                Err(_) => continue,
            };
            // `Ok(true)` means a graph session exists or was started, so no
            // direct connection is attempted for this peer.
            match self
                .try_auto_connect_graph_session(&peer_config, peer_identity)
                .await
            {
                Ok(true) => continue,
                Ok(false) => {}
                Err(err) => {
                    debug!(
                        npub = %peer_config.npub,
                        error = %err,
                        "Existing FIPS graph did not warm auto-connect peer"
                    );
                }
            }
            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
                warn!(
                    npub = %peer_config.npub,
                    alias = ?peer_config.alias,
                    error = %e,
                    "Failed to initiate peer connection"
                );
                // The identity was moved into the graph-session attempt above,
                // so it is parsed again here for the retry bookkeeping.
                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
                    self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
                }
                // With no transport for the peer's address type, ask nostr
                // discovery (when present) to run a stale check on its advert.
                if matches!(e, crate::node::NodeError::NoTransportForType(_))
                    && let Some(bootstrap) = self.nostr_discovery.clone()
                {
                    bootstrap
                        .request_advert_stale_check(peer_config.npub.clone())
                        .await;
                }
            }
        }

        self.warm_auto_connect_graph_sessions().await;
    }
435
    /// Tries to reach an auto-connect peer through a session routed over
    /// already-connected peers instead of a direct connection.
    ///
    /// Returns `Ok(true)` when a session to the peer is already established or
    /// initiating, or when `initiate_session` succeeds. Returns `Ok(false)`
    /// when the caller should proceed with its own connection attempt:
    /// - the peer is not auto-connect,
    /// - the peer is already connected and can send,
    /// - `auto_connect_should_race_direct_path` says to dial directly,
    /// - no next hop is known,
    /// - or session initiation failed with "no route to destination" (a
    ///   lookup may be started in that case).
    ///
    /// # Errors
    ///
    /// Any other error from `initiate_session` is returned unchanged.
    pub(in crate::node) async fn try_auto_connect_graph_session(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
    ) -> Result<bool, NodeError> {
        if !peer_config.is_auto_connect() {
            return Ok(false);
        }

        let peer_node_addr = *peer_identity.node_addr();
        if self
            .peers
            .get(&peer_node_addr)
            .is_some_and(|peer| peer.can_send())
        {
            return Ok(false);
        }
        // Checked before the session lookup: when a direct path should be
        // raced, an existing session does not short-circuit the caller.
        if self.auto_connect_should_race_direct_path(peer_config) {
            return Ok(false);
        }
        if self
            .sessions
            .get(&peer_node_addr)
            .is_some_and(|entry| entry.is_established() || entry.is_initiating())
        {
            return Ok(true);
        }
        if self.find_next_hop(&peer_node_addr).is_none() {
            return Ok(false);
        }

        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
        match self
            .initiate_session(peer_node_addr, peer_identity.pubkey_full())
            .await
        {
            Ok(()) => {
                debug!(
                    peer = %self.peer_display_name(&peer_node_addr),
                    "Warmed auto-connect peer session over existing FIPS graph"
                );
                Ok(true)
            }
            // Matched on the exact reason string carried by the error.
            Err(NodeError::SendFailed { node_addr, reason })
                if node_addr == peer_node_addr && reason == "no route to destination" =>
            {
                self.maybe_initiate_lookup(&peer_node_addr).await;
                Ok(false)
            }
            Err(err) => Err(err),
        }
    }
488
489 fn auto_connect_should_race_direct_path(&self, peer_config: &PeerConfig) -> bool {
490 !peer_config.addresses.is_empty() || self.config.node.discovery.nostr.enabled
491 }
492
    /// Starts a connection to a configured peer.
    ///
    /// Delegates to `initiate_peer_connection_inner`, which is a no-op when
    /// the peer is already in the peer table.
    ///
    /// # Errors
    ///
    /// Propagates whatever `initiate_peer_connection_inner` returns, including
    /// `NodeError::InvalidPeerNpub` for an unparsable npub.
    pub(super) async fn initiate_peer_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }
502
    /// Entry point for retrying a connection to a configured peer.
    ///
    /// Currently identical to `initiate_peer_connection`: both delegate to
    /// `initiate_peer_connection_inner`.
    ///
    /// # Errors
    ///
    /// Propagates whatever `initiate_peer_connection_inner` returns.
    pub(super) async fn initiate_peer_retry_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }
514
    /// Attempts an additional path to a peer, with same-path refresh
    /// disabled (`allow_same_path_refresh = false`).
    ///
    /// See `initiate_active_peer_alternative_connection_inner` for the
    /// meaning of the returned flag and the errors.
    pub(in crate::node) async fn initiate_active_peer_alternative_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<bool, NodeError> {
        self.initiate_active_peer_alternative_connection_inner(peer_config, false)
            .await
    }
522
    /// Attempts an additional path to a peer, with same-path refresh
    /// enabled (`allow_same_path_refresh = true`).
    ///
    /// See `initiate_active_peer_alternative_connection_inner` for the
    /// meaning of the returned flag and the errors.
    pub(in crate::node) async fn initiate_active_peer_direct_refresh_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<bool, NodeError> {
        self.initiate_active_peer_alternative_connection_inner(peer_config, true)
            .await
    }
530
531 async fn initiate_active_peer_alternative_connection_inner(
532 &mut self,
533 peer_config: &crate::config::PeerConfig,
534 allow_same_path_refresh: bool,
535 ) -> Result<bool, NodeError> {
536 let peer_identity =
537 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
538 npub: peer_config.npub.clone(),
539 reason: e.to_string(),
540 })?;
541 let peer_node_addr = *peer_identity.node_addr();
542
543 if !self.peers.contains_key(&peer_node_addr) {
544 self.initiate_peer_connection(peer_config).await?;
545 return Ok(true);
546 }
547
548 self.try_active_peer_alternative_addresses(
554 peer_config,
555 peer_identity,
556 allow_same_path_refresh,
557 )
558 .await
559 }
560
561 async fn initiate_peer_connection_inner(
562 &mut self,
563 peer_config: &crate::config::PeerConfig,
564 ) -> Result<(), NodeError> {
565 let peer_identity =
567 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
568 npub: peer_config.npub.clone(),
569 reason: e.to_string(),
570 })?;
571
572 let peer_node_addr = *peer_identity.node_addr();
573
574 if self.peers.contains_key(&peer_node_addr) {
576 debug!(
577 npub = %peer_config.npub,
578 "Peer already exists, skipping"
579 );
580 return Ok(());
581 }
582
583 self.try_peer_addresses(peer_config, peer_identity, true)
584 .await
585 }
586
587 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
588 self.connections.values().any(|conn| {
589 conn.expected_identity()
590 .map(|id| id.node_addr() == peer_node_addr)
591 .unwrap_or(false)
592 })
593 }
594
595 fn is_connecting_to_peer_on_path(
596 &self,
597 peer_node_addr: &NodeAddr,
598 transport_id: TransportId,
599 remote_addr: &TransportAddr,
600 ) -> bool {
601 self.connections.values().any(|conn| {
602 conn.expected_identity()
603 .map(|id| id.node_addr() == peer_node_addr)
604 .unwrap_or(false)
605 && conn.transport_id() == Some(transport_id)
606 && conn.source_addr() == Some(remote_addr)
607 }) || self.pending_connects.iter().any(|pending| {
608 pending.peer_identity.node_addr() == peer_node_addr
609 && pending.transport_id == transport_id
610 && &pending.remote_addr == remote_addr
611 })
612 }
613
614 pub(in crate::node) fn should_warm_auto_connect_session(
615 &self,
616 peer_node_addr: &NodeAddr,
617 ) -> bool {
618 if self
619 .peers
620 .get(peer_node_addr)
621 .is_some_and(|peer| peer.can_send())
622 || self
623 .sessions
624 .get(peer_node_addr)
625 .is_some_and(|entry| entry.is_established())
626 {
627 return false;
628 }
629
630 self.config.peers().iter().any(|peer| {
631 peer.is_auto_connect()
632 && PeerIdentity::from_npub(&peer.npub)
633 .map(|identity| identity.node_addr() == peer_node_addr)
634 .unwrap_or(false)
635 })
636 }
637
    /// Starts sessions (or lookups) toward auto-connect peers that are not
    /// yet reachable, using the already-connected peers as the graph.
    ///
    /// Does nothing unless at least one connected peer can send and
    /// `graph_session_warmup_budget` is non-zero. Each started session or
    /// lookup consumes one unit of budget.
    ///
    /// Returns the number of peers for which a session initiation succeeded
    /// or a lookup was requested.
    pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
        if !self.peers.values().any(|peer| peer.can_send()) {
            return 0;
        }

        let mut budget = self.graph_session_warmup_budget();
        if budget == 0 {
            return 0;
        }

        let peer_identities: Vec<_> = self
            .config
            .auto_connect_peers()
            .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
            .collect();

        let mut warmed = 0;
        for identity in peer_identities {
            if budget == 0 {
                break;
            }

            let peer_node_addr = *identity.node_addr();
            // Skip ourselves, peers that do not need warming, and peers whose
            // session is already being initiated.
            if peer_node_addr == *self.identity.node_addr()
                || !self.should_warm_auto_connect_session(&peer_node_addr)
                || self
                    .sessions
                    .get(&peer_node_addr)
                    .is_some_and(|entry| entry.is_initiating())
            {
                continue;
            }

            self.register_identity(peer_node_addr, identity.pubkey_full());

            if self.find_next_hop(&peer_node_addr).is_some() {
                match self
                    .initiate_session(peer_node_addr, identity.pubkey_full())
                    .await
                {
                    Ok(()) => {
                        warmed += 1;
                        budget = budget.saturating_sub(1);
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            "Warmed auto-connect peer session over existing FIPS graph"
                        );
                    }
                    // The route vanished between the next-hop check and the
                    // send: fall back to a lookup, which still costs budget.
                    Err(NodeError::SendFailed { node_addr, reason })
                        if node_addr == peer_node_addr && reason == "no route to destination" =>
                    {
                        self.maybe_initiate_lookup(&peer_node_addr).await;
                        warmed += 1;
                        budget = budget.saturating_sub(1);
                    }
                    // Other failures are logged and do not consume budget.
                    Err(err) => {
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            error = %err,
                            "Failed to warm auto-connect peer session"
                        );
                    }
                }
            } else {
                // No known next hop: request a lookup instead.
                self.maybe_initiate_lookup(&peer_node_addr).await;
                warmed += 1;
                budget = budget.saturating_sub(1);
            }
        }

        warmed
    }
710
711 pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
712 let max_destinations = self.config.node.session.pending_max_destinations;
713 if max_destinations == 0 {
714 return 0;
715 }
716
717 let pending_sessions = self
718 .sessions
719 .values()
720 .filter(|entry| !entry.is_established())
721 .count();
722 let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
723 max_destinations
724 .saturating_sub(pending_total)
725 .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
726 }
727
728 fn outbound_handshake_slots(&self) -> usize {
729 let used = self
730 .connections
731 .len()
732 .saturating_add(self.pending_connects.len());
733 if self.max_connections == 0 {
734 usize::MAX
735 } else {
736 self.max_connections.saturating_sub(used)
737 }
738 }
739
740 fn outbound_link_slots(&self) -> usize {
741 if self.max_links == 0 {
742 usize::MAX
743 } else {
744 self.max_links.saturating_sub(self.links.len())
745 }
746 }
747
748 fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
749 if !self.peers.contains_key(peer_node_addr)
750 && self.max_peers > 0
751 && self.peers.len() >= self.max_peers
752 {
753 return 0;
754 }
755
756 let in_flight_for_peer = self
757 .connections
758 .values()
759 .filter(|conn| {
760 conn.expected_identity()
761 .map(|id| id.node_addr() == peer_node_addr)
762 .unwrap_or(false)
763 })
764 .count()
765 .saturating_add(
766 self.pending_connects
767 .iter()
768 .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
769 .count(),
770 );
771
772 self.outbound_handshake_slots()
773 .min(self.outbound_link_slots())
774 .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
775 }
776
    /// Frees one in-flight handshake slot for `peer_node_addr` so that
    /// `candidate` can be attempted instead.
    ///
    /// Only in-flight connections to the same peer on a different path whose
    /// configured priority value is strictly greater than the candidate's are
    /// eligible (a greater value is treated as lower priority). Paths with no
    /// configured priority are ranked below every configured one. The victim
    /// is the eligible connection with the greatest priority value, ties
    /// going to the smallest `started_at`.
    ///
    /// Returns `true` if a connection was removed, `false` if the candidate
    /// cannot be resolved, has no configured priority, or no eligible victim
    /// exists.
    fn reclaim_lower_priority_inflight_candidate_for_peer(
        &mut self,
        peer_node_addr: &NodeAddr,
        candidate: &PeerAddress,
    ) -> bool {
        // One above `u8::MAX`, used as the rank of paths without a
        // configured priority.
        const UNKNOWN_PATH_PRIORITY: u16 = u8::MAX as u16 + 1;

        let Some((candidate_transport_id, candidate_addr)) =
            self.resolve_peer_address_for_match(candidate)
        else {
            return false;
        };
        let Some(candidate_priority) =
            self.configured_path_priority(peer_node_addr, candidate_transport_id, &candidate_addr)
        else {
            return false;
        };
        let candidate_priority = u16::from(candidate_priority);

        let victim = self
            .connections
            .iter()
            .filter_map(|(link_id, conn)| {
                let identity = conn.expected_identity()?;
                if identity.node_addr() != peer_node_addr {
                    return None;
                }
                let transport_id = conn.transport_id()?;
                let remote_addr = conn.source_addr()?;
                // Never evict an attempt that is already on the candidate path.
                if transport_id == candidate_transport_id && remote_addr == &candidate_addr {
                    return None;
                }
                let priority = self
                    .configured_path_priority(peer_node_addr, transport_id, remote_addr)
                    .map(u16::from)
                    .unwrap_or(UNKNOWN_PATH_PRIORITY);
                (priority > candidate_priority).then_some((
                    *link_id,
                    priority,
                    conn.started_at(),
                    transport_id,
                    remote_addr.clone(),
                ))
            })
            // `Reverse` makes the smallest `started_at` win among equal priorities.
            .max_by_key(|(_, priority, started_at, _, _)| {
                (*priority, std::cmp::Reverse(*started_at))
            });

        let Some((link_id, victim_priority, _, victim_transport_id, victim_addr)) = victim else {
            return false;
        };

        let Some(conn) = self.connections.remove(&link_id) else {
            return false;
        };
        // Release the handshake index and its pending-outbound entry, if the
        // victim had got that far.
        if let Some(idx) = conn.our_index()
            && let Some(transport_id) = conn.transport_id()
        {
            self.pending_outbound.remove(&(transport_id, idx.as_u32()));
            let _ = self.index_allocator.free(idx);
        }
        self.remove_link(&link_id);
        self.cleanup_bootstrap_transport_if_unused(victim_transport_id);

        debug!(
            peer = %self.peer_display_name(peer_node_addr),
            candidate_transport_id = %candidate_transport_id,
            candidate_addr = %candidate_addr,
            candidate_priority,
            victim_link_id = %link_id,
            victim_transport_id = %victim_transport_id,
            victim_addr = %victim_addr,
            victim_priority,
            "Reclaimed lower-priority in-flight candidate slot for configured direct path"
        );

        true
    }
855
856 fn discovery_connect_budget(&self) -> usize {
857 self.outbound_handshake_slots()
858 .min(self.outbound_link_slots())
859 .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
860 }
861
862 fn find_udp_transport_for_remote_addr(
869 &self,
870 remote_addr: SocketAddr,
871 ) -> Option<(TransportId, SocketAddr)> {
872 self.transports
873 .iter()
874 .filter(|(id, handle)| {
875 handle.transport_type().name == "udp"
876 && handle.is_operational()
877 && !self.bootstrap_transports.contains(id)
878 })
879 .filter_map(|(id, handle)| {
880 let local_addr = handle.local_addr()?;
881 socket_addr_families_compatible(local_addr, remote_addr)
882 .then_some((*id, local_addr))
883 })
884 .min_by_key(|(id, _)| id.as_u32())
885 }
886
    /// Maps a discovered `(transport, address)` pair to the pair that should
    /// actually be dialed, plus the transport type name.
    ///
    /// - Unknown transport id: `None`.
    /// - Non-UDP transports: returned unchanged.
    /// - UDP with an address that does not parse as a `SocketAddr`: dropped
    ///   when it came from a bootstrap transport, otherwise returned
    ///   unchanged.
    /// - UDP with a socket address: the transport chosen by
    ///   `find_udp_transport_for_remote_addr` is used (it may differ from the
    ///   discovering one); `None` if there is no compatible local socket.
    pub(super) fn transport_discovery_candidate(
        &self,
        discovered_transport_id: TransportId,
        discovered_addr: TransportAddr,
    ) -> Option<(TransportId, TransportAddr, &'static str)> {
        let transport = self.transports.get(&discovered_transport_id)?;
        let transport_name = transport.transport_type().name;

        if transport_name != "udp" {
            return Some((discovered_transport_id, discovered_addr, transport_name));
        }

        let Some(remote_socket_addr) = discovered_addr
            .as_str()
            .and_then(|addr| addr.parse::<SocketAddr>().ok())
        else {
            if self.bootstrap_transports.contains(&discovered_transport_id) {
                debug!(
                    transport_id = %discovered_transport_id,
                    remote_addr = %discovered_addr,
                    "transport discovery: skip non-numeric UDP address from bootstrap transport"
                );
                return None;
            }
            return Some((discovered_transport_id, discovered_addr, transport_name));
        };

        let Some((transport_id, local_addr)) =
            self.find_udp_transport_for_remote_addr(remote_socket_addr)
        else {
            debug!(
                transport_id = %discovered_transport_id,
                remote_addr = %discovered_addr,
                "transport discovery: skip UDP peer with no compatible local socket"
            );
            return None;
        };

        if transport_id != discovered_transport_id {
            debug!(
                discovered_transport_id = %discovered_transport_id,
                selected_transport_id = %transport_id,
                local_addr = %local_addr,
                remote_addr = %remote_socket_addr,
                "transport discovery: selected compatible UDP transport"
            );
        }

        // The address is rebuilt from the parsed socket address rather than
        // passed through as received.
        Some((
            transport_id,
            TransportAddr::from_socket_addr(remote_socket_addr),
            transport_name,
        ))
    }
941
    /// Renders a transport candidate's remote address as a string.
    ///
    /// On Linux and macOS, a 6-byte address on an "ethernet" transport with a
    /// known interface name is rendered as `<interface>/<mac>`. In every
    /// other case the address's own `to_string()` form is returned.
    fn peer_address_string_for_transport_candidate(
        &self,
        transport_id: TransportId,
        transport_name: &str,
        remote_addr: &TransportAddr,
    ) -> String {
        // On other platforms these parameters are unused; consume them here
        // to avoid unused-variable warnings.
        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
        let _ = (transport_id, transport_name);

        #[cfg(any(target_os = "linux", target_os = "macos"))]
        if transport_name == "ethernet"
            && remote_addr.as_bytes().len() == 6
            && let Some(interface) = self
                .transports
                .get(&transport_id)
                .and_then(|transport| transport.interface_name())
        {
            let mut mac = [0u8; 6];
            // Length was checked above, so this copy cannot panic.
            mac.copy_from_slice(remote_addr.as_bytes());
            return format!(
                "{interface}/{}",
                crate::transport::ethernet::format_mac(&mac)
            );
        }

        remote_addr.to_string()
    }
969
    /// Resolves a configured peer address to the `(transport id, transport
    /// address)` pair used to compare it against in-flight paths.
    ///
    /// Returns `None` when the address cannot be resolved: a UDP address of
    /// "nat" (case-insensitive), a failed ethernet or BLE resolution, BLE
    /// when `bluer_available` is not set, or no matching transport.
    fn resolve_peer_address_for_match(
        &self,
        candidate: &PeerAddress,
    ) -> Option<(TransportId, TransportAddr)> {
        if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
            return None;
        }

        if candidate.transport == "ethernet" {
            return self.resolve_ethernet_addr(&candidate.addr).ok();
        }

        if candidate.transport == "ble" {
            #[cfg(bluer_available)]
            {
                return self.resolve_ble_addr(&candidate.addr).ok();
            }
            #[cfg(not(bluer_available))]
            {
                return None;
            }
        }

        // UDP addresses that parse as socket addresses go through the
        // family-aware transport selection; everything else falls back to a
        // lookup by transport type name.
        let transport_id = if candidate.transport == "udp"
            && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
        {
            self.find_udp_transport_for_remote_addr(remote_socket_addr)
                .map(|(id, _)| id)?
        } else {
            self.find_transport_for_type(&candidate.transport)?
        };

        Some((transport_id, TransportAddr::from_string(&candidate.addr)))
    }
1004
    /// Starts an outbound connection to `peer_identity` at `remote_addr` over
    /// `transport_id`.
    ///
    /// Returns `Ok(())` without doing anything if an attempt on the same path
    /// is already in flight. Otherwise a link is created and registered; for
    /// connection-oriented transports a transport-level connect is started
    /// and recorded in `pending_connects`, while for other transports the
    /// handshake is started immediately via `start_handshake`.
    ///
    /// # Errors
    ///
    /// - `MaxConnectionsExceeded`, `MaxLinksExceeded` or `MaxPeersExceeded`
    ///   when the corresponding limit leaves no room.
    /// - Whatever `authorize_peer` returns when it rejects the peer.
    /// - A transport error when the transport connect fails; the link is
    ///   rolled back in that case.
    /// - Any error from `start_handshake`.
    pub(super) async fn initiate_connection(
        &mut self,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
            debug!(
                peer = %self.peer_display_name(&peer_node_addr),
                transport_id = %transport_id,
                remote_addr = %remote_addr,
                "Connection already in progress for candidate path"
            );
            return Ok(());
        }

        if self.outbound_handshake_slots() == 0 {
            return Err(NodeError::MaxConnectionsExceeded {
                max: self.max_connections,
            });
        }

        if self.outbound_link_slots() == 0 {
            return Err(NodeError::MaxLinksExceeded {
                max: self.max_links,
            });
        }

        // The peer limit only applies to peers not already in the table.
        if !self.peers.contains_key(&peer_node_addr)
            && self.max_peers > 0
            && self.peers.len() >= self.max_peers
        {
            return Err(NodeError::MaxPeersExceeded {
                max: self.max_peers,
            });
        }

        self.authorize_peer(
            &peer_identity,
            PeerAclContext::OutboundConnect,
            transport_id,
            &remote_addr,
        )?;

        // An unknown transport id is treated as not connection-oriented and
        // therefore goes down the `start_handshake` branch below.
        let is_connection_oriented = self
            .transports
            .get(&transport_id)
            .map(|t| t.transport_type().connection_oriented)
            .unwrap_or(false);

        let link_id = self.allocate_link_id();

        let link = if is_connection_oriented {
            Link::new(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        } else {
            Link::connectionless(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        };

        self.links.insert(link_id, link);

        self.addr_to_link
            .insert((transport_id, remote_addr.clone()), link_id);

        if is_connection_oriented {
            if let Some(transport) = self.transports.get(&transport_id) {
                match transport.connect(&remote_addr).await {
                    Ok(()) => {
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            link_id = %link_id,
                            "Transport connect initiated (non-blocking)"
                        );
                        // The handshake is not started here; the pending
                        // entry records what is needed to start it later.
                        self.pending_connects.push(super::PendingConnect {
                            link_id,
                            transport_id,
                            remote_addr,
                            peer_identity,
                        });
                    }
                    Err(e) => {
                        // Roll back the link registered above.
                        self.links.remove(&link_id);
                        self.addr_to_link.remove(&(transport_id, remote_addr));
                        return Err(NodeError::from_transport_error(e));
                    }
                }
            }
            Ok(())
        } else {
            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
                .await
        }
    }
1128
    /// Begins the Noise handshake on an already-registered outbound link and
    /// sends handshake message 1.
    ///
    /// The connection is recorded in `connections` and `pending_outbound`
    /// before the first send. On any failure the state added here is rolled
    /// back, together with the link's entries in `links` and `addr_to_link`.
    ///
    /// # Errors
    ///
    /// - `IndexAllocationFailed` when no session index can be allocated.
    /// - `HandshakeFailed` when message 1 cannot be produced.
    /// - A transport error when the send fails.
    /// - `TransportError` when the transport is no longer registered.
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    // Return the index allocated above before bailing out.
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Keep a copy of message 1 on the connection along with the time one
        // resend interval from now.
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        // `None` means the transport is no longer registered.
        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        match send_result {
            Some(send_result) => {
                self.note_local_send_outcome(&peer_node_addr, &send_result);
                match send_result {
                    Ok(bytes) => {
                        debug!(
                            link_id = %link_id,
                            our_index = %our_index,
                            bytes,
                            "Sent Noise handshake message 1 (wire format)"
                        );
                    }
                    Err(e) => {
                        warn!(
                            link_id = %link_id,
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            our_index = %our_index,
                            error = %e,
                            "Failed to send handshake message"
                        );
                        // Undo everything registered for this attempt.
                        self.pending_outbound
                            .remove(&(transport_id, our_index.as_u32()));
                        self.connections.remove(&link_id);
                        self.links.remove(&link_id);
                        self.addr_to_link
                            .remove(&(transport_id, remote_addr.clone()));
                        let _ = self.index_allocator.free(our_index);
                        return Err(NodeError::from_transport_error(e));
                    }
                }
            }
            None => {
                // Same rollback as the send-failure path.
                self.pending_outbound
                    .remove(&(transport_id, our_index.as_u32()));
                self.connections.remove(&link_id);
                self.links.remove(&link_id);
                self.addr_to_link
                    .remove(&(transport_id, remote_addr.clone()));
                let _ = self.index_allocator.free(our_index);
                return Err(NodeError::TransportError(format!(
                    "transport {transport_id} disappeared before first handshake send"
                )));
            }
        }

        Ok(())
    }
1253
1254 pub(super) async fn poll_transport_discovery(&mut self) {
1260 let mut to_connect = Vec::new();
1262 let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1263 let mut connect_budget = self.discovery_connect_budget();
1264 let mut skipped_budget = 0usize;
1265
1266 for transport in self.transports.values() {
1267 if !transport.is_operational() {
1268 continue;
1269 }
1270 if !transport.auto_connect() {
1271 let _ = transport.discover();
1273 continue;
1274 }
1275 let discovered = match transport.discover() {
1276 Ok(peers) => peers,
1277 Err(_) => continue,
1278 };
1279 for peer in discovered {
1280 let discovered_transport_id = peer.transport_id;
1281 let pubkey = match peer.pubkey_hint {
1282 Some(pk) => pk,
1283 None => continue,
1284 };
1285 let identity = PeerIdentity::from_pubkey(pubkey);
1286 let node_addr = *identity.node_addr();
1287
1288 if node_addr == *self.identity.node_addr() {
1290 continue;
1291 }
1292
1293 let Some((candidate_transport_id, remote_addr, transport_name)) =
1294 self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1295 else {
1296 continue;
1297 };
1298
1299 if self.peers.contains_key(&node_addr) {
1300 let candidate = PeerAddress::new(
1301 transport_name,
1302 self.peer_address_string_for_transport_candidate(
1303 candidate_transport_id,
1304 transport_name,
1305 &remote_addr,
1306 ),
1307 );
1308 if self.active_peer_candidate_is_fresh_enough_to_skip(
1309 &node_addr,
1310 std::slice::from_ref(&candidate),
1311 ) {
1312 continue;
1313 }
1314 if self.is_connecting_to_peer_on_path(
1315 &node_addr,
1316 candidate_transport_id,
1317 &remote_addr,
1318 ) {
1319 continue;
1320 }
1321 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1322 if connect_budget == 0
1323 || self
1324 .path_candidate_attempt_budget(&node_addr)
1325 .saturating_sub(queued_for_peer)
1326 == 0
1327 {
1328 skipped_budget = skipped_budget.saturating_add(1);
1329 continue;
1330 }
1331 to_connect.push((candidate_transport_id, remote_addr, identity, true));
1332 *queued_per_peer.entry(node_addr).or_default() += 1;
1333 connect_budget = connect_budget.saturating_sub(1);
1334 continue;
1335 }
1336
1337 if self.is_connecting_to_peer_on_path(
1338 &node_addr,
1339 candidate_transport_id,
1340 &remote_addr,
1341 ) {
1342 continue;
1343 }
1344
1345 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1346 if connect_budget == 0
1347 || self
1348 .path_candidate_attempt_budget(&node_addr)
1349 .saturating_sub(queued_for_peer)
1350 == 0
1351 {
1352 skipped_budget = skipped_budget.saturating_add(1);
1353 continue;
1354 }
1355 to_connect.push((candidate_transport_id, remote_addr, identity, false));
1356 *queued_per_peer.entry(node_addr).or_default() += 1;
1357 connect_budget = connect_budget.saturating_sub(1);
1358 }
1359 }
1360
1361 if skipped_budget > 0 {
1362 debug!(
1363 skipped = skipped_budget,
1364 queued = to_connect.len(),
1365 "Transport discovery connect budget exhausted"
1366 );
1367 }
1368
1369 for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1370 info!(
1371 peer = %self.peer_display_name(identity.node_addr()),
1372 transport_id = %transport_id,
1373 remote_addr = %remote_addr,
1374 active_refresh,
1375 "Auto-connecting to discovered peer"
1376 );
1377 if let Err(e) = self
1378 .initiate_connection(transport_id, remote_addr, identity)
1379 .await
1380 {
1381 warn!(error = %e, "Failed to auto-connect to discovered peer");
1382 }
1383 }
1384 }
1385
    /// Drives one round of Nostr-based discovery.
    ///
    /// Does nothing when no Nostr discovery runtime is installed. Otherwise it:
    /// 1. pushes the current admission-check results into the runtime,
    /// 2. forwards queued mesh traversal signals,
    /// 3. processes every drained `BootstrapEvent` (established or failed
    ///    NAT traversals),
    /// 4. runs the startup open-discovery sweep and queues retries, and
    /// 5. refreshes the local overlay advert (a failure is logged at debug).
    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        // Hand the runtime the latest admission decisions before it is polled.
        bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
        bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());

        self.drain_nostr_mesh_signals(&bootstrap).await;

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    // A traversal counts as a refresh when its npub parses and
                    // that peer is already in the active peer table.
                    let active_refresh = PeerIdentity::from_npub(&traversal.peer_npub)
                        .ok()
                        .is_some_and(|identity| self.peers.contains_key(identity.node_addr()));
                    // Refreshes and new peers go through different admission checks.
                    let admission_allowed = if active_refresh {
                        self.outbound_direct_refresh_admission_check()
                    } else {
                        self.outbound_admission_check()
                    };
                    if !admission_allowed {
                        debug!(
                            peer_npub = %traversal.peer_npub,
                            peers = self.peers.len(),
                            max_peers = self.max_peers,
                            active_refresh,
                            "Dropping established NAT traversal: at capacity"
                        );
                        continue;
                    }
                    // Clone the npub for logging; `traversal` is moved into the adopt call.
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            // Adoption failed: schedule a retry if the npub is parseable.
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    // Events whose npub cannot be parsed are dropped silently.
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    let now_ms = Self::now_ms();
                    // Case 1: the peer is already connected. The failure is only
                    // recorded when `active_peer_should_keep_direct_retry` says
                    // the direct path should still be pursued.
                    if self.peers.contains_key(&node_addr) {
                        if self.active_peer_should_keep_direct_retry(&node_addr, &peer_config) {
                            let decision =
                                bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                            // `should_warn` selects between a warn and a debug log line.
                            if decision.should_warn {
                                warn!(
                                    npub = %peer_config.npub,
                                    error = %reason,
                                    consecutive_failures = decision.consecutive_failures,
                                    cooldown_secs = decision
                                        .cooldown_until_ms
                                        .map(|t| t.saturating_sub(now_ms) / 1000),
                                    "Direct-path NAT traversal upgrade failed"
                                );
                            } else {
                                debug!(
                                    npub = %peer_config.npub,
                                    error = %reason,
                                    consecutive_failures = decision.consecutive_failures,
                                    "Direct-path NAT traversal upgrade failed (suppressed by warn-rate-limit)"
                                );
                            }
                            if decision.crossed_threshold {
                                bootstrap
                                    .request_advert_stale_check(peer_config.npub.clone())
                                    .await;
                            }
                            self.schedule_link_dead_reprobe(node_addr, now_ms);
                        } else {
                            debug!(
                                npub = %peer_config.npub,
                                error = %reason,
                                "Ignoring failed NAT traversal for already-connected peer on fresh direct path"
                            );
                        }
                        continue;
                    }
                    // Case 2: a handshake with this peer is already under way.
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    // Case 3: the peer is neither connected nor connecting.
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    if decision.crossed_threshold {
                        bootstrap
                            .request_advert_stale_check(peer_config.npub.clone())
                            .await;
                    }

                    // Try the peer's addresses via `try_peer_addresses`; on
                    // success no retry needs to be scheduled.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    // When the cooldown applies to this peer config, push the
                    // pending retry out to at least the cooldown deadline.
                    if self.nostr_cooldown_applies_to_peer_config(&peer_config)
                        && let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
        self.queue_active_fallback_direct_retries(&bootstrap);

        // Advert refresh is best-effort: a failure is only logged.
        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }
    }
1549
1550 async fn drain_nostr_mesh_signals(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1551 let mut deferred = Vec::new();
1552
1553 for signal in bootstrap.drain_mesh_signals().await {
1554 let (peer_npub, msg_type, payload) = match &signal {
1555 MeshTraversalSignal::Offer { peer_npub, offer } => {
1556 let payload = match serde_json::to_vec(&offer) {
1557 Ok(payload) => payload,
1558 Err(error) => {
1559 debug!(
1560 peer = %peer_npub,
1561 error = %error,
1562 "Failed to encode mesh traversal offer"
1563 );
1564 continue;
1565 }
1566 };
1567 (
1568 peer_npub.clone(),
1569 SessionMessageType::TraversalOffer.to_byte(),
1570 payload,
1571 )
1572 }
1573 MeshTraversalSignal::Answer { peer_npub, answer } => {
1574 let payload = match serde_json::to_vec(&answer) {
1575 Ok(payload) => payload,
1576 Err(error) => {
1577 debug!(
1578 peer = %peer_npub,
1579 error = %error,
1580 "Failed to encode mesh traversal answer"
1581 );
1582 continue;
1583 }
1584 };
1585 (
1586 peer_npub.clone(),
1587 SessionMessageType::TraversalAnswer.to_byte(),
1588 payload,
1589 )
1590 }
1591 };
1592
1593 let peer_identity = match PeerIdentity::from_npub(&peer_npub) {
1594 Ok(identity) => identity,
1595 Err(error) => {
1596 debug!(
1597 peer = %peer_npub,
1598 error = %error,
1599 "Cannot send mesh traversal signal to invalid peer npub"
1600 );
1601 continue;
1602 }
1603 };
1604 let peer_addr = *peer_identity.node_addr();
1605 match self
1606 .mesh_signal_session_action(peer_addr, peer_identity.pubkey_full())
1607 .await
1608 {
1609 MeshSignalSessionAction::Send => {}
1610 MeshSignalSessionAction::Defer => {
1611 deferred.push(signal);
1612 continue;
1613 }
1614 MeshSignalSessionAction::Drop => continue,
1615 }
1616
1617 if let Err(error) = self.send_session_msg(&peer_addr, msg_type, &payload).await {
1618 debug!(
1619 peer = %self.peer_display_name(&peer_addr),
1620 error = %error,
1621 "Failed to send mesh traversal signal"
1622 );
1623 }
1624 }
1625
1626 for signal in deferred {
1627 bootstrap.requeue_mesh_signal(signal);
1628 }
1629 }
1630
1631 async fn mesh_signal_session_action(
1632 &mut self,
1633 peer_addr: NodeAddr,
1634 peer_pubkey: PublicKey,
1635 ) -> MeshSignalSessionAction {
1636 if let Some(entry) = self.sessions.get(&peer_addr) {
1637 if entry.is_established() {
1638 return MeshSignalSessionAction::Send;
1639 }
1640 if entry.is_initiating() || entry.is_awaiting_msg3() {
1641 debug!(
1642 peer = %self.peer_display_name(&peer_addr),
1643 "Deferring mesh traversal signal until end-to-end session is established"
1644 );
1645 return MeshSignalSessionAction::Defer;
1646 }
1647 }
1648
1649 if self.find_next_hop(&peer_addr).is_none() {
1650 debug!(
1651 peer = %self.peer_display_name(&peer_addr),
1652 "Cannot warm mesh traversal signal session without a FIPS route"
1653 );
1654 self.maybe_initiate_lookup(&peer_addr).await;
1655 return MeshSignalSessionAction::Drop;
1656 }
1657
1658 self.register_identity(peer_addr, peer_pubkey);
1659 match self.initiate_session(peer_addr, peer_pubkey).await {
1660 Ok(()) => {
1661 debug!(
1662 peer = %self.peer_display_name(&peer_addr),
1663 "Warming end-to-end session for mesh traversal signal"
1664 );
1665 MeshSignalSessionAction::Defer
1666 }
1667 Err(NodeError::SendFailed { node_addr, reason })
1668 if node_addr == peer_addr && reason == "no route to destination" =>
1669 {
1670 debug!(
1671 peer = %self.peer_display_name(&peer_addr),
1672 "Cannot warm mesh traversal signal session without a FIPS route"
1673 );
1674 self.maybe_initiate_lookup(&peer_addr).await;
1675 MeshSignalSessionAction::Drop
1676 }
1677 Err(error) => {
1678 debug!(
1679 peer = %self.peer_display_name(&peer_addr),
1680 error = %error,
1681 "Failed to warm end-to-end session for mesh traversal signal"
1682 );
1683 MeshSignalSessionAction::Drop
1684 }
1685 }
1686 }
1687
1688 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1694 if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1695 let scope = scope.trim();
1696 if !scope.is_empty() {
1697 return Some(scope.to_string());
1698 }
1699 }
1700
1701 let app = self.config.node.discovery.nostr.app.trim();
1702 if app.is_empty() {
1703 return None;
1704 }
1705 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1706 let scope = rest.trim();
1707 if scope.is_empty() {
1708 None
1709 } else {
1710 Some(scope.to_string())
1711 }
1712 } else {
1713 Some(app.to_string())
1714 }
1715 }
1716
1717 pub(super) fn start_local_instance_discovery(&mut self) {
1718 if !self.config.node.discovery.local.enabled {
1719 return;
1720 }
1721 let Some(scope) = self.lan_discovery_scope() else {
1722 debug!("local instance discovery not started: no discovery scope");
1723 return;
1724 };
1725 let now_ms = Self::now_ms();
1726 match crate::discovery::local::LocalInstanceRegistry::new(
1727 self.identity.npub(),
1728 scope,
1729 &self.config.node.discovery.local,
1730 now_ms,
1731 ) {
1732 Ok(registry) => {
1733 self.local_instance_registry = Some(registry);
1734 self.local_instance_started_at_ms = Some(now_ms);
1735 self.last_local_instance_publish_ms = None;
1736 self.last_local_instance_scan_ms = None;
1737 self.publish_local_instance_record(now_ms);
1738 info!("Same-host FIPS instance discovery enabled");
1739 }
1740 Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1741 debug!("same-host FIPS instance discovery disabled");
1742 }
1743 Err(err) => {
1744 debug!(error = %err, "same-host FIPS instance discovery not started");
1745 }
1746 }
1747 }
1748
1749 fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1750 let mut contacts = Vec::new();
1751 for handle in self.transports.values() {
1752 if !handle.is_operational() || !handle.accept_connections() {
1753 continue;
1754 }
1755 let transport = handle.transport_type().name;
1756 if transport != "udp" && transport != "tcp" {
1757 continue;
1758 }
1759 let Some(local_addr) = handle.local_addr() else {
1760 continue;
1761 };
1762 let Some(contact) =
1763 crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1764 else {
1765 continue;
1766 };
1767 if contacts
1768 .iter()
1769 .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1770 existing.transport == contact.transport && existing.addr == contact.addr
1771 })
1772 {
1773 continue;
1774 }
1775 contacts.push(contact);
1776 }
1777 contacts
1778 }
1779
1780 fn publish_local_instance_record(&mut self, now_ms: u64) {
1781 let Some(registry) = self.local_instance_registry.clone() else {
1782 return;
1783 };
1784 let contacts = self.local_instance_contacts();
1785 match registry.publish(contacts, now_ms) {
1786 Ok(()) => {
1787 self.last_local_instance_publish_ms = Some(now_ms);
1788 }
1789 Err(err) => {
1790 debug!(error = %err, "failed to publish same-host FIPS instance record");
1791 }
1792 }
1793 }
1794
1795 fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1796 if self.local_instance_registry.is_none() {
1797 return;
1798 }
1799 let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1800 let due = self
1801 .last_local_instance_publish_ms
1802 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1803 .unwrap_or(true);
1804 if due {
1805 self.publish_local_instance_record(now_ms);
1806 }
1807 }
1808
1809 fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1810 if self.local_instance_registry.is_none() {
1811 return false;
1812 }
1813 let cfg = &self.config.node.discovery.local;
1814 let interval_ms = if self
1815 .local_instance_started_at_ms
1816 .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1817 .unwrap_or(false)
1818 {
1819 cfg.startup_scan_interval_ms()
1820 } else {
1821 cfg.scan_interval_ms()
1822 };
1823 self.last_local_instance_scan_ms
1824 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1825 .unwrap_or(true)
1826 }
1827
1828 fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1829 if self.config.peers().iter().any(|peer| {
1830 PeerIdentity::from_npub(&peer.npub)
1831 .map(|configured| configured.node_addr() == identity.node_addr())
1832 .unwrap_or(false)
1833 }) {
1834 return true;
1835 }
1836 self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1837 }
1838
1839 fn local_instance_peer_addresses(
1840 &self,
1841 record: &crate::discovery::local::LocalInstanceRecord,
1842 ) -> Vec<PeerAddress> {
1843 let mut addresses = Vec::new();
1844 for contact in &record.contacts {
1845 if contact.transport != "udp" && contact.transport != "tcp" {
1846 continue;
1847 }
1848 let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1849 debug!(
1850 npub = %record.npub,
1851 transport = %contact.transport,
1852 addr = %contact.addr,
1853 "local instance discovery: skip non-socket contact"
1854 );
1855 continue;
1856 };
1857 if !socket_addr.ip().is_loopback() {
1858 debug!(
1859 npub = %record.npub,
1860 addr = %contact.addr,
1861 "local instance discovery: skip non-loopback contact"
1862 );
1863 continue;
1864 }
1865 let address =
1866 PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1867 .with_seen_at_ms(record.updated_at_ms);
1868 if addresses.iter().any(|existing: &PeerAddress| {
1869 existing.transport == address.transport && existing.addr == address.addr
1870 }) {
1871 continue;
1872 }
1873 addresses.push(address);
1874 }
1875 addresses
1876 }
1877
    /// Runs one round of same-host instance discovery.
    ///
    /// Does nothing without a registry. Republishes this node's record when
    /// due, then — if a scan is due — scans the registry and initiates
    /// connections to allowed instances on their loopback addresses, subject
    /// to the discovery connect budget and the per-peer attempt budget.
    pub(super) async fn poll_local_instance_discovery(&mut self) {
        let Some(registry) = self.local_instance_registry.clone() else {
            return;
        };
        let now_ms = Self::now_ms();
        self.maybe_publish_local_instance_record(now_ms);
        if !self.local_instance_scan_due(now_ms) {
            return;
        }
        // Record the scan time up front, so a failed scan also counts
        // towards the scan interval.
        self.last_local_instance_scan_ms = Some(now_ms);

        let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
        {
            Ok(records) => records,
            Err(err) => {
                debug!(error = %err, "same-host FIPS instance scan failed");
                return;
            }
        };
        if records.is_empty() {
            return;
        }

        // Global budget for this round; decremented once per connection attempt.
        let mut connect_budget = self.discovery_connect_budget();
        let mut skipped_budget = 0usize;
        for record in records {
            let identity = match PeerIdentity::from_npub(&record.npub) {
                Ok(identity) => identity,
                Err(err) => {
                    debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
                    continue;
                }
            };
            let peer_node_addr = *identity.node_addr();
            // Skip our own record.
            if peer_node_addr == *self.identity.node_addr() {
                continue;
            }
            if !self.local_instance_peer_allowed(&identity) {
                debug!(
                    npub = %identity.short_npub(),
                    "local instance discovery: skip unconfigured peer"
                );
                continue;
            }

            let addresses = self.local_instance_peer_addresses(&record);
            if addresses.is_empty() {
                continue;
            }

            // An already-active peer is skipped when its current path is
            // considered fresh enough relative to these addresses.
            if self.peers.contains_key(&peer_node_addr)
                && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
            {
                continue;
            }

            for address in addresses {
                let Some((transport_id, remote_addr)) =
                    self.resolve_peer_address_for_match(&address)
                else {
                    continue;
                };
                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                    continue;
                }
                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
                    skipped_budget = skipped_budget.saturating_add(1);
                    continue;
                }
                info!(
                    npub = %identity.short_npub(),
                    transport = %address.transport,
                    addr = %address.addr,
                    "same-host FIPS instance discovery: initiating handshake"
                );
                if let Err(err) = self
                    .initiate_connection(transport_id, remote_addr, identity)
                    .await
                {
                    debug!(
                        npub = %record.npub,
                        error = %err,
                        "same-host FIPS instance discovery: failed to initiate connection"
                    );
                }
                // The budget is consumed whether or not the attempt succeeded.
                connect_budget = connect_budget.saturating_sub(1);
            }
        }
        if skipped_budget > 0 {
            debug!(
                skipped = skipped_budget,
                "same-host FIPS instance discovery connect budget exhausted"
            );
        }
    }
1976
1977 pub(super) async fn poll_lan_discovery(&mut self) {
1984 let Some(runtime) = self.lan_discovery.clone() else {
1985 return;
1986 };
1987 let events = runtime.drain_events().await;
1988 if events.is_empty() {
1989 return;
1990 }
1991 let mut connect_budget = self.discovery_connect_budget();
1992 let mut skipped_budget = 0usize;
1993 for event in events {
1994 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1995 let Some((transport_id, local_addr)) =
1996 self.find_udp_transport_for_remote_addr(peer.addr)
1997 else {
1998 debug!(
1999 addr = %peer.addr,
2000 "lan: skip discovered peer with no compatible UDP transport"
2001 );
2002 continue;
2003 };
2004 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
2005 Ok(id) => id,
2006 Err(err) => {
2007 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
2008 continue;
2009 }
2010 };
2011 let peer_node_addr = *identity.node_addr();
2012 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
2013 if self.peers.contains_key(&peer_node_addr) {
2014 let candidate = PeerAddress::new("udp", peer.addr.to_string());
2015 if self.active_peer_candidate_is_fresh_enough_to_skip(
2016 &peer_node_addr,
2017 std::slice::from_ref(&candidate),
2018 ) {
2019 continue;
2020 }
2021 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
2022 continue;
2023 }
2024 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
2025 skipped_budget = skipped_budget.saturating_add(1);
2026 continue;
2027 }
2028 info!(
2029 npub = %identity.short_npub(),
2030 addr = %peer.addr,
2031 local_addr = %local_addr,
2032 "lan: initiating alternate-path handshake to active peer"
2033 );
2034 if let Err(err) = self
2035 .initiate_connection(transport_id, remote_addr, identity)
2036 .await
2037 {
2038 debug!(
2039 npub = %peer.npub,
2040 error = %err,
2041 "lan: failed to initiate active peer alternate-path handshake"
2042 );
2043 }
2044 connect_budget = connect_budget.saturating_sub(1);
2045 continue;
2046 }
2047 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
2048 continue;
2049 }
2050 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
2051 skipped_budget = skipped_budget.saturating_add(1);
2052 continue;
2053 }
2054 info!(
2055 npub = %identity.short_npub(),
2056 addr = %peer.addr,
2057 local_addr = %local_addr,
2058 "lan: initiating handshake to discovered peer"
2059 );
2060 if let Err(err) = self
2061 .initiate_connection(transport_id, remote_addr, identity)
2062 .await
2063 {
2064 debug!(
2065 npub = %peer.npub,
2066 error = %err,
2067 "lan: failed to initiate connection to discovered peer"
2068 );
2069 }
2070 connect_budget = connect_budget.saturating_sub(1);
2071 }
2072 if skipped_budget > 0 {
2073 debug!(
2074 skipped = skipped_budget,
2075 "lan: discovery connect budget exhausted"
2076 );
2077 }
2078 }
2079
2080 pub(super) async fn poll_pending_connects(&mut self) {
2087 if self.pending_connects.is_empty() {
2088 return;
2089 }
2090
2091 let mut completed = Vec::new();
2092
2093 for (i, pending) in self.pending_connects.iter().enumerate() {
2094 let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
2095 transport.connection_state(&pending.remote_addr)
2096 } else {
2097 crate::transport::ConnectionState::Failed("transport removed".into())
2098 };
2099
2100 match state {
2101 crate::transport::ConnectionState::Connected => {
2102 completed.push((i, true, None));
2103 }
2104 crate::transport::ConnectionState::Failed(reason) => {
2105 completed.push((i, false, Some(reason)));
2106 }
2107 crate::transport::ConnectionState::Connecting => {
2108 }
2110 crate::transport::ConnectionState::None => {
2111 completed.push((i, false, Some("no connection attempt found".into())));
2113 }
2114 }
2115 }
2116
2117 for (i, success, reason) in completed.into_iter().rev() {
2119 let pending = self.pending_connects.remove(i);
2120
2121 if success {
2122 if let Some(link) = self.links.get_mut(&pending.link_id) {
2124 link.set_connected();
2125 }
2126
2127 debug!(
2128 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2129 transport_id = %pending.transport_id,
2130 remote_addr = %pending.remote_addr,
2131 link_id = %pending.link_id,
2132 "Transport connected, starting handshake"
2133 );
2134
2135 if let Err(e) = self
2137 .start_handshake(
2138 pending.link_id,
2139 pending.transport_id,
2140 pending.remote_addr.clone(),
2141 pending.peer_identity,
2142 )
2143 .await
2144 {
2145 warn!(
2146 link_id = %pending.link_id,
2147 error = %e,
2148 "Failed to start handshake after transport connect"
2149 );
2150 self.remove_link(&pending.link_id);
2152 }
2153 } else {
2154 let reason = reason.unwrap_or_default();
2155 warn!(
2156 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2157 transport_id = %pending.transport_id,
2158 remote_addr = %pending.remote_addr,
2159 link_id = %pending.link_id,
2160 reason = %reason,
2161 "Transport connect failed"
2162 );
2163
2164 self.remove_link(&pending.link_id);
2166 self.links.remove(&pending.link_id);
2167 self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
2168 }
2169 }
2170 }
2171
2172 pub async fn start(&mut self) -> Result<(), NodeError> {
2179 node_start_debug_log("Node::start begin");
2180 if !self.state.can_start() {
2181 return Err(NodeError::AlreadyStarted);
2182 }
2183 self.state = NodeState::Starting;
2184 node_start_debug_log("Node::start state set to starting");
2185
2186 let packet_buffer_size = self.config.node.buffers.packet_channel;
2188 let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
2189 self.packet_tx = Some(packet_tx.clone());
2190 self.packet_rx = Some(packet_rx);
2191 node_start_debug_log("Node::start packet channel created");
2192
2193 node_start_debug_log("Node::start create transports begin");
2195 let transport_handles = self.create_transports(&packet_tx).await;
2196 node_start_debug_log(format!(
2197 "Node::start create transports complete count={}",
2198 transport_handles.len()
2199 ));
2200
2201 for mut handle in transport_handles {
2202 let transport_id = handle.transport_id();
2203 let transport_type = handle.transport_type().name;
2204 let name = handle.name().map(|s| s.to_string());
2205
2206 node_start_debug_log(format!(
2207 "Node::start transport start begin id={} type={} name={:?}",
2208 transport_id, transport_type, name
2209 ));
2210 match handle.start().await {
2211 Ok(()) => {
2212 node_start_debug_log(format!(
2213 "Node::start transport start ok id={} type={}",
2214 transport_id, transport_type
2215 ));
2216 self.transports.insert(transport_id, handle);
2217 }
2218 Err(e) => {
2219 node_start_debug_log(format!(
2220 "Node::start transport start error id={} type={} error={}",
2221 transport_id, transport_type, e
2222 ));
2223 if let Some(ref n) = name {
2224 warn!(transport_type, name = %n, error = %e, "Transport failed to start");
2225 } else {
2226 warn!(transport_type, error = %e, "Transport failed to start");
2227 }
2228 }
2229 }
2230 }
2231
2232 if !self.transports.is_empty() {
2233 info!(count = self.transports.len(), "Transports initialized");
2234 }
2235
2236 #[cfg(unix)]
2252 {
2253 if self.config.node.worker_pools_enabled {
2254 node_start_debug_log("Node::start worker pools begin");
2255 let cpu_default = std::thread::available_parallelism()
2256 .map(|n| n.get())
2257 .unwrap_or(1)
2258 .max(1);
2259 let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
2260 .ok()
2261 .and_then(|s| s.parse().ok())
2262 .unwrap_or(cpu_default)
2263 .max(1);
2264 self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
2265 encrypt_worker_count,
2266 ));
2267 info!(
2268 workers = encrypt_worker_count,
2269 "Spawned FMP-encrypt worker pool"
2270 );
2271
2272 let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
2281 .ok()
2282 .and_then(|s| s.parse().ok())
2283 .unwrap_or(cpu_default);
2284 if decrypt_worker_count == 0 {
2285 info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
2286 } else {
2287 self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
2288 decrypt_worker_count,
2289 ));
2290 info!(
2291 workers = decrypt_worker_count,
2292 "Spawned FMP+FSP-decrypt worker pool"
2293 );
2294 }
2295 node_start_debug_log("Node::start worker pools complete");
2296 } else {
2297 node_start_debug_log("Node::start worker pools disabled");
2298 info!("FIPS worker pools disabled; using in-line crypto/send path");
2299 }
2300 }
2301
2302 if self.config.node.discovery.nostr.enabled {
2303 node_start_debug_log("Node::start nostr discovery start begin");
2304 match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
2305 .await
2306 {
2307 Ok(runtime) => {
2308 node_start_debug_log("Node::start nostr discovery runtime created");
2309 if let Err(err) = self.refresh_overlay_advert(&runtime).await {
2310 warn!(error = %err, "Failed to publish initial Nostr overlay advert");
2311 }
2312 node_start_debug_log("Node::start nostr overlay advert refreshed");
2313 self.nostr_discovery = Some(runtime);
2314 self.nostr_discovery_started_at_ms = Some(Self::now_ms());
2315 info!("Nostr overlay discovery enabled");
2316 }
2317 Err(err) => {
2318 node_start_debug_log(format!(
2319 "Node::start nostr discovery start error error={}",
2320 err
2321 ));
2322 warn!(error = %err, "Failed to start Nostr overlay discovery");
2323 }
2324 }
2325 }
2326
2327 if self.config.node.discovery.lan.enabled {
2331 node_start_debug_log("Node::start lan discovery start begin");
2332 let advertised_udp_port = self
2333 .transports
2334 .values()
2335 .filter(|h| h.is_operational())
2336 .filter(|h| h.transport_type().name == "udp")
2337 .find_map(|h| h.local_addr().map(|addr| addr.port()))
2338 .unwrap_or(0);
2339 let scope = self.lan_discovery_scope();
2340 match crate::discovery::lan::LanDiscovery::start(
2341 &self.identity,
2342 scope,
2343 advertised_udp_port,
2344 self.config.node.discovery.lan.clone(),
2345 )
2346 .await
2347 {
2348 Ok(runtime) => {
2349 node_start_debug_log("Node::start lan discovery start ok");
2350 self.lan_discovery = Some(runtime);
2351 info!("LAN mDNS discovery enabled");
2352 }
2353 Err(err) => {
2354 node_start_debug_log(format!(
2355 "Node::start lan discovery start error error={}",
2356 err
2357 ));
2358 debug!(error = %err, "LAN mDNS discovery not started");
2359 }
2360 }
2361 }
2362
2363 self.start_local_instance_discovery();
2364 self.poll_local_instance_discovery().await;
2365
2366 node_start_debug_log("Node::start initiate peer connections begin");
2369 self.initiate_peer_connections().await;
2370 node_start_debug_log("Node::start initiate peer connections complete");
2371
2372 if self.config.tun.enabled {
2374 node_start_debug_log("Node::start tun init begin");
2375 let address = *self.identity.address();
2376 match TunDevice::create(&self.config.tun, address).await {
2377 Ok(device) => {
2378 let mtu = device.mtu();
2379 let name = device.name().to_string();
2380 let our_addr = *device.address();
2381
2382 info!("TUN device active:");
2383 info!(" name: {}", name);
2384 info!(" address: {}", device.address());
2385 info!(" mtu: {}", mtu);
2386
2387 let effective_mtu = self.effective_ipv6_mtu();
2389 let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); info!("effective MTU: {} bytes", effective_mtu);
2392 debug!(" max TCP MSS: {} bytes", max_mss);
2393
2394 #[cfg(target_os = "macos")]
2398 let (shutdown_read_fd, shutdown_write_fd) = {
2399 let mut fds = [0i32; 2];
2400 if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
2401 return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
2402 "failed to create shutdown pipe".into(),
2403 )));
2404 }
2405 (fds[0], fds[1])
2406 };
2407
2408 let (writer, tun_tx) =
2412 device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
2413
2414 let writer_handle = thread::spawn(move || {
2416 writer.run();
2417 });
2418
2419 let reader_tun_tx = tun_tx.clone();
2421
2422 let tun_channel_size = self.config.node.buffers.tun_channel;
2424 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
2425
2426 let transport_mtu = self.transport_mtu();
2428 let path_mtu_lookup = self.path_mtu_lookup.clone();
2429 #[cfg(target_os = "macos")]
2430 let reader_handle = thread::spawn(move || {
2431 run_tun_reader(
2432 device,
2433 mtu,
2434 our_addr,
2435 reader_tun_tx,
2436 outbound_tx,
2437 transport_mtu,
2438 path_mtu_lookup,
2439 shutdown_read_fd,
2440 );
2441 });
2442 #[cfg(not(target_os = "macos"))]
2443 let reader_handle = thread::spawn(move || {
2444 run_tun_reader(
2445 device,
2446 mtu,
2447 our_addr,
2448 reader_tun_tx,
2449 outbound_tx,
2450 transport_mtu,
2451 path_mtu_lookup,
2452 );
2453 });
2454
2455 self.tun_state = TunState::Active;
2456 self.tun_name = Some(name);
2457 self.tun_tx = Some(tun_tx);
2458 self.tun_outbound_rx = Some(outbound_rx);
2459 self.tun_reader_handle = Some(reader_handle);
2460 self.tun_writer_handle = Some(writer_handle);
2461 #[cfg(target_os = "macos")]
2462 {
2463 self.tun_shutdown_fd = Some(shutdown_write_fd);
2464 }
2465 }
2466 Err(e) => {
2467 self.tun_state = TunState::Failed;
2468 warn!(error = %e, "Failed to initialize TUN, continuing without it");
2469 }
2470 }
2471 node_start_debug_log("Node::start tun init complete");
2472 }
2473
2474 if self.config.dns.enabled {
2491 node_start_debug_log("Node::start dns init begin");
2492 let addr_str = self.config.dns.bind_addr();
2493 match addr_str.parse::<std::net::IpAddr>() {
2494 Ok(ip) => {
2495 let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
2496 match Self::bind_dns_socket(bind) {
2497 Ok(socket) => {
2498 let dns_channel_size = self.config.node.buffers.dns_channel;
2499 let (identity_tx, identity_rx) =
2500 tokio::sync::mpsc::channel(dns_channel_size);
2501 let dns_ttl = self.config.dns.ttl();
2502 let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
2503 self.config.peers(),
2504 );
2505 let reloader = if self.config.node.system_files_enabled {
2506 let hosts_path = std::path::PathBuf::from(
2507 crate::upper::hosts::DEFAULT_HOSTS_PATH,
2508 );
2509 crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
2510 } else {
2511 crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
2512 };
2513 let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
2521 info!(
2522 bind = %bind,
2523 hosts = reloader.hosts().len(),
2524 mesh_ifindex = ?mesh_ifindex,
2525 "DNS responder started for .fips domain (auto-reload enabled)"
2526 );
2527 let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
2528 socket,
2529 identity_tx,
2530 dns_ttl,
2531 reloader,
2532 mesh_ifindex,
2533 ));
2534 self.dns_identity_rx = Some(identity_rx);
2535 self.dns_task = Some(handle);
2536 }
2537 Err(e) => {
2538 warn!(bind = %bind, error = %e, "Failed to start DNS responder");
2539 }
2540 }
2541 }
2542 Err(e) => {
2543 warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
2544 }
2545 }
2546 node_start_debug_log("Node::start dns init complete");
2547 }
2548
2549 self.state = NodeState::Running;
2550 node_start_debug_log("Node::start running");
2551 info!("Node started:");
2552 info!(" state: {}", self.state);
2553 info!(" transports: {}", self.transports.len());
2554 info!(" connections: {}", self.connections.len());
2555 Ok(())
2556 }
2557
2558 fn bind_dns_socket(
2571 addr: std::net::SocketAddr,
2572 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2573 use socket2::{Domain, Protocol, Socket, Type};
2574 let domain = if addr.is_ipv4() {
2575 Domain::IPV4
2576 } else {
2577 Domain::IPV6
2578 };
2579 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2580 if addr.is_ipv6() {
2581 sock.set_only_v6(false)?;
2582 #[cfg(unix)]
2583 Self::set_recv_pktinfo_v6(&sock)?;
2584 }
2585 sock.set_nonblocking(true)?;
2586 sock.bind(&addr.into())?;
2587 tokio::net::UdpSocket::from_std(sock.into())
2588 }
2589
2590 #[cfg(unix)]
2596 fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2597 use std::os::fd::AsRawFd;
2598 let enable: libc::c_int = 1;
2599 let ret = unsafe {
2600 libc::setsockopt(
2601 sock.as_raw_fd(),
2602 libc::IPPROTO_IPV6,
2603 libc::IPV6_RECVPKTINFO,
2604 &enable as *const _ as *const libc::c_void,
2605 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2606 )
2607 };
2608 if ret < 0 {
2609 return Err(std::io::Error::last_os_error());
2610 }
2611 Ok(())
2612 }
2613
2614 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2621 #[cfg(unix)]
2622 {
2623 let c_name = std::ffi::CString::new(name).ok()?;
2624 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2625 if idx == 0 { None } else { Some(idx) }
2626 }
2627 #[cfg(not(unix))]
2628 {
2629 let _ = name;
2630 None
2631 }
2632 }
2633
/// Shuts the node down and moves it to `NodeState::Stopped`.
///
/// Teardown order, as implemented below:
/// 1. abort the DNS responder task,
/// 2. send a `Shutdown` disconnect to every sendable peer (while transports
///    are still up),
/// 3. shut down Nostr discovery, LAN discovery and the same-host instance
///    registry,
/// 4. stop and remove every transport,
/// 5. drop the packet channel endpoints,
/// 6. tear down the TUN interface and join its reader/writer threads.
///
/// Failures in individual steps are logged and do not abort the shutdown.
///
/// # Errors
/// Returns `NodeError::NotStarted` when the current state does not allow
/// stopping (`self.state.can_stop()` is false).
pub async fn stop(&mut self) -> Result<(), NodeError> {
    if !self.state.can_stop() {
        return Err(NodeError::NotStarted);
    }
    self.state = NodeState::Stopping;
    info!(state = %self.state, "Node stopping");

    // Stop answering DNS queries first.
    if let Some(handle) = self.dns_task.take() {
        handle.abort();
        debug!("DNS responder stopped");
    }

    // Notify peers before the transports they are reached through go away.
    self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
        .await;

    // Discovery runtimes are taken out of `self` so they cannot be reused
    // after shutdown, whether or not their shutdown succeeds.
    if let Some(bootstrap) = self.nostr_discovery.take()
        && let Err(e) = bootstrap.shutdown().await
    {
        warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
    }

    if let Some(lan) = self.lan_discovery.take() {
        lan.shutdown().await;
    }

    if let Some(registry) = self.local_instance_registry.take()
        && let Err(err) = registry.remove()
    {
        debug!(error = %err, "failed to remove same-host FIPS instance record");
    }

    // Snapshot the ids first so the map can be mutated while iterating.
    let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
    for transport_id in transport_ids {
        if let Some(mut handle) = self.transports.remove(&transport_id) {
            let transport_type = handle.transport_type().name;
            match handle.stop().await {
                Ok(()) => {
                    info!(transport_id = %transport_id, transport_type, "Transport stopped");
                }
                Err(e) => {
                    warn!(
                        transport_id = %transport_id,
                        transport_type,
                        error = %e,
                        "Transport stop failed"
                    );
                }
            }
        }
    }

    // Drop both ends of the packet channel.
    self.packet_tx.take();
    self.packet_rx.take();

    // TUN teardown only runs when a TUN device was brought up (its name was recorded).
    if let Some(name) = self.tun_name.take() {
        info!(name = %name, "Shutting down TUN interface");

        // Drop our sender to the TUN writer.
        // NOTE(review): presumably this lets the writer thread exit once all
        // senders are gone — confirm against the writer implementation.
        self.tun_tx.take();

        if let Err(e) = shutdown_tun_interface(&name).await {
            warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
        }

        // On macOS the reader thread was handed the read end of a pipe at
        // startup; write one byte to the write end and close it.
        // NOTE(review): the results of `write`/`close` are ignored here.
        #[cfg(target_os = "macos")]
        if let Some(fd) = self.tun_shutdown_fd.take() {
            unsafe {
                libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                libc::close(fd);
            }
        }

        // These are OS thread joins and block the calling thread until the
        // TUN reader/writer threads have finished; panics in them are ignored.
        if let Some(handle) = self.tun_reader_handle.take() {
            let _ = handle.join();
        }
        if let Some(handle) = self.tun_writer_handle.take() {
            let _ = handle.join();
        }

        self.tun_state = TunState::Disabled;
    }

    self.state = NodeState::Stopped;
    info!(state = %self.state, "Node stopped");
    Ok(())
}
2737
2738 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2743 let disconnect = Disconnect::new(reason);
2744 let plaintext = disconnect.encode();
2745
2746 let peer_addrs: Vec<NodeAddr> = self
2748 .peers
2749 .iter()
2750 .filter(|(_, peer)| peer.can_send() && peer.has_session())
2751 .map(|(addr, _)| *addr)
2752 .collect();
2753
2754 if peer_addrs.is_empty() {
2755 debug!(
2756 total_peers = self.peers.len(),
2757 "No sendable peers for disconnect notification"
2758 );
2759 return;
2760 }
2761
2762 let mut sent = 0usize;
2763 for node_addr in &peer_addrs {
2764 match self
2765 .send_encrypted_link_message(node_addr, &plaintext)
2766 .await
2767 {
2768 Ok(()) => sent += 1,
2769 Err(e) => {
2770 debug!(
2771 peer = %self.peer_display_name(node_addr),
2772 error = %e,
2773 "Failed to send disconnect (transport may be down)"
2774 );
2775 }
2776 }
2777 }
2778
2779 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2780 }
2781
2782 pub(in crate::node) fn static_peer_addresses(
2783 &self,
2784 peer_config: &PeerConfig,
2785 ) -> Vec<PeerAddress> {
2786 peer_config
2787 .addresses_by_priority()
2788 .into_iter()
2789 .cloned()
2790 .collect()
2791 }
2792
2793 async fn nostr_peer_fallback_addresses(
2794 &self,
2795 peer_config: &PeerConfig,
2796 existing: &[PeerAddress],
2797 ) -> Vec<PeerAddress> {
2798 if !self.config.node.discovery.nostr.enabled
2799 || self.config.node.discovery.nostr.policy
2800 == crate::config::NostrDiscoveryPolicy::Disabled
2801 {
2802 return Vec::new();
2803 }
2804
2805 let Some(bootstrap) = self.nostr_discovery.clone() else {
2806 return Vec::new();
2807 };
2808 if self.nostr_cooldown_applies_to_peer_config(peer_config)
2809 && bootstrap
2810 .cooldown_until(&peer_config.npub, Self::now_ms())
2811 .is_some()
2812 {
2813 debug!(
2814 npub = %peer_config.npub,
2815 "Skipping cached Nostr fallback endpoints while peer is in traversal cooldown"
2816 );
2817 return Vec::new();
2818 }
2819 let endpoints = match bootstrap
2820 .cached_advert_endpoints_for_peer(&peer_config.npub)
2821 .await
2822 {
2823 Some(endpoints) => endpoints,
2824 None => {
2825 debug!(
2826 npub = %peer_config.npub,
2827 "No cached Nostr advert endpoints for configured peer"
2828 );
2829 return Vec::new();
2830 }
2831 };
2832
2833 let mut fallback = Vec::new();
2834 let mut next_priority = existing
2835 .iter()
2836 .map(|addr| addr.priority)
2837 .max()
2838 .unwrap_or(100)
2839 .saturating_add(1);
2840 let seen_at_ms = Self::now_ms();
2846 for endpoint in endpoints {
2847 let Some(candidate) =
2848 Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2849 else {
2850 continue;
2851 };
2852 if existing
2853 .iter()
2854 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2855 || fallback.iter().any(|addr: &PeerAddress| {
2856 addr.transport == candidate.transport && addr.addr == candidate.addr
2857 })
2858 {
2859 continue;
2860 }
2861 fallback.push(candidate);
2862 next_priority = next_priority.saturating_add(1);
2863 }
2864 fallback
2865 }
2866
2867 pub(in crate::node) async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2868 if !self.config.node.discovery.nostr.enabled
2869 || self.config.node.discovery.nostr.policy
2870 == crate::config::NostrDiscoveryPolicy::Disabled
2871 {
2872 return false;
2873 }
2874 let Some(bootstrap) = self.nostr_discovery.clone() else {
2875 return false;
2876 };
2877 let now_ms = Self::now_ms();
2878 if self.nostr_cooldown_applies_to_peer_config(peer_config)
2879 && let Some(cooldown_until_ms) = bootstrap.cooldown_until(&peer_config.npub, now_ms)
2880 {
2881 debug!(
2882 npub = %peer_config.npub,
2883 cooldown_secs = cooldown_until_ms.saturating_sub(now_ms) / 1000,
2884 "Skipping Nostr traversal request while peer is in cooldown"
2885 );
2886 return false;
2887 }
2888 bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
2889 bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
2890 let mesh_signaling_allowed = self.mesh_signaling_allowed_for_peer(peer_config);
2891 let started = bootstrap
2892 .request_connect_with_mesh_signaling(peer_config.clone(), mesh_signaling_allowed)
2893 .await;
2894 if started {
2895 info!(
2896 npub = %peer_config.npub,
2897 mesh_signaling_allowed,
2898 "Started background UDP NAT traversal attempt"
2899 );
2900 } else {
2901 debug!(
2902 npub = %peer_config.npub,
2903 mesh_signaling_allowed,
2904 "Background UDP NAT traversal attempt already in progress"
2905 );
2906 }
2907 true
2908 }
2909
2910 fn nostr_cooldown_applies_to_peer_config(&self, peer_config: &PeerConfig) -> bool {
2911 !self.mesh_signaling_allowed_for_peer(peer_config)
2912 }
2913
2914 pub(in crate::node) fn mesh_signaling_allowed_for_peer(
2915 &self,
2916 peer_config: &PeerConfig,
2917 ) -> bool {
2918 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
2919 return false;
2920 };
2921 let peer_addr = identity.node_addr();
2922 self.configured_peer(peer_addr).is_some()
2923 }
2924
2925 fn overlay_endpoint_to_peer_address(
2926 endpoint: &OverlayEndpointAdvert,
2927 priority: u8,
2928 seen_at_ms: u64,
2929 ) -> Option<PeerAddress> {
2930 let transport = match endpoint.transport {
2931 OverlayTransportKind::Udp => "udp",
2932 OverlayTransportKind::Tcp => "tcp",
2933 OverlayTransportKind::Tor => "tor",
2934 OverlayTransportKind::WebRtc => "webrtc",
2935 };
2936 Some(
2937 PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2938 .with_seen_at_ms(seen_at_ms),
2939 )
2940 }
2941
/// Tries to start connection attempts to a peer over the given candidate
/// addresses, in slice order.
///
/// * A `udp` address whose value is `nat` (case-insensitive) is not dialed
///   directly; when `allow_bootstrap_nat` is set it triggers a Nostr
///   traversal request instead.
/// * Every other address is resolved to a `(transport_id, remote_addr)` pair
///   and dialed via `initiate_connection`, subject to the per-peer candidate
///   budget.
///
/// # Errors
/// * `NodeError::AccessDenied` is returned immediately when a connection
///   attempt is rejected with it.
/// * `NodeError::LocalRouteUnavailable` when nothing was attempted and at
///   least one failure reported `is_local_route_unavailable()` (the first
///   such error message is used).
/// * `NodeError::NoTransportForType` when nothing was attempted at all.
///
/// Returns `Ok(())` as soon as at least one address counted as attempted
/// (including duplicates of in-flight paths and accepted traversal requests).
async fn attempt_peer_address_list(
    &mut self,
    peer_config: &PeerConfig,
    peer_identity: PeerIdentity,
    allow_bootstrap_nat: bool,
    addresses: &[PeerAddress],
) -> Result<(), NodeError> {
    let mut attempted = false;
    let mut local_route_error = None;
    let peer_node_addr = *peer_identity.node_addr();
    // Number of additional concrete paths we may race for this peer.
    let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

    for addr in addresses {
        // "udp:nat" placeholder: hand off to Nostr-assisted NAT traversal.
        if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
            if !allow_bootstrap_nat {
                continue;
            }
            if self.request_nostr_bootstrap(peer_config).await {
                attempted = true;
                continue;
            }
            // `request_nostr_bootstrap` also returns false when discovery is
            // disabled by policy or the peer is in cooldown.
            debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
            continue;
        }

        // Resolve the address to a concrete transport and remote address.
        let (transport_id, remote_addr) = if addr.transport == "ethernet" {
            match self.resolve_ethernet_addr(&addr.addr) {
                Ok(result) => result,
                Err(e) => {
                    debug!(
                        transport = %addr.transport,
                        addr = %addr.addr,
                        error = %e,
                        "Failed to resolve Ethernet address"
                    );
                    continue;
                }
            }
        } else if addr.transport == "ble" {
            #[cfg(bluer_available)]
            {
                match self.resolve_ble_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve BLE address"
                        );
                        continue;
                    }
                }
            }
            #[cfg(not(bluer_available))]
            {
                debug!(transport = %addr.transport, "BLE transport not available on this build");
                continue;
            }
        } else {
            // UDP addresses that parse as socket addresses pick a transport
            // compatible with that remote address; everything else (including
            // unparsable UDP addresses) falls back to a lookup by transport
            // type name.
            let tid = if addr.transport == "udp"
                && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
            {
                match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                    Some((id, _)) => id,
                    None => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            "No compatible operational UDP transport for address"
                        );
                        continue;
                    }
                }
            } else {
                match self.find_transport_for_type(&addr.transport) {
                    Some(id) => id,
                    None => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            "No operational transport for address type"
                        );
                        continue;
                    }
                }
            };
            (tid, TransportAddr::from_string(&addr.addr))
        };

        // An identical path is already being dialed: count it as attempted
        // without spending budget.
        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
            attempted = true;
            debug!(
                npub = %peer_config.npub,
                transport_id = %transport_id,
                remote_addr = %remote_addr,
                "Skipping duplicate in-flight candidate path"
            );
            continue;
        }

        // Out of budget: try to reclaim a slot from a lower-priority
        // in-flight candidate, then re-read the budget.
        if concrete_budget == 0
            && self.reclaim_lower_priority_inflight_candidate_for_peer(&peer_node_addr, addr)
        {
            concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);
        }

        // Still no budget: stop processing the remaining addresses.
        if concrete_budget == 0 {
            debug!(
                npub = %peer_config.npub,
                max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                "Path candidate race budget exhausted"
            );
            break;
        }

        match self
            .initiate_connection(transport_id, remote_addr, peer_identity)
            .await
        {
            Ok(()) => {
                attempted = true;
                concrete_budget = concrete_budget.saturating_sub(1);
            }
            // Access denial is final for this peer; do not try other addresses.
            Err(e @ NodeError::AccessDenied(_)) => return Err(e),
            Err(e) => {
                // Remember only the first local-route failure for reporting.
                if e.is_local_route_unavailable() && local_route_error.is_none() {
                    local_route_error = Some(e.to_string());
                }
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    error = %e,
                    "Connection attempt failed, trying next address"
                );
            }
        }
    }

    if attempted {
        return Ok(());
    }

    if let Some(error) = local_route_error {
        return Err(NodeError::LocalRouteUnavailable(error));
    }

    Err(NodeError::NoTransportForType(format!(
        "no operational transport for any of {}'s addresses",
        peer_config.npub
    )))
}
3094
3095 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
3096 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
3097 .await;
3098 }
3099
3100 pub(in crate::node) fn queue_active_fallback_direct_retries(
3101 &mut self,
3102 _bootstrap: &std::sync::Arc<NostrDiscovery>,
3103 ) {
3104 let now_ms = Self::now_ms();
3105 let peer_configs = self
3106 .config
3107 .auto_connect_peers()
3108 .cloned()
3109 .collect::<Vec<_>>();
3110
3111 for peer_config in peer_configs {
3112 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
3113 continue;
3114 };
3115 let node_addr = *peer_identity.node_addr();
3116
3117 if self.retry_pending.contains_key(&node_addr)
3118 || !self.peers.contains_key(&node_addr)
3119 || self.is_connecting_to_peer(&node_addr)
3120 || !self.active_peer_should_keep_direct_retry(&node_addr, &peer_config)
3121 {
3122 continue;
3123 }
3124
3125 let mut state = super::retry::RetryState::new(peer_config.clone());
3126 state.reconnect = true;
3127 state.retry_after_ms = now_ms;
3128 self.retry_pending.insert(node_addr, state);
3129
3130 debug!(
3131 peer = %self.peer_display_name(&node_addr),
3132 "Queued direct-path retry for active fallback peer"
3133 );
3134 }
3135 }
3136
/// Scans cached overlay adverts and queues connection retries for peers that
/// are not configured locally ("open discovery").
///
/// Does nothing unless Nostr discovery is enabled with the `Open` policy and
/// the enqueue budget is non-zero. At most 64 cached candidates are
/// requested from `bootstrap`.
///
/// * `max_age_secs` — when set, adverts older than this many seconds
///   (relative to now) are skipped.
/// * `caller` — label used in log output; the summary line is always emitted
///   for `"startup"` and otherwise only when something was queued.
///
/// Adverts from *configured* peers are never queued here; instead, a pending
/// retry for such a peer that is scheduled in the future is pulled forward
/// to now (unless the peer is in cooldown).
pub(in crate::node) async fn run_open_discovery_sweep(
    &mut self,
    bootstrap: &std::sync::Arc<NostrDiscovery>,
    max_age_secs: Option<u64>,
    caller: &'static str,
) {
    if !self.config.node.discovery.nostr.enabled
        || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
    {
        return;
    }

    let configured_npubs = self
        .config
        .peers()
        .iter()
        .map(|peer| peer.npub.clone())
        .collect::<HashSet<_>>();
    let now_ms = Self::now_ms();
    let now_secs = now_ms / 1000;
    // Number of new open-discovery retries this sweep may queue.
    let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
    if enqueue_budget == 0 {
        debug!(
            caller = %caller,
            "open-discovery sweep: enqueue budget is 0, skipping"
        );
        return;
    }

    let candidates = bootstrap.cached_open_discovery_candidates(64).await;
    let cached_count = candidates.len();
    // Per-reason counters, reported in the summary log line.
    let mut enqueued = 0usize;
    let mut skipped_age = 0usize;
    let mut skipped_configured = 0usize;
    let mut skipped_self = 0usize;
    let mut skipped_connected = 0usize;
    let mut skipped_retry_pending = 0usize;
    let mut skipped_connecting = 0usize;
    let mut skipped_no_endpoints = 0usize;
    let mut skipped_invalid_npub = 0usize;
    let mut skipped_cooldown = 0usize;

    for (npub, endpoints, created_at_secs) in candidates {
        if enqueue_budget == 0 {
            break;
        }

        if let Some(max_age) = max_age_secs
            && now_secs.saturating_sub(created_at_secs) > max_age
        {
            skipped_age = skipped_age.saturating_add(1);
            continue;
        }

        // Configured peers are handled by the regular retry machinery; the
        // only action here is to expedite an already-pending retry.
        if configured_npubs.contains(&npub) {
            if let Ok(identity) = PeerIdentity::from_npub(&npub) {
                let configured_addr = *identity.node_addr();
                if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                    // NOTE(review): this candidate is counted under both
                    // `skipped_cooldown` and `skipped_configured`, so
                    // `skipped_total` counts it twice — confirm intended.
                    skipped_cooldown = skipped_cooldown.saturating_add(1);
                    skipped_configured = skipped_configured.saturating_add(1);
                    continue;
                }
                if let Some(state) = self.retry_pending.get_mut(&configured_addr)
                    && state.retry_after_ms > now_ms
                {
                    state.retry_after_ms = now_ms;
                    debug!(
                        caller = %caller,
                        peer = %self.peer_display_name(&configured_addr),
                        advert_age_secs = now_secs.saturating_sub(created_at_secs),
                        "Expediting configured-peer retry after fresh overlay advert"
                    );
                }
            }
            skipped_configured = skipped_configured.saturating_add(1);
            continue;
        }

        let peer_identity = match PeerIdentity::from_npub(&npub) {
            Ok(identity) => identity,
            Err(_) => {
                skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                continue;
            }
        };
        let node_addr = *peer_identity.node_addr();
        // Never queue a connection to ourselves.
        if node_addr == *self.identity.node_addr() {
            skipped_self = skipped_self.saturating_add(1);
            continue;
        }
        if self.peers.contains_key(&node_addr) {
            skipped_connected = skipped_connected.saturating_add(1);
            continue;
        }
        if self.retry_pending.contains_key(&node_addr) {
            skipped_retry_pending = skipped_retry_pending.saturating_add(1);
            continue;
        }
        if bootstrap.cooldown_until(&npub, now_ms).is_some() {
            skipped_cooldown = skipped_cooldown.saturating_add(1);
            continue;
        }
        // A connection whose expected identity is this peer is already in flight.
        let connecting = self.connections.values().any(|conn| {
            conn.expected_identity()
                .map(|id| id.node_addr() == &node_addr)
                .unwrap_or(false)
        });
        if connecting {
            skipped_connecting = skipped_connecting.saturating_add(1);
            continue;
        }

        // Turn the advertised endpoints into de-duplicated peer addresses
        // with ascending priorities starting at 120.
        let mut addresses = Vec::new();
        let mut priority = 120u8;
        let seen_at_ms = Self::now_ms();
        for endpoint in endpoints {
            let Some(candidate) =
                Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
            else {
                continue;
            };
            if addresses.iter().any(|existing: &PeerAddress| {
                existing.transport == candidate.transport && existing.addr == candidate.addr
            }) {
                continue;
            }
            addresses.push(candidate);
            priority = priority.saturating_add(1);
        }
        if addresses.is_empty() {
            skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
            continue;
        }

        // Record a display alias (kept if one already exists) and the
        // peer's public key for this node address.
        self.peer_aliases
            .entry(node_addr)
            .or_insert_with(|| peer_identity.short_npub());
        self.register_identity(node_addr, peer_identity.pubkey_full());

        // Synthesize a peer config for the discovered peer and queue it for
        // an immediate attempt with an expiry.
        let mut state = super::retry::RetryState::new(PeerConfig {
            npub: npub.clone(),
            alias: None,
            addresses,
            connect_policy: ConnectPolicy::AutoConnect,
            auto_reconnect: true,
            discovery_fallback_transit: false,
        });
        state.reconnect = false;
        state.retry_after_ms = now_ms;
        state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
        self.retry_pending.insert(node_addr, state);
        info!(
            caller = %caller,
            peer = %peer_identity.short_npub(),
            advert_age_secs = now_secs.saturating_sub(created_at_secs),
            "open-discovery sweep: queued retry for cached advert"
        );
        enqueue_budget = enqueue_budget.saturating_sub(1);
        enqueued = enqueued.saturating_add(1);
    }

    let total_skipped = skipped_age
        + skipped_configured
        + skipped_self
        + skipped_connected
        + skipped_retry_pending
        + skipped_connecting
        + skipped_no_endpoints
        + skipped_invalid_npub
        + skipped_cooldown;
    // Only summarize at startup or when the sweep actually queued something.
    let should_summarize = caller == "startup" || enqueued > 0;
    if should_summarize {
        info!(
            caller = %caller,
            cached = cached_count,
            queued = enqueued,
            skipped_age = skipped_age,
            skipped_configured = skipped_configured,
            skipped_self = skipped_self,
            skipped_connected = skipped_connected,
            skipped_retry_pending = skipped_retry_pending,
            skipped_connecting = skipped_connecting,
            skipped_no_endpoints = skipped_no_endpoints,
            skipped_invalid_npub = skipped_invalid_npub,
            skipped_cooldown = skipped_cooldown,
            skipped_total = total_skipped,
            "open-discovery sweep complete"
        );
    }
}
3359
3360 async fn maybe_run_startup_open_discovery_sweep(
3368 &mut self,
3369 bootstrap: &std::sync::Arc<NostrDiscovery>,
3370 ) {
3371 if self.startup_open_discovery_sweep_done {
3372 return;
3373 }
3374 if !self.config.node.discovery.nostr.enabled
3375 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3376 {
3377 self.startup_open_discovery_sweep_done = true;
3379 return;
3380 }
3381 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
3382 return;
3383 };
3384 let now_ms = Self::now_ms();
3385 let delay_ms = self
3386 .config
3387 .node
3388 .discovery
3389 .nostr
3390 .startup_sweep_delay_secs
3391 .saturating_mul(1000);
3392 if now_ms < started_at_ms.saturating_add(delay_ms) {
3393 return;
3394 }
3395
3396 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
3397 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
3398 .await;
3399 self.startup_open_discovery_sweep_done = true;
3400 }
3401
3402 fn available_outbound_slots(&self) -> usize {
3403 let connection_used = self
3404 .connections
3405 .len()
3406 .saturating_add(self.pending_connects.len());
3407 let connection_slots = if self.max_connections == 0 {
3408 usize::MAX
3409 } else {
3410 self.max_connections.saturating_sub(connection_used)
3411 };
3412
3413 let peer_slots = if self.max_peers == 0 {
3414 usize::MAX
3415 } else {
3416 self.max_peers.saturating_sub(self.peers.len())
3417 };
3418
3419 let link_slots = if self.max_links == 0 {
3420 usize::MAX
3421 } else {
3422 self.max_links.saturating_sub(self.links.len())
3423 };
3424
3425 connection_slots.min(peer_slots).min(link_slots)
3426 }
3427
3428 pub(in crate::node) fn open_discovery_enqueue_budget(
3429 &self,
3430 configured_npubs: &HashSet<String>,
3431 ) -> usize {
3432 let current_open_discovery_active = self
3433 .peers
3434 .values()
3435 .filter(|peer| !configured_npubs.contains(&peer.npub()))
3436 .count();
3437 let current_open_discovery_pending = self
3438 .retry_pending
3439 .values()
3440 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3441 .count();
3442
3443 let cap_remaining = self
3444 .config
3445 .node
3446 .discovery
3447 .nostr
3448 .open_discovery_max_pending
3449 .saturating_sub(current_open_discovery_active)
3450 .saturating_sub(current_open_discovery_pending);
3451
3452 cap_remaining.min(self.available_outbound_slots())
3453 }
3454
3455 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3456 now_ms.saturating_add(
3457 self.config
3458 .node
3459 .discovery
3460 .nostr
3461 .advert_ttl_secs
3462 .saturating_mul(1000)
3463 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3464 )
3465 }
3466
/// Builds the overlay advert describing how this node can be reached, from
/// the currently operational transports.
///
/// Returns `None` when Nostr discovery is disabled or when no transport
/// yields an advertisable endpoint. Only `udp`, `webrtc`, `tcp` and `tor`
/// transports are considered, and each only when its config is found and
/// `advertise_on_nostr()` is true.
///
/// `signal_relays` and `stun_servers` are populated (from the Nostr
/// discovery config) only when the advert contains a `udp:nat` or WebRTC
/// endpoint; otherwise both are `None`.
async fn build_overlay_advert(
    &self,
    bootstrap: &std::sync::Arc<NostrDiscovery>,
) -> Option<OverlayAdvert> {
    if !self.config.node.discovery.nostr.enabled {
        return None;
    }

    let mut endpoints = Vec::new();
    let mut has_udp_nat = false;
    let mut has_webrtc = false;

    for handle in self.transports.values() {
        if !handle.is_operational() {
            continue;
        }

        match handle.transport_type().name {
            "udp" => {
                let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                    continue;
                };
                if !cfg.advertise_on_nostr() {
                    continue;
                }
                if cfg.is_public() {
                    // Public UDP: prefer the explicitly configured external
                    // address, then a routable bind address, then an address
                    // learned through the discovery runtime.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Udp,
                                    addr: addr.to_string(),
                                });
                            }
                            // Bound to a wildcard or unroutable address: ask
                            // the discovery runtime for the public address
                            // observed for this transport's port.
                            Some(addr) => {
                                let key = handle.transport_id().as_u32();
                                let port = addr.port();
                                if let Some(public) =
                                    bootstrap.learn_public_udp_addr(key, port).await
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: public.to_string(),
                                    });
                                } else {
                                    warn!(
                                        transport_id = key,
                                        bind_addr = %addr,
                                        "advert: udp public=true but bind is wildcard \
                                         or private and STUN observation failed; \
                                         advertising no UDP endpoint. Either set \
                                         transports.udp.external_addr, bind to a \
                                         specific *public* IP, or ensure \
                                         node.discovery.nostr.stun_servers is reachable"
                                    );
                                }
                            }
                            None => {}
                        }
                    }
                } else {
                    // Non-public UDP is advertised as the "nat" placeholder,
                    // which peers resolve through NAT traversal.
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::Udp,
                        addr: "nat".to_string(),
                    });
                    has_udp_nat = true;
                }
            }
            "webrtc" => {
                let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                    continue;
                };
                if !cfg.advertise_on_nostr() {
                    continue;
                }
                // The WebRTC endpoint address is the hex-encoded serialized
                // full public key of this node.
                endpoints.push(OverlayEndpointAdvert {
                    transport: OverlayTransportKind::WebRtc,
                    addr: hex::encode(self.identity.pubkey_full().serialize()),
                });
                has_webrtc = true;
            }
            "tcp" => {
                let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                    continue;
                };
                if !cfg.advertise_on_nostr() {
                    continue;
                }
                // TCP: explicit external address first, then a routable bind
                // address; there is no learned-address fallback here.
                if let Some(explicit) = cfg.external_advert_addr() {
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::Tcp,
                        addr: explicit.to_string(),
                    });
                } else {
                    match handle.local_addr() {
                        Some(addr)
                            if !addr.ip().is_unspecified()
                                && !is_unroutable_advert_ip(addr.ip()) =>
                        {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Tcp,
                                addr: addr.to_string(),
                            });
                        }
                        Some(addr) => {
                            warn!(
                                bind_addr = %addr,
                                "advert: tcp advertise_on_nostr=true bound to wildcard \
                                 or private IP and no transports.tcp.external_addr set; \
                                 advertising no TCP endpoint. Either set external_addr \
                                 to the public IP (recommended for cloud 1:1-NAT setups) \
                                 or bind explicitly to the public IP"
                            );
                        }
                        None => {}
                    }
                }
            }
            "tor" => {
                let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                    continue;
                };
                if !cfg.advertise_on_nostr() {
                    continue;
                }
                // Advertised only once the transport reports an onion address.
                if let Some(addr) = handle.onion_address() {
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::Tor,
                        addr: format!("{}:{}", addr, cfg.advertised_port()),
                    });
                }
            }
            // Other transport types are never advertised.
            _ => {}
        }
    }

    if endpoints.is_empty() {
        return None;
    }

    Some(OverlayAdvert {
        identifier: ADVERT_IDENTIFIER.to_string(),
        version: ADVERT_VERSION,
        endpoints,
        signal_relays: (has_udp_nat || has_webrtc)
            .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
        stun_servers: (has_udp_nat || has_webrtc)
            .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
    })
}
3645
3646 async fn refresh_overlay_advert(
3647 &self,
3648 bootstrap: &std::sync::Arc<NostrDiscovery>,
3649 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3650 let advert = self.build_overlay_advert(bootstrap).await;
3651 bootstrap.update_local_advert(advert).await
3652 }
3653
3654 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3655 match (&self.config.transports.udp, transport_name) {
3656 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3657 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3658 _ => None,
3659 }
3660 }
3661
3662 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3663 match (&self.config.transports.tcp, transport_name) {
3664 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3665 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3666 _ => None,
3667 }
3668 }
3669
3670 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3671 match (&self.config.transports.tor, transport_name) {
3672 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3673 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3674 _ => None,
3675 }
3676 }
3677
3678 fn lookup_webrtc_config(
3679 &self,
3680 transport_name: Option<&str>,
3681 ) -> Option<&crate::config::WebRtcConfig> {
3682 match (&self.config.transports.webrtc, transport_name) {
3683 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3684 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3685 _ => None,
3686 }
3687 }
3688
3689 pub(in crate::node) async fn try_peer_addresses(
3690 &mut self,
3691 peer_config: &PeerConfig,
3692 peer_identity: PeerIdentity,
3693 allow_bootstrap_nat: bool,
3694 ) -> Result<(), NodeError> {
3695 let peer_node_addr = *peer_identity.node_addr();
3696 if self.peers.contains_key(&peer_node_addr) {
3697 debug!(
3698 npub = %peer_config.npub,
3699 "Peer already exists, skipping address attempts"
3700 );
3701 return Ok(());
3702 }
3703
3704 let candidates = self.peer_address_candidates(peer_config).await;
3705
3706 if candidates.is_empty() {
3707 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3708 return Ok(());
3709 }
3710 return Err(NodeError::NoTransportForType(format!(
3711 "no addresses known for {}",
3712 peer_config.npub
3713 )));
3714 }
3715
3716 if self
3717 .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3718 .await
3719 .is_ok()
3720 {
3721 if allow_bootstrap_nat {
3722 self.request_nostr_bootstrap(peer_config).await;
3723 }
3724 return Ok(());
3725 }
3726
3727 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3728 return Ok(());
3729 }
3730
3731 Err(NodeError::NoTransportForType(format!(
3732 "no operational transport for any of {}'s addresses",
3733 peer_config.npub
3734 )))
3735 }
3736
3737 async fn try_active_peer_alternative_addresses(
3738 &mut self,
3739 peer_config: &PeerConfig,
3740 peer_identity: PeerIdentity,
3741 allow_same_path_refresh: bool,
3742 ) -> Result<bool, NodeError> {
3743 let peer_node_addr = *peer_identity.node_addr();
3744 let mut candidates = self.peer_address_candidates(peer_config).await;
3745 let same_path_refresh_needed = allow_same_path_refresh
3746 && (self.active_peer_needs_same_path_refresh(&peer_node_addr)
3747 || self
3748 .peers
3749 .get(&peer_node_addr)
3750 .is_some_and(|peer| !peer.can_send()));
3751 if same_path_refresh_needed
3752 && let Some(candidate) = self.active_peer_current_udp_candidate(&peer_node_addr)
3753 && !candidates.iter().any(|existing| {
3754 existing.transport == candidate.transport && existing.addr == candidate.addr
3755 })
3756 {
3757 candidates.push(candidate);
3758 Self::sort_peer_address_candidates(&mut candidates);
3759 }
3760 let should_try_nostr =
3761 self.active_peer_should_keep_direct_retry(&peer_node_addr, peer_config);
3762
3763 if candidates.is_empty() {
3764 if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3765 return Ok(true);
3766 }
3767 return Err(NodeError::NoTransportForType(format!(
3768 "no addresses known for {}",
3769 peer_config.npub
3770 )));
3771 }
3772
3773 let alternatives: Vec<_> = candidates
3774 .into_iter()
3775 .filter(|addr| {
3776 same_path_refresh_needed
3777 || !self.active_peer_matches_candidate(&peer_node_addr, addr)
3778 })
3779 .collect();
3780
3781 if alternatives.is_empty() {
3782 if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3783 return Ok(true);
3784 }
3785 return Ok(false);
3786 }
3787
3788 let needs_separate_nostr_attempt = should_try_nostr
3789 && !alternatives
3790 .iter()
3791 .any(|addr| addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat"));
3792 let address_result = self
3793 .attempt_peer_address_list(peer_config, peer_identity, true, &alternatives)
3794 .await;
3795 let nostr_attempted =
3796 needs_separate_nostr_attempt && self.request_nostr_bootstrap(peer_config).await;
3797
3798 match address_result {
3799 Ok(()) => Ok(true),
3800 Err(err) if nostr_attempted => {
3801 debug!(
3802 npub = %peer_config.npub,
3803 error = %err,
3804 "Static active-peer direct-path alternatives failed; Nostr traversal still queued"
3805 );
3806 Ok(true)
3807 }
3808 Err(err) => Err(err),
3809 }
3810 }
3811
3812 async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3813 let static_addresses = self.static_peer_addresses(peer_config);
3820 let overlay_addresses = self
3821 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3822 .await;
3823
3824 let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3825 for addr in overlay_addresses.into_iter().chain(static_addresses) {
3826 if !candidates.iter().any(|existing: &PeerAddress| {
3827 existing.transport == addr.transport && existing.addr == addr.addr
3828 }) {
3829 candidates.push(addr);
3830 }
3831 }
3832
3833 Self::sort_peer_address_candidates(&mut candidates);
3834
3835 candidates
3836 }
3837
3838 fn sort_peer_address_candidates(candidates: &mut [PeerAddress]) {
3839 candidates.sort_by(|a, b| {
3845 if a.priority != b.priority {
3846 return a.priority.cmp(&b.priority);
3847 }
3848 match (a.seen_at_ms, b.seen_at_ms) {
3849 (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3850 (Some(_), None) => std::cmp::Ordering::Less,
3851 (None, Some(_)) => std::cmp::Ordering::Greater,
3852 (None, None) => std::cmp::Ordering::Equal,
3853 }
3854 });
3855 }
3856
3857 fn active_peer_matches_any_candidate(
3858 &self,
3859 peer_node_addr: &NodeAddr,
3860 candidates: &[PeerAddress],
3861 ) -> bool {
3862 candidates
3863 .iter()
3864 .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3865 }
3866
3867 pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3868 &self,
3869 peer_node_addr: &NodeAddr,
3870 candidates: &[PeerAddress],
3871 ) -> bool {
3872 if !self
3873 .peers
3874 .get(peer_node_addr)
3875 .is_some_and(|peer| peer.can_send())
3876 {
3877 return false;
3878 }
3879 if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3880 return false;
3881 }
3882 !self.active_peer_needs_same_path_refresh(peer_node_addr)
3883 }
3884
3885 pub(in crate::node) fn active_peer_should_keep_direct_retry(
3886 &self,
3887 peer_node_addr: &NodeAddr,
3888 peer_config: &PeerConfig,
3889 ) -> bool {
3890 let Some(peer) = self.peers.get(peer_node_addr) else {
3891 return false;
3892 };
3893
3894 let static_addresses = self.static_peer_addresses(peer_config);
3895 if !static_addresses.is_empty() {
3896 return !self
3897 .active_peer_candidate_is_fresh_enough_to_skip(peer_node_addr, &static_addresses);
3898 }
3899
3900 if peer_config.npub.is_empty() {
3901 return false;
3902 }
3903
3904 if !self.config.node.discovery.nostr.enabled
3905 || self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Disabled
3906 {
3907 return false;
3908 }
3909
3910 let Some(transport_id) = peer.transport_id() else {
3911 return true;
3912 };
3913
3914 if self.bootstrap_transports.contains(&transport_id) {
3915 return self.active_peer_needs_same_path_refresh(peer_node_addr);
3916 }
3917
3918 let Some(transport) = self.transports.get(&transport_id) else {
3919 return true;
3920 };
3921
3922 if transport.transport_type().name != "udp" {
3923 return true;
3924 }
3925
3926 self.active_peer_needs_same_path_refresh(peer_node_addr)
3927 }
3928
3929 pub(in crate::node) fn clear_retry_unless_direct_refresh_needed(
3930 &mut self,
3931 peer_node_addr: &NodeAddr,
3932 ) {
3933 let keep_retry = self
3934 .retry_pending
3935 .get(peer_node_addr)
3936 .map(|state| state.peer_config.clone())
3937 .is_some_and(|peer_config| {
3938 self.active_peer_should_keep_direct_retry(peer_node_addr, &peer_config)
3939 });
3940
3941 if !keep_retry {
3942 self.retry_pending.remove(peer_node_addr);
3943 }
3944 }
3945
3946 fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3947 let Some(peer) = self.peers.get(peer_node_addr) else {
3948 return false;
3949 };
3950 let stale_after_ms = self
3951 .config
3952 .node
3953 .heartbeat_interval_secs
3954 .saturating_mul(1000)
3955 .max(1000);
3956 peer.idle_time(Self::now_ms()) > stale_after_ms
3957 }
3958
3959 pub(in crate::node) fn active_peer_current_udp_candidate(
3960 &self,
3961 peer_node_addr: &NodeAddr,
3962 ) -> Option<PeerAddress> {
3963 let peer = self.peers.get(peer_node_addr)?;
3964 let current_addr = peer.current_addr()?;
3965 if let Some(transport_id) = peer.transport_id() {
3966 if let Some(transport) = self.transports.get(&transport_id) {
3967 if transport.transport_type().name != "udp" {
3968 return None;
3969 }
3970 } else if !self
3971 .transports
3972 .values()
3973 .any(|transport| transport.transport_type().name == "udp")
3974 {
3975 return None;
3976 }
3977 } else if !self
3978 .transports
3979 .values()
3980 .any(|transport| transport.transport_type().name == "udp")
3981 {
3982 return None;
3983 }
3984 let socket_addr = current_addr.as_str()?.parse::<SocketAddr>().ok()?;
3985
3986 Some(
3987 PeerAddress::with_priority("udp", socket_addr.to_string(), 240)
3988 .with_seen_at_ms(Self::now_ms()),
3989 )
3990 }
3991
3992 pub(in crate::node) fn active_peer_matches_candidate(
3993 &self,
3994 peer_node_addr: &NodeAddr,
3995 candidate: &PeerAddress,
3996 ) -> bool {
3997 let Some(peer) = self.peers.get(peer_node_addr) else {
3998 return false;
3999 };
4000 let Some(current_addr) = peer.current_addr() else {
4001 return false;
4002 };
4003 if let Some(peer_transport_id) = peer.transport_id()
4004 && let Some((candidate_transport_id, candidate_addr)) =
4005 self.resolve_peer_address_for_match(candidate)
4006 {
4007 return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
4008 }
4009 if peer
4010 .transport_id()
4011 .map(|id| self.bootstrap_transports.contains(&id))
4012 .unwrap_or(false)
4013 {
4014 return false;
4015 }
4016 let current_addr = current_addr.to_string();
4017 let current_transport = peer
4018 .transport_id()
4019 .and_then(|id| self.transports.get(&id))
4020 .map(|transport| transport.transport_type().name);
4021
4022 candidate.addr == current_addr
4023 && current_transport
4024 .map(|transport| transport == candidate.transport)
4025 .unwrap_or(true)
4026 }
4027
4028 fn configured_path_priority(
4029 &self,
4030 peer_node_addr: &NodeAddr,
4031 transport_id: TransportId,
4032 remote_addr: &TransportAddr,
4033 ) -> Option<u8> {
4034 self.configured_peer(peer_node_addr)?
4035 .addresses
4036 .iter()
4037 .filter_map(|candidate| {
4038 let (candidate_transport_id, candidate_addr) =
4039 self.resolve_peer_address_for_match(candidate)?;
4040 (candidate_transport_id == transport_id && &candidate_addr == remote_addr)
4041 .then_some(candidate.priority)
4042 })
4043 .min()
4044 }
4045
4046 pub(in crate::node) fn alternate_path_priority_allows_replace(
4047 &self,
4048 peer_node_addr: &NodeAddr,
4049 candidate_transport_id: TransportId,
4050 candidate_addr: &TransportAddr,
4051 ) -> bool {
4052 const UNKNOWN_PATH_PRIORITY: u16 = u8::MAX as u16 + 1;
4053
4054 let Some(peer) = self.peers.get(peer_node_addr) else {
4055 return true;
4056 };
4057 let Some(current_transport_id) = peer.transport_id() else {
4058 return true;
4059 };
4060 let Some(current_addr) = peer.current_addr() else {
4061 return true;
4062 };
4063
4064 let current_priority = self
4065 .configured_path_priority(peer_node_addr, current_transport_id, current_addr)
4066 .map(u16::from)
4067 .unwrap_or(UNKNOWN_PATH_PRIORITY);
4068 let candidate_priority = self
4069 .configured_path_priority(peer_node_addr, candidate_transport_id, candidate_addr)
4070 .map(u16::from)
4071 .unwrap_or(UNKNOWN_PATH_PRIORITY);
4072
4073 if candidate_priority <= current_priority {
4074 return true;
4075 }
4076
4077 debug!(
4078 peer = %self.peer_display_name(peer_node_addr),
4079 current_transport_id = %current_transport_id,
4080 current_addr = %current_addr,
4081 current_priority,
4082 candidate_transport_id = %candidate_transport_id,
4083 candidate_addr = %candidate_addr,
4084 candidate_priority,
4085 "Suppressing lower-priority alternate path while current path remains healthy"
4086 );
4087 false
4088 }
4089
4090 pub(in crate::node) fn authenticated_packet_path_allows_bookkeeping(
4091 &mut self,
4092 peer_node_addr: &NodeAddr,
4093 candidate_transport_id: TransportId,
4094 candidate_addr: &TransportAddr,
4095 now_ms: u64,
4096 ) -> bool {
4097 let Some(peer) = self.peers.get(peer_node_addr) else {
4098 return true;
4099 };
4100
4101 if peer.transport_id() == Some(candidate_transport_id)
4102 && peer.current_addr() == Some(candidate_addr)
4103 {
4104 return true;
4105 }
4106
4107 let current_path_sendable = peer.can_send();
4108 if !current_path_sendable
4109 || self.session_direct_path_blocks_direct_payload(peer_node_addr, now_ms)
4110 {
4111 return true;
4112 }
4113
4114 self.alternate_path_priority_allows_replace(
4115 peer_node_addr,
4116 candidate_transport_id,
4117 candidate_addr,
4118 )
4119 }
4120
4121 pub(in crate::node) fn active_peer_uses_recent_endpoint_path(
4122 &self,
4123 peer_node_addr: &NodeAddr,
4124 peer_config: &PeerConfig,
4125 ) -> bool {
4126 peer_config.addresses.iter().any(|addr| {
4127 addr.seen_at_ms.is_some() && self.active_peer_matches_candidate(peer_node_addr, addr)
4128 })
4129 }
4130
4131 pub(in crate::node) fn active_peer_uses_traversal_path(
4132 &self,
4133 peer_node_addr: &NodeAddr,
4134 peer_config: &PeerConfig,
4135 ) -> bool {
4136 let via_bootstrap_transport = self
4137 .peers
4138 .get(peer_node_addr)
4139 .and_then(|peer| peer.transport_id())
4140 .map(|id| self.bootstrap_transports.contains(&id))
4141 .unwrap_or(false);
4142
4143 via_bootstrap_transport
4144 || self.active_peer_uses_recent_endpoint_path(peer_node_addr, peer_config)
4145 }
4146
4147 pub(crate) async fn api_connect(
4155 &mut self,
4156 npub: &str,
4157 address: &str,
4158 transport: &str,
4159 ) -> Result<serde_json::Value, String> {
4160 let peer_config = PeerConfig {
4161 npub: npub.to_string(),
4162 alias: None,
4163 addresses: vec![PeerAddress::new(transport, address)],
4164 connect_policy: ConnectPolicy::Manual,
4165 auto_reconnect: false,
4166 discovery_fallback_transit: true,
4167 };
4168
4169 if let Ok(identity) = PeerIdentity::from_npub(npub) {
4171 self.peer_aliases
4172 .insert(*identity.node_addr(), identity.short_npub());
4173 self.register_identity(*identity.node_addr(), identity.pubkey_full());
4174 }
4175
4176 self.initiate_peer_connection(&peer_config)
4177 .await
4178 .map(|()| {
4179 info!(
4180 npub = %npub,
4181 address = %address,
4182 transport = %transport,
4183 "API connect initiated"
4184 );
4185 serde_json::json!({
4186 "npub": npub,
4187 "address": address,
4188 "transport": transport,
4189 })
4190 })
4191 .map_err(|e| e.to_string())
4192 }
4193
4194 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
4198 let peer_identity =
4199 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
4200 let node_addr = *peer_identity.node_addr();
4201
4202 if !self.peers.contains_key(&node_addr) {
4203 return Err(format!("peer not found: {npub}"));
4204 }
4205
4206 self.remove_active_peer(&node_addr);
4208
4209 self.retry_pending.remove(&node_addr);
4211
4212 info!(npub = %npub, "API disconnect completed");
4213
4214 Ok(serde_json::json!({
4215 "npub": npub,
4216 "disconnected": true,
4217 }))
4218 }
4219
4220 pub async fn adopt_established_traversal(
4227 &mut self,
4228 traversal: EstablishedTraversal,
4229 ) -> Result<BootstrapHandoffResult, NodeError> {
4230 debug!(
4231 peer_npub = %traversal.peer_npub,
4232 session_id = %traversal.session_id,
4233 remote_addr = %traversal.remote_addr,
4234 "adopting established traversal socket"
4235 );
4236
4237 if !self.state.is_operational() {
4238 return Err(NodeError::NotStarted);
4239 }
4240
4241 let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
4242 let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
4243 NodeError::InvalidPeerNpub {
4244 npub: traversal.peer_npub.clone(),
4245 reason: e.to_string(),
4246 }
4247 })?;
4248 let peer_node_addr = *peer_identity.node_addr();
4249 if self.peers.contains_key(&peer_node_addr) {
4250 debug!(
4251 peer_npub = %traversal.peer_npub,
4252 "Adopting NAT traversal handoff as alternate path for already-connected peer"
4253 );
4254 }
4255
4256 self.peer_aliases
4257 .insert(peer_node_addr, peer_identity.short_npub());
4258 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
4259
4260 let transport_id = self.allocate_transport_id();
4261 let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
4281 let mut cfg = self
4282 .lookup_udp_config(traversal.transport_name.as_deref())
4283 .or_else(|| self.lookup_udp_config(None))
4284 .cloned()
4285 .unwrap_or_default();
4286 cfg.bind_addr = None;
4287 cfg.external_addr = None;
4288 cfg
4289 });
4290 let mut transport = crate::transport::udp::UdpTransport::new(
4291 transport_id,
4292 traversal.transport_name.clone(),
4293 inherited_config,
4294 packet_tx,
4295 );
4296
4297 transport
4298 .adopt_socket_async(traversal.socket)
4299 .await
4300 .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
4301
4302 let local_addr = transport.local_addr().ok_or_else(|| {
4303 NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
4304 })?;
4305
4306 self.transports.insert(
4307 transport_id,
4308 crate::transport::TransportHandle::Udp(transport),
4309 );
4310 self.bootstrap_transports.insert(transport_id);
4311 self.bootstrap_transport_npubs
4312 .insert(transport_id, traversal.peer_npub.clone());
4313
4314 let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
4315 if let Err(err) = self
4316 .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
4317 .await
4318 {
4319 self.bootstrap_transports.remove(&transport_id);
4320 self.bootstrap_transport_npubs.remove(&transport_id);
4321 if let Some(mut handle) = self.transports.remove(&transport_id) {
4322 let _ = handle.stop().await;
4323 }
4324 return Err(err);
4325 }
4326
4327 info!(
4328 peer = %self.peer_display_name(&peer_node_addr),
4329 transport_id = %transport_id,
4330 local_addr = %local_addr,
4331 remote_addr = %traversal.remote_addr,
4332 session_id = %traversal.session_id,
4333 "adopted NAT traversal socket; handshake initiated"
4334 );
4335
4336 Ok(BootstrapHandoffResult {
4337 transport_id,
4338 local_addr,
4339 remote_addr: traversal.remote_addr,
4340 peer_node_addr,
4341 session_id: traversal.session_id,
4342 })
4343 }
4344}