1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, MeshTraversalSignal, NostrDiscovery,
7 OverlayAdvert, OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason, SessionMessageType};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use secp256k1::PublicKey;
18use std::collections::{HashMap, HashSet};
19use std::net::{IpAddr, SocketAddr};
20use std::thread;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
/// What to do with a mesh signal given the state of its session.
///
/// NOTE(review): variant meanings are inferred from their names only (the deciding
/// code is outside this view) — confirm against the caller.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MeshSignalSessionAction {
    /// Send the signal now.
    Send,
    /// Hold the signal for a later attempt.
    Defer,
    /// Discard the signal.
    Drop,
}
30
/// Debug-build-only trace sink: appends `message`, prefixed with the current
/// `SystemTime` (Debug-formatted), to `nvpn-fips-endpoint-debug.log` in the system
/// temp directory. Open and write failures are silently ignored.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let opened = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(log_path);
    let Ok(mut log_file) = opened else {
        return;
    };
    let _ = writeln!(
        log_file,
        "{:?} {}",
        std::time::SystemTime::now(),
        message.as_ref()
    );
}

/// Release builds compile the debug trace sink down to a no-op.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
51
/// Returns `true` when `ip` is not a globally routable address and therefore
/// should not be treated as a reachable endpoint in an advert.
///
/// IPv4: private, loopback, link-local, unspecified, multicast, broadcast,
/// documentation and carrier-grade NAT (100.64.0.0/10) ranges.
///
/// IPv6: loopback, unspecified, unique-local (fc00::/7), multicast, link-local
/// (fe80::/10) and documentation (2001:db8::/32) ranges. IPv4-mapped addresses
/// (`::ffff:a.b.c.d`) are judged by their embedded IPv4 address.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [first, second, ..] = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 100.64.0.0/10: carrier-grade NAT shared address space (RFC 6598).
                || (first == 100 && (second & 0xc0) == 0x40)
        }
        IpAddr::V6(v6) => {
            // An IPv4-mapped address is only as routable as the IPv4 address it
            // wraps; without this, e.g. ::ffff:192.168.1.1 would count as routable.
            if let Some(v4) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(v4));
            }
            let segments = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // fe80::/10 link-local unicast.
                || (segments[0] & 0xffc0) == 0xfe80
                // 2001:db8::/32 documentation prefix (RFC 3849).
                || (segments[0] == 0x2001 && segments[1] == 0x0db8)
        }
    }
}
83
/// Reports whether `local` and `remote` are in the same IP family (both IPv4 or
/// both IPv6). An IPv4-mapped IPv6 address counts as IPv6 here.
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    local.is_ipv4() == remote.is_ipv4()
}
90
/// Multiplier for open-discovery retry lifetimes.
/// NOTE(review): consumed outside this view — confirm what it scales.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;

/// Upper bound on in-flight path attempts (handshake connections plus pending
/// transport connects) for a single peer.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;

/// Upper bound on graph session warmups/lookups started per budget computation.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;

/// Upper bound on connections dialed from one transport-discovery poll.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
95
96impl Node {
97 pub(super) async fn update_peers(
109 &mut self,
110 new_peers: Vec<crate::config::PeerConfig>,
111 ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
112 use std::collections::{HashMap, HashSet};
113
114 let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
115 HashMap::with_capacity(new_peers.len());
116 let mut new_order = Vec::with_capacity(new_peers.len());
117 for peer in new_peers {
118 let identity = match PeerIdentity::from_npub(&peer.npub) {
119 Ok(id) => id,
120 Err(e) => {
121 return Err(crate::node::NodeError::InvalidPeerNpub {
122 npub: peer.npub.clone(),
123 reason: e.to_string(),
124 });
125 }
126 };
127 let node_addr = *identity.node_addr();
131 if !new_by_addr.contains_key(&node_addr) {
132 new_order.push(node_addr);
133 }
134 new_by_addr.insert(node_addr, peer);
135 }
136
137 let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
138 .config
139 .peers()
140 .iter()
141 .filter_map(|pc| {
142 PeerIdentity::from_npub(&pc.npub)
143 .ok()
144 .map(|id| (*id.node_addr(), pc.clone()))
145 })
146 .collect();
147
148 let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
149 let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
150
151 let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
152 let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
153 let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
154
155 let mut outcome = crate::node::UpdatePeersOutcome::default();
156
157 for node_addr in &removed {
158 if self.retry_pending.remove(node_addr).is_some() {
159 debug!(
160 peer = %self.peer_display_name(node_addr),
161 "Dropping retry entry for peer removed from runtime peer list"
162 );
163 }
164 self.peer_aliases.remove(node_addr);
165 self.set_discovery_fallback_transit_allowed(*node_addr, false);
166 outcome.removed += 1;
167 }
168
169 let mut auto_connect_refresh_configs = Vec::new();
170 for node_addr in &kept {
171 let new_pc = &new_by_addr[node_addr];
172 let current_pc = ¤t_by_addr[node_addr];
173 if new_pc.addresses != current_pc.addresses
174 || new_pc.alias != current_pc.alias
175 || new_pc.connect_policy != current_pc.connect_policy
176 || new_pc.auto_reconnect != current_pc.auto_reconnect
177 || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
178 {
179 outcome.updated += 1;
180 self.set_discovery_fallback_transit_allowed(
181 *node_addr,
182 new_pc.discovery_fallback_transit,
183 );
184 if let Some(state) = self.retry_pending.get_mut(node_addr) {
185 state.peer_config = new_pc.clone();
186 state.retry_after_ms = Self::now_ms();
187 }
188 if let Some(alias) = new_pc.alias.clone() {
189 self.peer_aliases.insert(*node_addr, alias);
190 }
191 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
192 auto_connect_refresh_configs.push(new_pc.clone());
193 }
194 } else {
195 outcome.unchanged += 1;
196 self.set_discovery_fallback_transit_allowed(
197 *node_addr,
198 new_pc.discovery_fallback_transit,
199 );
200 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
201 auto_connect_refresh_configs.push(new_pc.clone());
202 }
203 }
204 }
205
206 let added_configs: Vec<crate::config::PeerConfig> = new_order
207 .iter()
208 .filter(|addr| added.contains(addr))
209 .map(|addr| new_by_addr[addr].clone())
210 .collect();
211
212 self.config.peers = new_order
216 .iter()
217 .filter_map(|addr| new_by_addr.get(addr).cloned())
218 .collect();
219 self.configured_peer_send_weights = Self::configured_peer_send_weights(&self.config);
220
221 for peer_config in added_configs {
222 outcome.added += 1;
223 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
224 continue;
225 };
226 let name = peer_config
227 .alias
228 .clone()
229 .unwrap_or_else(|| identity.short_npub());
230 self.peer_aliases.insert(*identity.node_addr(), name);
231 self.set_discovery_fallback_transit_allowed(
232 *identity.node_addr(),
233 peer_config.discovery_fallback_transit,
234 );
235 self.register_identity(*identity.node_addr(), identity.pubkey_full());
236
237 if !peer_config.is_auto_connect() {
238 continue;
239 }
240
241 match self
242 .try_auto_connect_graph_session(&peer_config, identity)
243 .await
244 {
245 Ok(true) => continue,
246 Ok(false) => {}
247 Err(err) => {
248 debug!(
249 npub = %peer_config.npub,
250 error = %err,
251 "Existing FIPS graph did not warm newly added peer"
252 );
253 }
254 }
255
256 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
257 warn!(
258 npub = %peer_config.npub,
259 error = %e,
260 "Failed to initiate connection for newly added peer"
261 );
262 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
263 self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
264 }
265 if matches!(e, crate::node::NodeError::NoTransportForType(_))
266 && let Some(bootstrap) = self.nostr_discovery.clone()
267 {
268 bootstrap
269 .request_advert_stale_check(peer_config.npub.clone())
270 .await;
271 }
272 }
273 }
274
275 for peer_config in auto_connect_refresh_configs {
276 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
277 continue;
278 };
279 let node_addr = *peer_identity.node_addr();
280
281 if self.peers.contains_key(&node_addr) {
282 match self
283 .initiate_active_peer_alternative_connection(&peer_config)
284 .await
285 {
286 Ok(attempted) => {
287 if attempted {
288 debug!(
289 peer = %self.peer_display_name(&node_addr),
290 "Started non-disruptive alternate-path handshake for active peer"
291 );
292 }
293 }
294 Err(e) => {
295 debug!(
296 npub = %peer_config.npub,
297 error = %e,
298 "Active peer alternate-path refresh did not start"
299 );
300 }
301 }
302 continue;
303 }
304
305 match self
306 .try_auto_connect_graph_session(&peer_config, peer_identity)
307 .await
308 {
309 Ok(true) => continue,
310 Ok(false) => {}
311 Err(err) => {
312 debug!(
313 npub = %peer_config.npub,
314 error = %err,
315 "Existing FIPS graph did not warm refreshed peer"
316 );
317 }
318 }
319
320 match self.initiate_peer_connection(&peer_config).await {
321 Ok(()) => {
322 let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
323 if let Some(state) = self.retry_pending.get_mut(&node_addr) {
324 state.peer_config = peer_config;
325 state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
326 }
327 }
328 Err(e) => {
329 debug!(
330 npub = %peer_config.npub,
331 error = %e,
332 "Refreshed peer addresses did not initiate a direct connection"
333 );
334 self.schedule_retry_after_error(node_addr, Self::now_ms(), &e);
335 }
336 }
337 }
338
339 self.warm_auto_connect_graph_sessions().await;
340
341 Ok(outcome)
342 }
343
344 pub(super) async fn initiate_peer_connections(&mut self) {
345 let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
351 .config
352 .peers()
353 .iter()
354 .filter_map(|pc| {
355 PeerIdentity::from_npub(&pc.npub)
356 .ok()
357 .map(|id| (id, pc.alias.clone()))
358 })
359 .collect();
360
361 for (identity, alias) in peer_identities {
362 let name = alias.unwrap_or_else(|| identity.short_npub());
363 self.peer_aliases.insert(*identity.node_addr(), name);
364 self.register_identity(*identity.node_addr(), identity.pubkey_full());
368 }
369
370 let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
372
373 if peer_configs.is_empty() {
374 debug!("No static peers configured");
375 return;
376 }
377
378 debug!(
379 count = peer_configs.len(),
380 "Initiating static peer connections"
381 );
382
383 for peer_config in peer_configs {
384 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
385 Ok(identity) => identity,
386 Err(_) => continue,
387 };
388 match self
389 .try_auto_connect_graph_session(&peer_config, peer_identity)
390 .await
391 {
392 Ok(true) => continue,
393 Ok(false) => {}
394 Err(err) => {
395 debug!(
396 npub = %peer_config.npub,
397 error = %err,
398 "Existing FIPS graph did not warm auto-connect peer"
399 );
400 }
401 }
402 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
403 warn!(
404 npub = %peer_config.npub,
405 alias = ?peer_config.alias,
406 error = %e,
407 "Failed to initiate peer connection"
408 );
409 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
413 self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
414 }
415 if matches!(e, crate::node::NodeError::NoTransportForType(_))
421 && let Some(bootstrap) = self.nostr_discovery.clone()
422 {
423 bootstrap
424 .request_advert_stale_check(peer_config.npub.clone())
425 .await;
426 }
427 }
428 }
429
430 self.warm_auto_connect_graph_sessions().await;
431 }
432
433 pub(in crate::node) async fn try_auto_connect_graph_session(
434 &mut self,
435 peer_config: &PeerConfig,
436 peer_identity: PeerIdentity,
437 ) -> Result<bool, NodeError> {
438 if !peer_config.is_auto_connect() {
439 return Ok(false);
440 }
441
442 let peer_node_addr = *peer_identity.node_addr();
443 if self
444 .peers
445 .get(&peer_node_addr)
446 .is_some_and(|peer| peer.can_send())
447 {
448 return Ok(false);
449 }
450 if self.auto_connect_should_race_direct_path(peer_config) {
451 return Ok(false);
452 }
453 if self
454 .sessions
455 .get(&peer_node_addr)
456 .is_some_and(|entry| entry.is_established() || entry.is_initiating())
457 {
458 return Ok(true);
459 }
460 if self.find_next_hop(&peer_node_addr).is_none() {
461 return Ok(false);
462 }
463
464 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
465 match self
466 .initiate_session(peer_node_addr, peer_identity.pubkey_full())
467 .await
468 {
469 Ok(()) => {
470 debug!(
471 peer = %self.peer_display_name(&peer_node_addr),
472 "Warmed auto-connect peer session over existing FIPS graph"
473 );
474 Ok(true)
475 }
476 Err(NodeError::SendFailed { node_addr, reason })
477 if node_addr == peer_node_addr && reason == "no route to destination" =>
478 {
479 self.maybe_initiate_lookup(&peer_node_addr).await;
480 Ok(false)
481 }
482 Err(err) => Err(err),
483 }
484 }
485
486 fn auto_connect_should_race_direct_path(&self, peer_config: &PeerConfig) -> bool {
487 !peer_config.addresses.is_empty() || self.config.node.discovery.nostr.enabled
488 }
489
490 pub(super) async fn initiate_peer_connection(
494 &mut self,
495 peer_config: &crate::config::PeerConfig,
496 ) -> Result<(), NodeError> {
497 self.initiate_peer_connection_inner(peer_config).await
498 }
499
500 pub(super) async fn initiate_peer_retry_connection(
506 &mut self,
507 peer_config: &crate::config::PeerConfig,
508 ) -> Result<(), NodeError> {
509 self.initiate_peer_connection_inner(peer_config).await
510 }
511
512 pub(in crate::node) async fn initiate_active_peer_alternative_connection(
513 &mut self,
514 peer_config: &crate::config::PeerConfig,
515 ) -> Result<bool, NodeError> {
516 self.initiate_active_peer_alternative_connection_inner(peer_config, false)
517 .await
518 }
519
520 pub(in crate::node) async fn initiate_active_peer_direct_refresh_connection(
521 &mut self,
522 peer_config: &crate::config::PeerConfig,
523 ) -> Result<bool, NodeError> {
524 self.initiate_active_peer_alternative_connection_inner(peer_config, true)
525 .await
526 }
527
528 async fn initiate_active_peer_alternative_connection_inner(
529 &mut self,
530 peer_config: &crate::config::PeerConfig,
531 allow_same_path_refresh: bool,
532 ) -> Result<bool, NodeError> {
533 let peer_identity =
534 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
535 npub: peer_config.npub.clone(),
536 reason: e.to_string(),
537 })?;
538 let peer_node_addr = *peer_identity.node_addr();
539
540 if !self.peers.contains_key(&peer_node_addr) {
541 self.initiate_peer_connection(peer_config).await?;
542 return Ok(true);
543 }
544
545 self.try_active_peer_alternative_addresses(
551 peer_config,
552 peer_identity,
553 allow_same_path_refresh,
554 )
555 .await
556 }
557
558 async fn initiate_peer_connection_inner(
559 &mut self,
560 peer_config: &crate::config::PeerConfig,
561 ) -> Result<(), NodeError> {
562 let peer_identity =
564 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
565 npub: peer_config.npub.clone(),
566 reason: e.to_string(),
567 })?;
568
569 let peer_node_addr = *peer_identity.node_addr();
570
571 if self.peers.contains_key(&peer_node_addr) {
573 debug!(
574 npub = %peer_config.npub,
575 "Peer already exists, skipping"
576 );
577 return Ok(());
578 }
579
580 self.try_peer_addresses(peer_config, peer_identity, true)
581 .await
582 }
583
584 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
585 self.connections.values().any(|conn| {
586 conn.expected_identity()
587 .map(|id| id.node_addr() == peer_node_addr)
588 .unwrap_or(false)
589 })
590 }
591
592 fn is_connecting_to_peer_on_path(
593 &self,
594 peer_node_addr: &NodeAddr,
595 transport_id: TransportId,
596 remote_addr: &TransportAddr,
597 ) -> bool {
598 self.connections.values().any(|conn| {
599 conn.expected_identity()
600 .map(|id| id.node_addr() == peer_node_addr)
601 .unwrap_or(false)
602 && conn.transport_id() == Some(transport_id)
603 && conn.source_addr() == Some(remote_addr)
604 }) || self.pending_connects.iter().any(|pending| {
605 pending.peer_identity.node_addr() == peer_node_addr
606 && pending.transport_id == transport_id
607 && &pending.remote_addr == remote_addr
608 })
609 }
610
611 pub(in crate::node) fn should_warm_auto_connect_session(
612 &self,
613 peer_node_addr: &NodeAddr,
614 ) -> bool {
615 if self
616 .peers
617 .get(peer_node_addr)
618 .is_some_and(|peer| peer.can_send())
619 || self
620 .sessions
621 .get(peer_node_addr)
622 .is_some_and(|entry| entry.is_established())
623 {
624 return false;
625 }
626
627 self.config.peers().iter().any(|peer| {
628 peer.is_auto_connect()
629 && PeerIdentity::from_npub(&peer.npub)
630 .map(|identity| identity.node_addr() == peer_node_addr)
631 .unwrap_or(false)
632 })
633 }
634
635 pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
636 if !self.peers.values().any(|peer| peer.can_send()) {
637 return 0;
638 }
639
640 let mut budget = self.graph_session_warmup_budget();
641 if budget == 0 {
642 return 0;
643 }
644
645 let peer_identities: Vec<_> = self
646 .config
647 .auto_connect_peers()
648 .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
649 .collect();
650
651 let mut warmed = 0;
652 for identity in peer_identities {
653 if budget == 0 {
654 break;
655 }
656
657 let peer_node_addr = *identity.node_addr();
658 if peer_node_addr == *self.identity.node_addr()
659 || !self.should_warm_auto_connect_session(&peer_node_addr)
660 || self
661 .sessions
662 .get(&peer_node_addr)
663 .is_some_and(|entry| entry.is_initiating())
664 {
665 continue;
666 }
667
668 self.register_identity(peer_node_addr, identity.pubkey_full());
669
670 if self.find_next_hop(&peer_node_addr).is_some() {
671 match self
672 .initiate_session(peer_node_addr, identity.pubkey_full())
673 .await
674 {
675 Ok(()) => {
676 warmed += 1;
677 budget = budget.saturating_sub(1);
678 debug!(
679 peer = %self.peer_display_name(&peer_node_addr),
680 "Warmed auto-connect peer session over existing FIPS graph"
681 );
682 }
683 Err(NodeError::SendFailed { node_addr, reason })
684 if node_addr == peer_node_addr && reason == "no route to destination" =>
685 {
686 self.maybe_initiate_lookup(&peer_node_addr).await;
687 warmed += 1;
688 budget = budget.saturating_sub(1);
689 }
690 Err(err) => {
691 debug!(
692 peer = %self.peer_display_name(&peer_node_addr),
693 error = %err,
694 "Failed to warm auto-connect peer session"
695 );
696 }
697 }
698 } else {
699 self.maybe_initiate_lookup(&peer_node_addr).await;
700 warmed += 1;
701 budget = budget.saturating_sub(1);
702 }
703 }
704
705 warmed
706 }
707
708 pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
709 let max_destinations = self.config.node.session.pending_max_destinations;
710 if max_destinations == 0 {
711 return 0;
712 }
713
714 let pending_sessions = self
715 .sessions
716 .values()
717 .filter(|entry| !entry.is_established())
718 .count();
719 let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
720 max_destinations
721 .saturating_sub(pending_total)
722 .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
723 }
724
725 fn outbound_handshake_slots(&self) -> usize {
726 let used = self
727 .connections
728 .len()
729 .saturating_add(self.pending_connects.len());
730 if self.max_connections == 0 {
731 usize::MAX
732 } else {
733 self.max_connections.saturating_sub(used)
734 }
735 }
736
737 fn outbound_link_slots(&self) -> usize {
738 if self.max_links == 0 {
739 usize::MAX
740 } else {
741 self.max_links.saturating_sub(self.links.len())
742 }
743 }
744
745 fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
746 if !self.peers.contains_key(peer_node_addr)
747 && self.max_peers > 0
748 && self.peers.len() >= self.max_peers
749 {
750 return 0;
751 }
752
753 let in_flight_for_peer = self
754 .connections
755 .values()
756 .filter(|conn| {
757 conn.expected_identity()
758 .map(|id| id.node_addr() == peer_node_addr)
759 .unwrap_or(false)
760 })
761 .count()
762 .saturating_add(
763 self.pending_connects
764 .iter()
765 .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
766 .count(),
767 );
768
769 self.outbound_handshake_slots()
770 .min(self.outbound_link_slots())
771 .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
772 }
773
774 fn discovery_connect_budget(&self) -> usize {
775 self.outbound_handshake_slots()
776 .min(self.outbound_link_slots())
777 .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
778 }
779
780 fn find_udp_transport_for_remote_addr(
787 &self,
788 remote_addr: SocketAddr,
789 ) -> Option<(TransportId, SocketAddr)> {
790 self.transports
791 .iter()
792 .filter(|(id, handle)| {
793 handle.transport_type().name == "udp"
794 && handle.is_operational()
795 && !self.bootstrap_transports.contains(id)
796 })
797 .filter_map(|(id, handle)| {
798 let local_addr = handle.local_addr()?;
799 socket_addr_families_compatible(local_addr, remote_addr)
800 .then_some((*id, local_addr))
801 })
802 .min_by_key(|(id, _)| id.as_u32())
803 }
804
805 pub(super) fn transport_discovery_candidate(
806 &self,
807 discovered_transport_id: TransportId,
808 discovered_addr: TransportAddr,
809 ) -> Option<(TransportId, TransportAddr, &'static str)> {
810 let transport = self.transports.get(&discovered_transport_id)?;
811 let transport_name = transport.transport_type().name;
812
813 if transport_name != "udp" {
814 return Some((discovered_transport_id, discovered_addr, transport_name));
815 }
816
817 let Some(remote_socket_addr) = discovered_addr
818 .as_str()
819 .and_then(|addr| addr.parse::<SocketAddr>().ok())
820 else {
821 if self.bootstrap_transports.contains(&discovered_transport_id) {
822 debug!(
823 transport_id = %discovered_transport_id,
824 remote_addr = %discovered_addr,
825 "transport discovery: skip non-numeric UDP address from bootstrap transport"
826 );
827 return None;
828 }
829 return Some((discovered_transport_id, discovered_addr, transport_name));
830 };
831
832 let Some((transport_id, local_addr)) =
833 self.find_udp_transport_for_remote_addr(remote_socket_addr)
834 else {
835 debug!(
836 transport_id = %discovered_transport_id,
837 remote_addr = %discovered_addr,
838 "transport discovery: skip UDP peer with no compatible local socket"
839 );
840 return None;
841 };
842
843 if transport_id != discovered_transport_id {
844 debug!(
845 discovered_transport_id = %discovered_transport_id,
846 selected_transport_id = %transport_id,
847 local_addr = %local_addr,
848 remote_addr = %remote_socket_addr,
849 "transport discovery: selected compatible UDP transport"
850 );
851 }
852
853 Some((
854 transport_id,
855 TransportAddr::from_socket_addr(remote_socket_addr),
856 transport_name,
857 ))
858 }
859
860 fn peer_address_string_for_transport_candidate(
861 &self,
862 transport_id: TransportId,
863 transport_name: &str,
864 remote_addr: &TransportAddr,
865 ) -> String {
866 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
867 let _ = (transport_id, transport_name);
868
869 #[cfg(any(target_os = "linux", target_os = "macos"))]
870 if transport_name == "ethernet"
871 && remote_addr.as_bytes().len() == 6
872 && let Some(interface) = self
873 .transports
874 .get(&transport_id)
875 .and_then(|transport| transport.interface_name())
876 {
877 let mut mac = [0u8; 6];
878 mac.copy_from_slice(remote_addr.as_bytes());
879 return format!(
880 "{interface}/{}",
881 crate::transport::ethernet::format_mac(&mac)
882 );
883 }
884
885 remote_addr.to_string()
886 }
887
888 fn resolve_peer_address_for_match(
889 &self,
890 candidate: &PeerAddress,
891 ) -> Option<(TransportId, TransportAddr)> {
892 if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
893 return None;
894 }
895
896 if candidate.transport == "ethernet" {
897 return self.resolve_ethernet_addr(&candidate.addr).ok();
898 }
899
900 if candidate.transport == "ble" {
901 #[cfg(bluer_available)]
902 {
903 return self.resolve_ble_addr(&candidate.addr).ok();
904 }
905 #[cfg(not(bluer_available))]
906 {
907 return None;
908 }
909 }
910
911 let transport_id = if candidate.transport == "udp"
912 && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
913 {
914 self.find_udp_transport_for_remote_addr(remote_socket_addr)
915 .map(|(id, _)| id)?
916 } else {
917 self.find_transport_for_type(&candidate.transport)?
918 };
919
920 Some((transport_id, TransportAddr::from_string(&candidate.addr)))
921 }
922
923 pub(super) async fn initiate_connection(
934 &mut self,
935 transport_id: TransportId,
936 remote_addr: TransportAddr,
937 peer_identity: PeerIdentity,
938 ) -> Result<(), NodeError> {
939 let peer_node_addr = *peer_identity.node_addr();
940
941 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
942 debug!(
943 peer = %self.peer_display_name(&peer_node_addr),
944 transport_id = %transport_id,
945 remote_addr = %remote_addr,
946 "Connection already in progress for candidate path"
947 );
948 return Ok(());
949 }
950
951 if self.outbound_handshake_slots() == 0 {
952 return Err(NodeError::MaxConnectionsExceeded {
953 max: self.max_connections,
954 });
955 }
956
957 if self.outbound_link_slots() == 0 {
958 return Err(NodeError::MaxLinksExceeded {
959 max: self.max_links,
960 });
961 }
962
963 if !self.peers.contains_key(&peer_node_addr)
964 && self.max_peers > 0
965 && self.peers.len() >= self.max_peers
966 {
967 return Err(NodeError::MaxPeersExceeded {
968 max: self.max_peers,
969 });
970 }
971
972 self.authorize_peer(
973 &peer_identity,
974 PeerAclContext::OutboundConnect,
975 transport_id,
976 &remote_addr,
977 )?;
978
979 let is_connection_oriented = self
980 .transports
981 .get(&transport_id)
982 .map(|t| t.transport_type().connection_oriented)
983 .unwrap_or(false);
984
985 let link_id = self.allocate_link_id();
987
988 let link = if is_connection_oriented {
989 Link::new(
990 link_id,
991 transport_id,
992 remote_addr.clone(),
993 LinkDirection::Outbound,
994 Duration::from_millis(self.config.node.base_rtt_ms),
995 )
996 } else {
997 Link::connectionless(
998 link_id,
999 transport_id,
1000 remote_addr.clone(),
1001 LinkDirection::Outbound,
1002 Duration::from_millis(self.config.node.base_rtt_ms),
1003 )
1004 };
1005
1006 self.links.insert(link_id, link);
1007
1008 self.addr_to_link
1010 .insert((transport_id, remote_addr.clone()), link_id);
1011
1012 if is_connection_oriented {
1013 if let Some(transport) = self.transports.get(&transport_id) {
1015 match transport.connect(&remote_addr).await {
1016 Ok(()) => {
1017 debug!(
1018 peer = %self.peer_display_name(&peer_node_addr),
1019 transport_id = %transport_id,
1020 remote_addr = %remote_addr,
1021 link_id = %link_id,
1022 "Transport connect initiated (non-blocking)"
1023 );
1024 self.pending_connects.push(super::PendingConnect {
1025 link_id,
1026 transport_id,
1027 remote_addr,
1028 peer_identity,
1029 });
1030 }
1031 Err(e) => {
1032 self.links.remove(&link_id);
1034 self.addr_to_link.remove(&(transport_id, remote_addr));
1035 return Err(NodeError::from_transport_error(e));
1036 }
1037 }
1038 }
1039 Ok(())
1040 } else {
1041 self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
1043 .await
1044 }
1045 }
1046
1047 pub(super) async fn start_handshake(
1052 &mut self,
1053 link_id: LinkId,
1054 transport_id: TransportId,
1055 remote_addr: TransportAddr,
1056 peer_identity: PeerIdentity,
1057 ) -> Result<(), NodeError> {
1058 let peer_node_addr = *peer_identity.node_addr();
1059
1060 let current_time_ms = Self::now_ms();
1062 let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);
1063
1064 let our_index = match self.index_allocator.allocate() {
1066 Ok(idx) => idx,
1067 Err(e) => {
1068 self.links.remove(&link_id);
1070 self.addr_to_link.remove(&(transport_id, remote_addr));
1071 return Err(NodeError::IndexAllocationFailed(e.to_string()));
1072 }
1073 };
1074
1075 let our_keypair = self.identity.keypair();
1077 let noise_msg1 =
1078 match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
1079 Ok(msg) => msg,
1080 Err(e) => {
1081 let _ = self.index_allocator.free(our_index);
1083 self.links.remove(&link_id);
1084 self.addr_to_link.remove(&(transport_id, remote_addr));
1085 return Err(NodeError::HandshakeFailed(e.to_string()));
1086 }
1087 };
1088
1089 connection.set_our_index(our_index);
1091 connection.set_transport_id(transport_id);
1092 connection.set_source_addr(remote_addr.clone());
1093
1094 let wire_msg1 = build_msg1(our_index, &noise_msg1);
1096
1097 debug!(
1098 peer = %self.peer_display_name(&peer_node_addr),
1099 transport_id = %transport_id,
1100 remote_addr = %remote_addr,
1101 link_id = %link_id,
1102 our_index = %our_index,
1103 "Connection initiated"
1104 );
1105
1106 let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
1108 connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);
1109
1110 self.pending_outbound
1112 .insert((transport_id, our_index.as_u32()), link_id);
1113 self.connections.insert(link_id, connection);
1114
1115 let send_result = match self.transports.get(&transport_id) {
1120 Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
1121 None => None,
1122 };
1123 match send_result {
1124 Some(send_result) => {
1125 self.note_local_send_outcome(&send_result);
1126 match send_result {
1127 Ok(bytes) => {
1128 debug!(
1129 link_id = %link_id,
1130 our_index = %our_index,
1131 bytes,
1132 "Sent Noise handshake message 1 (wire format)"
1133 );
1134 }
1135 Err(e) => {
1136 warn!(
1137 link_id = %link_id,
1138 transport_id = %transport_id,
1139 remote_addr = %remote_addr,
1140 our_index = %our_index,
1141 error = %e,
1142 "Failed to send handshake message"
1143 );
1144 self.pending_outbound
1145 .remove(&(transport_id, our_index.as_u32()));
1146 self.connections.remove(&link_id);
1147 self.links.remove(&link_id);
1148 self.addr_to_link
1149 .remove(&(transport_id, remote_addr.clone()));
1150 let _ = self.index_allocator.free(our_index);
1151 return Err(NodeError::from_transport_error(e));
1152 }
1153 }
1154 }
1155 None => {
1156 self.pending_outbound
1157 .remove(&(transport_id, our_index.as_u32()));
1158 self.connections.remove(&link_id);
1159 self.links.remove(&link_id);
1160 self.addr_to_link
1161 .remove(&(transport_id, remote_addr.clone()));
1162 let _ = self.index_allocator.free(our_index);
1163 return Err(NodeError::TransportError(format!(
1164 "transport {transport_id} disappeared before first handshake send"
1165 )));
1166 }
1167 }
1168
1169 Ok(())
1170 }
1171
1172 pub(super) async fn poll_transport_discovery(&mut self) {
1178 let mut to_connect = Vec::new();
1180 let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1181 let mut connect_budget = self.discovery_connect_budget();
1182 let mut skipped_budget = 0usize;
1183
1184 for transport in self.transports.values() {
1185 if !transport.is_operational() {
1186 continue;
1187 }
1188 if !transport.auto_connect() {
1189 let _ = transport.discover();
1191 continue;
1192 }
1193 let discovered = match transport.discover() {
1194 Ok(peers) => peers,
1195 Err(_) => continue,
1196 };
1197 for peer in discovered {
1198 let discovered_transport_id = peer.transport_id;
1199 let pubkey = match peer.pubkey_hint {
1200 Some(pk) => pk,
1201 None => continue,
1202 };
1203 let identity = PeerIdentity::from_pubkey(pubkey);
1204 let node_addr = *identity.node_addr();
1205
1206 if node_addr == *self.identity.node_addr() {
1208 continue;
1209 }
1210
1211 let Some((candidate_transport_id, remote_addr, transport_name)) =
1212 self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1213 else {
1214 continue;
1215 };
1216
1217 if self.peers.contains_key(&node_addr) {
1218 let candidate = PeerAddress::new(
1219 transport_name,
1220 self.peer_address_string_for_transport_candidate(
1221 candidate_transport_id,
1222 transport_name,
1223 &remote_addr,
1224 ),
1225 );
1226 if self.active_peer_candidate_is_fresh_enough_to_skip(
1227 &node_addr,
1228 std::slice::from_ref(&candidate),
1229 ) {
1230 continue;
1231 }
1232 if self.is_connecting_to_peer_on_path(
1233 &node_addr,
1234 candidate_transport_id,
1235 &remote_addr,
1236 ) {
1237 continue;
1238 }
1239 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1240 if connect_budget == 0
1241 || self
1242 .path_candidate_attempt_budget(&node_addr)
1243 .saturating_sub(queued_for_peer)
1244 == 0
1245 {
1246 skipped_budget = skipped_budget.saturating_add(1);
1247 continue;
1248 }
1249 to_connect.push((candidate_transport_id, remote_addr, identity, true));
1250 *queued_per_peer.entry(node_addr).or_default() += 1;
1251 connect_budget = connect_budget.saturating_sub(1);
1252 continue;
1253 }
1254
1255 if self.is_connecting_to_peer_on_path(
1256 &node_addr,
1257 candidate_transport_id,
1258 &remote_addr,
1259 ) {
1260 continue;
1261 }
1262
1263 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1264 if connect_budget == 0
1265 || self
1266 .path_candidate_attempt_budget(&node_addr)
1267 .saturating_sub(queued_for_peer)
1268 == 0
1269 {
1270 skipped_budget = skipped_budget.saturating_add(1);
1271 continue;
1272 }
1273 to_connect.push((candidate_transport_id, remote_addr, identity, false));
1274 *queued_per_peer.entry(node_addr).or_default() += 1;
1275 connect_budget = connect_budget.saturating_sub(1);
1276 }
1277 }
1278
1279 if skipped_budget > 0 {
1280 debug!(
1281 skipped = skipped_budget,
1282 queued = to_connect.len(),
1283 "Transport discovery connect budget exhausted"
1284 );
1285 }
1286
1287 for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1288 info!(
1289 peer = %self.peer_display_name(identity.node_addr()),
1290 transport_id = %transport_id,
1291 remote_addr = %remote_addr,
1292 active_refresh,
1293 "Auto-connecting to discovered peer"
1294 );
1295 if let Err(e) = self
1296 .initiate_connection(transport_id, remote_addr, identity)
1297 .await
1298 {
1299 warn!(error = %e, "Failed to auto-connect to discovered peer");
1300 }
1301 }
1302 }
1303
    /// Runs one tick of Nostr overlay discovery.
    ///
    /// Does nothing when `self.nostr_discovery` is `None`. Otherwise, in order:
    /// 1. pushes the current open-discovery outbound and direct-refresh admission
    ///    decisions into the discovery runtime;
    /// 2. forwards queued mesh traversal signals (`drain_nostr_mesh_signals`);
    /// 3. handles drained bootstrap events: established NAT traversals that pass
    ///    admission are adopted, and failed traversals are recorded and either ignored,
    ///    re-probed, retried via the peer config's addresses, or scheduled for retry;
    /// 4. runs the startup open-discovery sweep and queues open-discovery and
    ///    active-fallback direct retries;
    /// 5. refreshes our local overlay advert (errors are logged at debug level only).
    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
        bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());

        self.drain_nostr_mesh_signals(&bootstrap).await;

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    // A traversal to a peer already in `self.peers` is an "active refresh"
                    // and is gated by the direct-refresh admission check instead of the
                    // general outbound admission check.
                    let active_refresh = PeerIdentity::from_npub(&traversal.peer_npub)
                        .ok()
                        .is_some_and(|identity| self.peers.contains_key(identity.node_addr()));
                    let admission_allowed = if active_refresh {
                        self.outbound_direct_refresh_admission_check()
                    } else {
                        self.outbound_admission_check()
                    };
                    if !admission_allowed {
                        debug!(
                            peer_npub = %traversal.peer_npub,
                            peers = self.peers.len(),
                            max_peers = self.max_peers,
                            active_refresh,
                            "Dropping established NAT traversal: at capacity"
                        );
                        continue;
                    }
                    // `traversal` is moved into the adopt call; keep the npub for logging.
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    let now_ms = Self::now_ms();
                    // Already-connected peer: the failure is only counted (and the link
                    // re-probed) when `active_peer_should_keep_direct_retry` says the peer
                    // should keep retrying a direct path; otherwise it is ignored.
                    if self.peers.contains_key(&node_addr) {
                        if self.active_peer_should_keep_direct_retry(&node_addr, &peer_config) {
                            let decision =
                                bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                            if decision.should_warn {
                                warn!(
                                    npub = %peer_config.npub,
                                    error = %reason,
                                    consecutive_failures = decision.consecutive_failures,
                                    cooldown_secs = decision
                                        .cooldown_until_ms
                                        .map(|t| t.saturating_sub(now_ms) / 1000),
                                    "Direct-path NAT traversal upgrade failed"
                                );
                            } else {
                                debug!(
                                    npub = %peer_config.npub,
                                    error = %reason,
                                    consecutive_failures = decision.consecutive_failures,
                                    "Direct-path NAT traversal upgrade failed (suppressed by warn-rate-limit)"
                                );
                            }
                            if decision.crossed_threshold {
                                bootstrap
                                    .request_advert_stale_check(peer_config.npub.clone())
                                    .await;
                            }
                            self.schedule_link_dead_reprobe(node_addr, now_ms);
                        } else {
                            debug!(
                                npub = %peer_config.npub,
                                error = %reason,
                                "Ignoring failed NAT traversal for already-connected peer on fresh direct path"
                            );
                        }
                        continue;
                    }
                    // A handshake to this peer is already underway; do not react here.
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    // The returned decision drives warn rate-limiting (`should_warn`), the
                    // stale-advert check (`crossed_threshold`) and the optional cooldown
                    // deadline (`cooldown_until_ms`) applied to the retry below.
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    if decision.crossed_threshold {
                        bootstrap
                            .request_advert_stale_check(peer_config.npub.clone())
                            .await;
                    }

                    // Try the addresses from `peer_config` directly; when that returns Ok no
                    // retry is scheduled. NOTE(review): meaning of the `false` flag is not
                    // visible here — confirm against `try_peer_addresses`.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    // When the Nostr cooldown applies to this peer config, push the retry
                    // out to at least the cooldown deadline.
                    if self.nostr_cooldown_applies_to_peer_config(&peer_config)
                        && let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
        self.queue_active_fallback_direct_retries(&bootstrap);

        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }
    }
1467
1468 async fn drain_nostr_mesh_signals(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1469 let mut deferred = Vec::new();
1470
1471 for signal in bootstrap.drain_mesh_signals().await {
1472 let (peer_npub, msg_type, payload) = match &signal {
1473 MeshTraversalSignal::Offer { peer_npub, offer } => {
1474 let payload = match serde_json::to_vec(&offer) {
1475 Ok(payload) => payload,
1476 Err(error) => {
1477 debug!(
1478 peer = %peer_npub,
1479 error = %error,
1480 "Failed to encode mesh traversal offer"
1481 );
1482 continue;
1483 }
1484 };
1485 (
1486 peer_npub.clone(),
1487 SessionMessageType::TraversalOffer.to_byte(),
1488 payload,
1489 )
1490 }
1491 MeshTraversalSignal::Answer { peer_npub, answer } => {
1492 let payload = match serde_json::to_vec(&answer) {
1493 Ok(payload) => payload,
1494 Err(error) => {
1495 debug!(
1496 peer = %peer_npub,
1497 error = %error,
1498 "Failed to encode mesh traversal answer"
1499 );
1500 continue;
1501 }
1502 };
1503 (
1504 peer_npub.clone(),
1505 SessionMessageType::TraversalAnswer.to_byte(),
1506 payload,
1507 )
1508 }
1509 };
1510
1511 let peer_identity = match PeerIdentity::from_npub(&peer_npub) {
1512 Ok(identity) => identity,
1513 Err(error) => {
1514 debug!(
1515 peer = %peer_npub,
1516 error = %error,
1517 "Cannot send mesh traversal signal to invalid peer npub"
1518 );
1519 continue;
1520 }
1521 };
1522 let peer_addr = *peer_identity.node_addr();
1523 match self
1524 .mesh_signal_session_action(peer_addr, peer_identity.pubkey_full())
1525 .await
1526 {
1527 MeshSignalSessionAction::Send => {}
1528 MeshSignalSessionAction::Defer => {
1529 deferred.push(signal);
1530 continue;
1531 }
1532 MeshSignalSessionAction::Drop => continue,
1533 }
1534
1535 if let Err(error) = self.send_session_msg(&peer_addr, msg_type, &payload).await {
1536 debug!(
1537 peer = %self.peer_display_name(&peer_addr),
1538 error = %error,
1539 "Failed to send mesh traversal signal"
1540 );
1541 }
1542 }
1543
1544 for signal in deferred {
1545 bootstrap.requeue_mesh_signal(signal);
1546 }
1547 }
1548
1549 async fn mesh_signal_session_action(
1550 &mut self,
1551 peer_addr: NodeAddr,
1552 peer_pubkey: PublicKey,
1553 ) -> MeshSignalSessionAction {
1554 if let Some(entry) = self.sessions.get(&peer_addr) {
1555 if entry.is_established() {
1556 return MeshSignalSessionAction::Send;
1557 }
1558 if entry.is_initiating() || entry.is_awaiting_msg3() {
1559 debug!(
1560 peer = %self.peer_display_name(&peer_addr),
1561 "Deferring mesh traversal signal until end-to-end session is established"
1562 );
1563 return MeshSignalSessionAction::Defer;
1564 }
1565 }
1566
1567 if self.find_next_hop(&peer_addr).is_none() {
1568 debug!(
1569 peer = %self.peer_display_name(&peer_addr),
1570 "Cannot warm mesh traversal signal session without a FIPS route"
1571 );
1572 self.maybe_initiate_lookup(&peer_addr).await;
1573 return MeshSignalSessionAction::Drop;
1574 }
1575
1576 self.register_identity(peer_addr, peer_pubkey);
1577 match self.initiate_session(peer_addr, peer_pubkey).await {
1578 Ok(()) => {
1579 debug!(
1580 peer = %self.peer_display_name(&peer_addr),
1581 "Warming end-to-end session for mesh traversal signal"
1582 );
1583 MeshSignalSessionAction::Defer
1584 }
1585 Err(NodeError::SendFailed { node_addr, reason })
1586 if node_addr == peer_addr && reason == "no route to destination" =>
1587 {
1588 debug!(
1589 peer = %self.peer_display_name(&peer_addr),
1590 "Cannot warm mesh traversal signal session without a FIPS route"
1591 );
1592 self.maybe_initiate_lookup(&peer_addr).await;
1593 MeshSignalSessionAction::Drop
1594 }
1595 Err(error) => {
1596 debug!(
1597 peer = %self.peer_display_name(&peer_addr),
1598 error = %error,
1599 "Failed to warm end-to-end session for mesh traversal signal"
1600 );
1601 MeshSignalSessionAction::Drop
1602 }
1603 }
1604 }
1605
1606 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1612 if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1613 let scope = scope.trim();
1614 if !scope.is_empty() {
1615 return Some(scope.to_string());
1616 }
1617 }
1618
1619 let app = self.config.node.discovery.nostr.app.trim();
1620 if app.is_empty() {
1621 return None;
1622 }
1623 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1624 let scope = rest.trim();
1625 if scope.is_empty() {
1626 None
1627 } else {
1628 Some(scope.to_string())
1629 }
1630 } else {
1631 Some(app.to_string())
1632 }
1633 }
1634
1635 pub(super) fn start_local_instance_discovery(&mut self) {
1636 if !self.config.node.discovery.local.enabled {
1637 return;
1638 }
1639 let Some(scope) = self.lan_discovery_scope() else {
1640 debug!("local instance discovery not started: no discovery scope");
1641 return;
1642 };
1643 let now_ms = Self::now_ms();
1644 match crate::discovery::local::LocalInstanceRegistry::new(
1645 self.identity.npub(),
1646 scope,
1647 &self.config.node.discovery.local,
1648 now_ms,
1649 ) {
1650 Ok(registry) => {
1651 self.local_instance_registry = Some(registry);
1652 self.local_instance_started_at_ms = Some(now_ms);
1653 self.last_local_instance_publish_ms = None;
1654 self.last_local_instance_scan_ms = None;
1655 self.publish_local_instance_record(now_ms);
1656 info!("Same-host FIPS instance discovery enabled");
1657 }
1658 Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1659 debug!("same-host FIPS instance discovery disabled");
1660 }
1661 Err(err) => {
1662 debug!(error = %err, "same-host FIPS instance discovery not started");
1663 }
1664 }
1665 }
1666
1667 fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1668 let mut contacts = Vec::new();
1669 for handle in self.transports.values() {
1670 if !handle.is_operational() || !handle.accept_connections() {
1671 continue;
1672 }
1673 let transport = handle.transport_type().name;
1674 if transport != "udp" && transport != "tcp" {
1675 continue;
1676 }
1677 let Some(local_addr) = handle.local_addr() else {
1678 continue;
1679 };
1680 let Some(contact) =
1681 crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1682 else {
1683 continue;
1684 };
1685 if contacts
1686 .iter()
1687 .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1688 existing.transport == contact.transport && existing.addr == contact.addr
1689 })
1690 {
1691 continue;
1692 }
1693 contacts.push(contact);
1694 }
1695 contacts
1696 }
1697
1698 fn publish_local_instance_record(&mut self, now_ms: u64) {
1699 let Some(registry) = self.local_instance_registry.clone() else {
1700 return;
1701 };
1702 let contacts = self.local_instance_contacts();
1703 match registry.publish(contacts, now_ms) {
1704 Ok(()) => {
1705 self.last_local_instance_publish_ms = Some(now_ms);
1706 }
1707 Err(err) => {
1708 debug!(error = %err, "failed to publish same-host FIPS instance record");
1709 }
1710 }
1711 }
1712
1713 fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1714 if self.local_instance_registry.is_none() {
1715 return;
1716 }
1717 let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1718 let due = self
1719 .last_local_instance_publish_ms
1720 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1721 .unwrap_or(true);
1722 if due {
1723 self.publish_local_instance_record(now_ms);
1724 }
1725 }
1726
1727 fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1728 if self.local_instance_registry.is_none() {
1729 return false;
1730 }
1731 let cfg = &self.config.node.discovery.local;
1732 let interval_ms = if self
1733 .local_instance_started_at_ms
1734 .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1735 .unwrap_or(false)
1736 {
1737 cfg.startup_scan_interval_ms()
1738 } else {
1739 cfg.scan_interval_ms()
1740 };
1741 self.last_local_instance_scan_ms
1742 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1743 .unwrap_or(true)
1744 }
1745
1746 fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1747 if self.config.peers().iter().any(|peer| {
1748 PeerIdentity::from_npub(&peer.npub)
1749 .map(|configured| configured.node_addr() == identity.node_addr())
1750 .unwrap_or(false)
1751 }) {
1752 return true;
1753 }
1754 self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1755 }
1756
1757 fn local_instance_peer_addresses(
1758 &self,
1759 record: &crate::discovery::local::LocalInstanceRecord,
1760 ) -> Vec<PeerAddress> {
1761 let mut addresses = Vec::new();
1762 for contact in &record.contacts {
1763 if contact.transport != "udp" && contact.transport != "tcp" {
1764 continue;
1765 }
1766 let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1767 debug!(
1768 npub = %record.npub,
1769 transport = %contact.transport,
1770 addr = %contact.addr,
1771 "local instance discovery: skip non-socket contact"
1772 );
1773 continue;
1774 };
1775 if !socket_addr.ip().is_loopback() {
1776 debug!(
1777 npub = %record.npub,
1778 addr = %contact.addr,
1779 "local instance discovery: skip non-loopback contact"
1780 );
1781 continue;
1782 }
1783 let address =
1784 PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1785 .with_seen_at_ms(record.updated_at_ms);
1786 if addresses.iter().any(|existing: &PeerAddress| {
1787 existing.transport == address.transport && existing.addr == address.addr
1788 }) {
1789 continue;
1790 }
1791 addresses.push(address);
1792 }
1793 addresses
1794 }
1795
1796 pub(super) async fn poll_local_instance_discovery(&mut self) {
1800 let Some(registry) = self.local_instance_registry.clone() else {
1801 return;
1802 };
1803 let now_ms = Self::now_ms();
1804 self.maybe_publish_local_instance_record(now_ms);
1805 if !self.local_instance_scan_due(now_ms) {
1806 return;
1807 }
1808 self.last_local_instance_scan_ms = Some(now_ms);
1809
1810 let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
1811 {
1812 Ok(records) => records,
1813 Err(err) => {
1814 debug!(error = %err, "same-host FIPS instance scan failed");
1815 return;
1816 }
1817 };
1818 if records.is_empty() {
1819 return;
1820 }
1821
1822 let mut connect_budget = self.discovery_connect_budget();
1823 let mut skipped_budget = 0usize;
1824 for record in records {
1825 let identity = match PeerIdentity::from_npub(&record.npub) {
1826 Ok(identity) => identity,
1827 Err(err) => {
1828 debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
1829 continue;
1830 }
1831 };
1832 let peer_node_addr = *identity.node_addr();
1833 if peer_node_addr == *self.identity.node_addr() {
1834 continue;
1835 }
1836 if !self.local_instance_peer_allowed(&identity) {
1837 debug!(
1838 npub = %identity.short_npub(),
1839 "local instance discovery: skip unconfigured peer"
1840 );
1841 continue;
1842 }
1843
1844 let addresses = self.local_instance_peer_addresses(&record);
1845 if addresses.is_empty() {
1846 continue;
1847 }
1848
1849 if self.peers.contains_key(&peer_node_addr)
1850 && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
1851 {
1852 continue;
1853 }
1854
1855 for address in addresses {
1856 let Some((transport_id, remote_addr)) =
1857 self.resolve_peer_address_for_match(&address)
1858 else {
1859 continue;
1860 };
1861 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1862 continue;
1863 }
1864 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1865 skipped_budget = skipped_budget.saturating_add(1);
1866 continue;
1867 }
1868 info!(
1869 npub = %identity.short_npub(),
1870 transport = %address.transport,
1871 addr = %address.addr,
1872 "same-host FIPS instance discovery: initiating handshake"
1873 );
1874 if let Err(err) = self
1875 .initiate_connection(transport_id, remote_addr, identity)
1876 .await
1877 {
1878 debug!(
1879 npub = %record.npub,
1880 error = %err,
1881 "same-host FIPS instance discovery: failed to initiate connection"
1882 );
1883 }
1884 connect_budget = connect_budget.saturating_sub(1);
1885 }
1886 }
1887 if skipped_budget > 0 {
1888 debug!(
1889 skipped = skipped_budget,
1890 "same-host FIPS instance discovery connect budget exhausted"
1891 );
1892 }
1893 }
1894
1895 pub(super) async fn poll_lan_discovery(&mut self) {
1902 let Some(runtime) = self.lan_discovery.clone() else {
1903 return;
1904 };
1905 let events = runtime.drain_events().await;
1906 if events.is_empty() {
1907 return;
1908 }
1909 let mut connect_budget = self.discovery_connect_budget();
1910 let mut skipped_budget = 0usize;
1911 for event in events {
1912 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1913 let Some((transport_id, local_addr)) =
1914 self.find_udp_transport_for_remote_addr(peer.addr)
1915 else {
1916 debug!(
1917 addr = %peer.addr,
1918 "lan: skip discovered peer with no compatible UDP transport"
1919 );
1920 continue;
1921 };
1922 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1923 Ok(id) => id,
1924 Err(err) => {
1925 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1926 continue;
1927 }
1928 };
1929 let peer_node_addr = *identity.node_addr();
1930 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1931 if self.peers.contains_key(&peer_node_addr) {
1932 let candidate = PeerAddress::new("udp", peer.addr.to_string());
1933 if self.active_peer_candidate_is_fresh_enough_to_skip(
1934 &peer_node_addr,
1935 std::slice::from_ref(&candidate),
1936 ) {
1937 continue;
1938 }
1939 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1940 continue;
1941 }
1942 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1943 skipped_budget = skipped_budget.saturating_add(1);
1944 continue;
1945 }
1946 info!(
1947 npub = %identity.short_npub(),
1948 addr = %peer.addr,
1949 local_addr = %local_addr,
1950 "lan: initiating alternate-path handshake to active peer"
1951 );
1952 if let Err(err) = self
1953 .initiate_connection(transport_id, remote_addr, identity)
1954 .await
1955 {
1956 debug!(
1957 npub = %peer.npub,
1958 error = %err,
1959 "lan: failed to initiate active peer alternate-path handshake"
1960 );
1961 }
1962 connect_budget = connect_budget.saturating_sub(1);
1963 continue;
1964 }
1965 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1966 continue;
1967 }
1968 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1969 skipped_budget = skipped_budget.saturating_add(1);
1970 continue;
1971 }
1972 info!(
1973 npub = %identity.short_npub(),
1974 addr = %peer.addr,
1975 local_addr = %local_addr,
1976 "lan: initiating handshake to discovered peer"
1977 );
1978 if let Err(err) = self
1979 .initiate_connection(transport_id, remote_addr, identity)
1980 .await
1981 {
1982 debug!(
1983 npub = %peer.npub,
1984 error = %err,
1985 "lan: failed to initiate connection to discovered peer"
1986 );
1987 }
1988 connect_budget = connect_budget.saturating_sub(1);
1989 }
1990 if skipped_budget > 0 {
1991 debug!(
1992 skipped = skipped_budget,
1993 "lan: discovery connect budget exhausted"
1994 );
1995 }
1996 }
1997
1998 pub(super) async fn poll_pending_connects(&mut self) {
2005 if self.pending_connects.is_empty() {
2006 return;
2007 }
2008
2009 let mut completed = Vec::new();
2010
2011 for (i, pending) in self.pending_connects.iter().enumerate() {
2012 let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
2013 transport.connection_state(&pending.remote_addr)
2014 } else {
2015 crate::transport::ConnectionState::Failed("transport removed".into())
2016 };
2017
2018 match state {
2019 crate::transport::ConnectionState::Connected => {
2020 completed.push((i, true, None));
2021 }
2022 crate::transport::ConnectionState::Failed(reason) => {
2023 completed.push((i, false, Some(reason)));
2024 }
2025 crate::transport::ConnectionState::Connecting => {
2026 }
2028 crate::transport::ConnectionState::None => {
2029 completed.push((i, false, Some("no connection attempt found".into())));
2031 }
2032 }
2033 }
2034
2035 for (i, success, reason) in completed.into_iter().rev() {
2037 let pending = self.pending_connects.remove(i);
2038
2039 if success {
2040 if let Some(link) = self.links.get_mut(&pending.link_id) {
2042 link.set_connected();
2043 }
2044
2045 debug!(
2046 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2047 transport_id = %pending.transport_id,
2048 remote_addr = %pending.remote_addr,
2049 link_id = %pending.link_id,
2050 "Transport connected, starting handshake"
2051 );
2052
2053 if let Err(e) = self
2055 .start_handshake(
2056 pending.link_id,
2057 pending.transport_id,
2058 pending.remote_addr.clone(),
2059 pending.peer_identity,
2060 )
2061 .await
2062 {
2063 warn!(
2064 link_id = %pending.link_id,
2065 error = %e,
2066 "Failed to start handshake after transport connect"
2067 );
2068 self.remove_link(&pending.link_id);
2070 }
2071 } else {
2072 let reason = reason.unwrap_or_default();
2073 warn!(
2074 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2075 transport_id = %pending.transport_id,
2076 remote_addr = %pending.remote_addr,
2077 link_id = %pending.link_id,
2078 reason = %reason,
2079 "Transport connect failed"
2080 );
2081
2082 self.remove_link(&pending.link_id);
2084 self.links.remove(&pending.link_id);
2085 self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
2086 }
2087 }
2088 }
2089
2090 pub async fn start(&mut self) -> Result<(), NodeError> {
2097 node_start_debug_log("Node::start begin");
2098 if !self.state.can_start() {
2099 return Err(NodeError::AlreadyStarted);
2100 }
2101 self.state = NodeState::Starting;
2102 node_start_debug_log("Node::start state set to starting");
2103
2104 let packet_buffer_size = self.config.node.buffers.packet_channel;
2106 let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
2107 self.packet_tx = Some(packet_tx.clone());
2108 self.packet_rx = Some(packet_rx);
2109 node_start_debug_log("Node::start packet channel created");
2110
2111 node_start_debug_log("Node::start create transports begin");
2113 let transport_handles = self.create_transports(&packet_tx).await;
2114 node_start_debug_log(format!(
2115 "Node::start create transports complete count={}",
2116 transport_handles.len()
2117 ));
2118
2119 for mut handle in transport_handles {
2120 let transport_id = handle.transport_id();
2121 let transport_type = handle.transport_type().name;
2122 let name = handle.name().map(|s| s.to_string());
2123
2124 node_start_debug_log(format!(
2125 "Node::start transport start begin id={} type={} name={:?}",
2126 transport_id, transport_type, name
2127 ));
2128 match handle.start().await {
2129 Ok(()) => {
2130 node_start_debug_log(format!(
2131 "Node::start transport start ok id={} type={}",
2132 transport_id, transport_type
2133 ));
2134 self.transports.insert(transport_id, handle);
2135 }
2136 Err(e) => {
2137 node_start_debug_log(format!(
2138 "Node::start transport start error id={} type={} error={}",
2139 transport_id, transport_type, e
2140 ));
2141 if let Some(ref n) = name {
2142 warn!(transport_type, name = %n, error = %e, "Transport failed to start");
2143 } else {
2144 warn!(transport_type, error = %e, "Transport failed to start");
2145 }
2146 }
2147 }
2148 }
2149
2150 if !self.transports.is_empty() {
2151 info!(count = self.transports.len(), "Transports initialized");
2152 }
2153
2154 #[cfg(unix)]
2170 {
2171 if self.config.node.worker_pools_enabled {
2172 node_start_debug_log("Node::start worker pools begin");
2173 let cpu_default = std::thread::available_parallelism()
2174 .map(|n| n.get())
2175 .unwrap_or(1)
2176 .max(1);
2177 let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
2178 .ok()
2179 .and_then(|s| s.parse().ok())
2180 .unwrap_or(cpu_default)
2181 .max(1);
2182 self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
2183 encrypt_worker_count,
2184 ));
2185 info!(
2186 workers = encrypt_worker_count,
2187 "Spawned FMP-encrypt worker pool"
2188 );
2189
2190 let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
2199 .ok()
2200 .and_then(|s| s.parse().ok())
2201 .unwrap_or(cpu_default);
2202 if decrypt_worker_count == 0 {
2203 info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
2204 } else {
2205 self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
2206 decrypt_worker_count,
2207 ));
2208 info!(
2209 workers = decrypt_worker_count,
2210 "Spawned FMP+FSP-decrypt worker pool"
2211 );
2212 }
2213 node_start_debug_log("Node::start worker pools complete");
2214 } else {
2215 node_start_debug_log("Node::start worker pools disabled");
2216 info!("FIPS worker pools disabled; using in-line crypto/send path");
2217 }
2218 }
2219
2220 if self.config.node.discovery.nostr.enabled {
2221 node_start_debug_log("Node::start nostr discovery start begin");
2222 match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
2223 .await
2224 {
2225 Ok(runtime) => {
2226 node_start_debug_log("Node::start nostr discovery runtime created");
2227 if let Err(err) = self.refresh_overlay_advert(&runtime).await {
2228 warn!(error = %err, "Failed to publish initial Nostr overlay advert");
2229 }
2230 node_start_debug_log("Node::start nostr overlay advert refreshed");
2231 self.nostr_discovery = Some(runtime);
2232 self.nostr_discovery_started_at_ms = Some(Self::now_ms());
2233 info!("Nostr overlay discovery enabled");
2234 }
2235 Err(err) => {
2236 node_start_debug_log(format!(
2237 "Node::start nostr discovery start error error={}",
2238 err
2239 ));
2240 warn!(error = %err, "Failed to start Nostr overlay discovery");
2241 }
2242 }
2243 }
2244
2245 if self.config.node.discovery.lan.enabled {
2249 node_start_debug_log("Node::start lan discovery start begin");
2250 let advertised_udp_port = self
2251 .transports
2252 .values()
2253 .filter(|h| h.is_operational())
2254 .filter(|h| h.transport_type().name == "udp")
2255 .find_map(|h| h.local_addr().map(|addr| addr.port()))
2256 .unwrap_or(0);
2257 let scope = self.lan_discovery_scope();
2258 match crate::discovery::lan::LanDiscovery::start(
2259 &self.identity,
2260 scope,
2261 advertised_udp_port,
2262 self.config.node.discovery.lan.clone(),
2263 )
2264 .await
2265 {
2266 Ok(runtime) => {
2267 node_start_debug_log("Node::start lan discovery start ok");
2268 self.lan_discovery = Some(runtime);
2269 info!("LAN mDNS discovery enabled");
2270 }
2271 Err(err) => {
2272 node_start_debug_log(format!(
2273 "Node::start lan discovery start error error={}",
2274 err
2275 ));
2276 debug!(error = %err, "LAN mDNS discovery not started");
2277 }
2278 }
2279 }
2280
2281 self.start_local_instance_discovery();
2282 self.poll_local_instance_discovery().await;
2283
2284 node_start_debug_log("Node::start initiate peer connections begin");
2287 self.initiate_peer_connections().await;
2288 node_start_debug_log("Node::start initiate peer connections complete");
2289
2290 if self.config.tun.enabled {
2292 node_start_debug_log("Node::start tun init begin");
2293 let address = *self.identity.address();
2294 match TunDevice::create(&self.config.tun, address).await {
2295 Ok(device) => {
2296 let mtu = device.mtu();
2297 let name = device.name().to_string();
2298 let our_addr = *device.address();
2299
2300 info!("TUN device active:");
2301 info!(" name: {}", name);
2302 info!(" address: {}", device.address());
2303 info!(" mtu: {}", mtu);
2304
2305 let effective_mtu = self.effective_ipv6_mtu();
2307 let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); info!("effective MTU: {} bytes", effective_mtu);
2310 debug!(" max TCP MSS: {} bytes", max_mss);
2311
2312 #[cfg(target_os = "macos")]
2316 let (shutdown_read_fd, shutdown_write_fd) = {
2317 let mut fds = [0i32; 2];
2318 if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
2319 return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
2320 "failed to create shutdown pipe".into(),
2321 )));
2322 }
2323 (fds[0], fds[1])
2324 };
2325
2326 let (writer, tun_tx) =
2330 device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
2331
2332 let writer_handle = thread::spawn(move || {
2334 writer.run();
2335 });
2336
2337 let reader_tun_tx = tun_tx.clone();
2339
2340 let tun_channel_size = self.config.node.buffers.tun_channel;
2342 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
2343
2344 let transport_mtu = self.transport_mtu();
2346 let path_mtu_lookup = self.path_mtu_lookup.clone();
2347 #[cfg(target_os = "macos")]
2348 let reader_handle = thread::spawn(move || {
2349 run_tun_reader(
2350 device,
2351 mtu,
2352 our_addr,
2353 reader_tun_tx,
2354 outbound_tx,
2355 transport_mtu,
2356 path_mtu_lookup,
2357 shutdown_read_fd,
2358 );
2359 });
2360 #[cfg(not(target_os = "macos"))]
2361 let reader_handle = thread::spawn(move || {
2362 run_tun_reader(
2363 device,
2364 mtu,
2365 our_addr,
2366 reader_tun_tx,
2367 outbound_tx,
2368 transport_mtu,
2369 path_mtu_lookup,
2370 );
2371 });
2372
2373 self.tun_state = TunState::Active;
2374 self.tun_name = Some(name);
2375 self.tun_tx = Some(tun_tx);
2376 self.tun_outbound_rx = Some(outbound_rx);
2377 self.tun_reader_handle = Some(reader_handle);
2378 self.tun_writer_handle = Some(writer_handle);
2379 #[cfg(target_os = "macos")]
2380 {
2381 self.tun_shutdown_fd = Some(shutdown_write_fd);
2382 }
2383 }
2384 Err(e) => {
2385 self.tun_state = TunState::Failed;
2386 warn!(error = %e, "Failed to initialize TUN, continuing without it");
2387 }
2388 }
2389 node_start_debug_log("Node::start tun init complete");
2390 }
2391
2392 if self.config.dns.enabled {
2409 node_start_debug_log("Node::start dns init begin");
2410 let addr_str = self.config.dns.bind_addr();
2411 match addr_str.parse::<std::net::IpAddr>() {
2412 Ok(ip) => {
2413 let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
2414 match Self::bind_dns_socket(bind) {
2415 Ok(socket) => {
2416 let dns_channel_size = self.config.node.buffers.dns_channel;
2417 let (identity_tx, identity_rx) =
2418 tokio::sync::mpsc::channel(dns_channel_size);
2419 let dns_ttl = self.config.dns.ttl();
2420 let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
2421 self.config.peers(),
2422 );
2423 let reloader = if self.config.node.system_files_enabled {
2424 let hosts_path = std::path::PathBuf::from(
2425 crate::upper::hosts::DEFAULT_HOSTS_PATH,
2426 );
2427 crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
2428 } else {
2429 crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
2430 };
2431 let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
2439 info!(
2440 bind = %bind,
2441 hosts = reloader.hosts().len(),
2442 mesh_ifindex = ?mesh_ifindex,
2443 "DNS responder started for .fips domain (auto-reload enabled)"
2444 );
2445 let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
2446 socket,
2447 identity_tx,
2448 dns_ttl,
2449 reloader,
2450 mesh_ifindex,
2451 ));
2452 self.dns_identity_rx = Some(identity_rx);
2453 self.dns_task = Some(handle);
2454 }
2455 Err(e) => {
2456 warn!(bind = %bind, error = %e, "Failed to start DNS responder");
2457 }
2458 }
2459 }
2460 Err(e) => {
2461 warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
2462 }
2463 }
2464 node_start_debug_log("Node::start dns init complete");
2465 }
2466
2467 self.state = NodeState::Running;
2468 node_start_debug_log("Node::start running");
2469 info!("Node started:");
2470 info!(" state: {}", self.state);
2471 info!(" transports: {}", self.transports.len());
2472 info!(" connections: {}", self.connections.len());
2473 Ok(())
2474 }
2475
2476 fn bind_dns_socket(
2489 addr: std::net::SocketAddr,
2490 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2491 use socket2::{Domain, Protocol, Socket, Type};
2492 let domain = if addr.is_ipv4() {
2493 Domain::IPV4
2494 } else {
2495 Domain::IPV6
2496 };
2497 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2498 if addr.is_ipv6() {
2499 sock.set_only_v6(false)?;
2500 #[cfg(unix)]
2501 Self::set_recv_pktinfo_v6(&sock)?;
2502 }
2503 sock.set_nonblocking(true)?;
2504 sock.bind(&addr.into())?;
2505 tokio::net::UdpSocket::from_std(sock.into())
2506 }
2507
2508 #[cfg(unix)]
2514 fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2515 use std::os::fd::AsRawFd;
2516 let enable: libc::c_int = 1;
2517 let ret = unsafe {
2518 libc::setsockopt(
2519 sock.as_raw_fd(),
2520 libc::IPPROTO_IPV6,
2521 libc::IPV6_RECVPKTINFO,
2522 &enable as *const _ as *const libc::c_void,
2523 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2524 )
2525 };
2526 if ret < 0 {
2527 return Err(std::io::Error::last_os_error());
2528 }
2529 Ok(())
2530 }
2531
2532 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2539 #[cfg(unix)]
2540 {
2541 let c_name = std::ffi::CString::new(name).ok()?;
2542 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2543 if idx == 0 { None } else { Some(idx) }
2544 }
2545 #[cfg(not(unix))]
2546 {
2547 let _ = name;
2548 None
2549 }
2550 }
2551
2552 pub async fn stop(&mut self) -> Result<(), NodeError> {
2557 if !self.state.can_stop() {
2558 return Err(NodeError::NotStarted);
2559 }
2560 self.state = NodeState::Stopping;
2561 info!(state = %self.state, "Node stopping");
2562
2563 if let Some(handle) = self.dns_task.take() {
2565 handle.abort();
2566 debug!("DNS responder stopped");
2567 }
2568
2569 self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
2571 .await;
2572
2573 if let Some(bootstrap) = self.nostr_discovery.take()
2575 && let Err(e) = bootstrap.shutdown().await
2576 {
2577 warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
2578 }
2579
2580 if let Some(lan) = self.lan_discovery.take() {
2584 lan.shutdown().await;
2585 }
2586
2587 if let Some(registry) = self.local_instance_registry.take()
2588 && let Err(err) = registry.remove()
2589 {
2590 debug!(error = %err, "failed to remove same-host FIPS instance record");
2591 }
2592
2593 let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
2595 for transport_id in transport_ids {
2596 if let Some(mut handle) = self.transports.remove(&transport_id) {
2597 let transport_type = handle.transport_type().name;
2598 match handle.stop().await {
2599 Ok(()) => {
2600 info!(transport_id = %transport_id, transport_type, "Transport stopped");
2601 }
2602 Err(e) => {
2603 warn!(
2604 transport_id = %transport_id,
2605 transport_type,
2606 error = %e,
2607 "Transport stop failed"
2608 );
2609 }
2610 }
2611 }
2612 }
2613
2614 self.packet_tx.take();
2616 self.packet_rx.take();
2617
2618 if let Some(name) = self.tun_name.take() {
2620 info!(name = %name, "Shutting down TUN interface");
2621
2622 self.tun_tx.take();
2624
2625 if let Err(e) = shutdown_tun_interface(&name).await {
2627 warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
2628 }
2629
2630 #[cfg(target_os = "macos")]
2633 if let Some(fd) = self.tun_shutdown_fd.take() {
2634 unsafe {
2635 libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
2636 libc::close(fd);
2637 }
2638 }
2639
2640 if let Some(handle) = self.tun_reader_handle.take() {
2642 let _ = handle.join();
2643 }
2644 if let Some(handle) = self.tun_writer_handle.take() {
2645 let _ = handle.join();
2646 }
2647
2648 self.tun_state = TunState::Disabled;
2649 }
2650
2651 self.state = NodeState::Stopped;
2652 info!(state = %self.state, "Node stopped");
2653 Ok(())
2654 }
2655
2656 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2661 let disconnect = Disconnect::new(reason);
2662 let plaintext = disconnect.encode();
2663
2664 let peer_addrs: Vec<NodeAddr> = self
2666 .peers
2667 .iter()
2668 .filter(|(_, peer)| peer.can_send() && peer.has_session())
2669 .map(|(addr, _)| *addr)
2670 .collect();
2671
2672 if peer_addrs.is_empty() {
2673 debug!(
2674 total_peers = self.peers.len(),
2675 "No sendable peers for disconnect notification"
2676 );
2677 return;
2678 }
2679
2680 let mut sent = 0usize;
2681 for node_addr in &peer_addrs {
2682 match self
2683 .send_encrypted_link_message(node_addr, &plaintext)
2684 .await
2685 {
2686 Ok(()) => sent += 1,
2687 Err(e) => {
2688 debug!(
2689 peer = %self.peer_display_name(node_addr),
2690 error = %e,
2691 "Failed to send disconnect (transport may be down)"
2692 );
2693 }
2694 }
2695 }
2696
2697 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2698 }
2699
2700 pub(in crate::node) fn static_peer_addresses(
2701 &self,
2702 peer_config: &PeerConfig,
2703 ) -> Vec<PeerAddress> {
2704 peer_config
2705 .addresses_by_priority()
2706 .into_iter()
2707 .cloned()
2708 .collect()
2709 }
2710
2711 async fn nostr_peer_fallback_addresses(
2712 &self,
2713 peer_config: &PeerConfig,
2714 existing: &[PeerAddress],
2715 ) -> Vec<PeerAddress> {
2716 if !self.config.node.discovery.nostr.enabled
2717 || self.config.node.discovery.nostr.policy
2718 == crate::config::NostrDiscoveryPolicy::Disabled
2719 {
2720 return Vec::new();
2721 }
2722
2723 let Some(bootstrap) = self.nostr_discovery.clone() else {
2724 return Vec::new();
2725 };
2726 if self.nostr_cooldown_applies_to_peer_config(peer_config)
2727 && bootstrap
2728 .cooldown_until(&peer_config.npub, Self::now_ms())
2729 .is_some()
2730 {
2731 debug!(
2732 npub = %peer_config.npub,
2733 "Skipping cached Nostr fallback endpoints while peer is in traversal cooldown"
2734 );
2735 return Vec::new();
2736 }
2737 let endpoints = match bootstrap
2738 .cached_advert_endpoints_for_peer(&peer_config.npub)
2739 .await
2740 {
2741 Some(endpoints) => endpoints,
2742 None => {
2743 debug!(
2744 npub = %peer_config.npub,
2745 "No cached Nostr advert endpoints for configured peer"
2746 );
2747 return Vec::new();
2748 }
2749 };
2750
2751 let mut fallback = Vec::new();
2752 let mut next_priority = existing
2753 .iter()
2754 .map(|addr| addr.priority)
2755 .max()
2756 .unwrap_or(100)
2757 .saturating_add(1);
2758 let seen_at_ms = Self::now_ms();
2764 for endpoint in endpoints {
2765 let Some(candidate) =
2766 Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2767 else {
2768 continue;
2769 };
2770 if existing
2771 .iter()
2772 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2773 || fallback.iter().any(|addr: &PeerAddress| {
2774 addr.transport == candidate.transport && addr.addr == candidate.addr
2775 })
2776 {
2777 continue;
2778 }
2779 fallback.push(candidate);
2780 next_priority = next_priority.saturating_add(1);
2781 }
2782 fallback
2783 }
2784
2785 pub(in crate::node) async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2786 if !self.config.node.discovery.nostr.enabled
2787 || self.config.node.discovery.nostr.policy
2788 == crate::config::NostrDiscoveryPolicy::Disabled
2789 {
2790 return false;
2791 }
2792 let Some(bootstrap) = self.nostr_discovery.clone() else {
2793 return false;
2794 };
2795 let now_ms = Self::now_ms();
2796 if self.nostr_cooldown_applies_to_peer_config(peer_config)
2797 && let Some(cooldown_until_ms) = bootstrap.cooldown_until(&peer_config.npub, now_ms)
2798 {
2799 debug!(
2800 npub = %peer_config.npub,
2801 cooldown_secs = cooldown_until_ms.saturating_sub(now_ms) / 1000,
2802 "Skipping Nostr traversal request while peer is in cooldown"
2803 );
2804 return false;
2805 }
2806 bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
2807 bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
2808 let mesh_signaling_allowed = self.mesh_signaling_allowed_for_peer(peer_config);
2809 let started = bootstrap
2810 .request_connect_with_mesh_signaling(peer_config.clone(), mesh_signaling_allowed)
2811 .await;
2812 if started {
2813 info!(
2814 npub = %peer_config.npub,
2815 mesh_signaling_allowed,
2816 "Started background UDP NAT traversal attempt"
2817 );
2818 } else {
2819 debug!(
2820 npub = %peer_config.npub,
2821 mesh_signaling_allowed,
2822 "Background UDP NAT traversal attempt already in progress"
2823 );
2824 }
2825 true
2826 }
2827
2828 fn nostr_cooldown_applies_to_peer_config(&self, peer_config: &PeerConfig) -> bool {
2829 !self.mesh_signaling_allowed_for_peer(peer_config)
2830 }
2831
2832 pub(in crate::node) fn mesh_signaling_allowed_for_peer(
2833 &self,
2834 peer_config: &PeerConfig,
2835 ) -> bool {
2836 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
2837 return false;
2838 };
2839 let peer_addr = identity.node_addr();
2840 self.configured_peer(peer_addr).is_some()
2841 }
2842
2843 fn overlay_endpoint_to_peer_address(
2844 endpoint: &OverlayEndpointAdvert,
2845 priority: u8,
2846 seen_at_ms: u64,
2847 ) -> Option<PeerAddress> {
2848 let transport = match endpoint.transport {
2849 OverlayTransportKind::Udp => "udp",
2850 OverlayTransportKind::Tcp => "tcp",
2851 OverlayTransportKind::Tor => "tor",
2852 OverlayTransportKind::WebRtc => "webrtc",
2853 };
2854 Some(
2855 PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2856 .with_seen_at_ms(seen_at_ms),
2857 )
2858 }
2859
    /// Start connection attempts to `peer_config` over `addresses`, in order.
    ///
    /// Handling by address type:
    /// - `udp:nat` entries are never dialled. With `allow_bootstrap_nat` set,
    ///   they trigger `request_nostr_bootstrap`; otherwise they are skipped.
    /// - `ethernet` and `ble` addresses are resolved to a transport and remote
    ///   address by the matching resolver. BLE is only available on
    ///   `bluer_available` builds.
    /// - UDP socket addresses use `find_udp_transport_for_remote_addr`.
    /// - Any other address uses `find_transport_for_type`.
    ///
    /// A path that is already being connected counts as attempted and is
    /// skipped. At most `path_candidate_attempt_budget(peer)` new attempts are
    /// started. Once that budget is spent, the loop stops at the next concrete
    /// address, and later entries (including `udp:nat`) are not processed.
    ///
    /// Returns `Ok(())` if anything was attempted. That includes an accepted
    /// Nostr bootstrap request or a duplicate in-flight path.
    ///
    /// # Errors
    /// - `AccessDenied` from `initiate_connection` is returned immediately.
    /// - `LocalRouteUnavailable` (first such message) when nothing was
    ///   attempted and at least one attempt failed for that reason.
    /// - `NoTransportForType` otherwise.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        let mut attempted = false;
        // First "local route unavailable" error, reported if nothing else worked.
        let mut local_route_error = None;
        let peer_node_addr = *peer_identity.node_addr();
        // Remaining number of new concrete path attempts allowed for this peer.
        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

        for addr in addresses {
            // `udp:nat` is a placeholder meaning "reach me via Nostr traversal".
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if !allow_bootstrap_nat {
                    continue;
                }
                if self.request_nostr_bootstrap(peer_config).await {
                    attempted = true;
                    continue;
                }
                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                continue;
            }

            // Resolve which local transport to use and the remote address on it.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                // UDP socket addresses need a transport compatible with the
                // remote address; everything else just needs one of that type.
                let tid = if addr.transport == "udp"
                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
                {
                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                        Some((id, _)) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No compatible operational UDP transport for address"
                            );
                            continue;
                        }
                    }
                } else {
                    match self.find_transport_for_type(&addr.transport) {
                        Some(id) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No operational transport for address type"
                            );
                            continue;
                        }
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            // An in-flight attempt on this exact path counts as attempted and
            // does not consume budget.
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                attempted = true;
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    remote_addr = %remote_addr,
                    "Skipping duplicate in-flight candidate path"
                );
                continue;
            }

            if concrete_budget == 0 {
                debug!(
                    npub = %peer_config.npub,
                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                    "Path candidate race budget exhausted"
                );
                break;
            }

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => {
                    attempted = true;
                    concrete_budget = concrete_budget.saturating_sub(1);
                }
                // ACL rejection is final for this peer; stop trying addresses.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    if e.is_local_route_unavailable() && local_route_error.is_none() {
                        local_route_error = Some(e.to_string());
                    }
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        if attempted {
            return Ok(());
        }

        if let Some(error) = local_route_error {
            return Err(NodeError::LocalRouteUnavailable(error));
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
3006
3007 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
3008 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
3009 .await;
3010 }
3011
3012 pub(in crate::node) fn queue_active_fallback_direct_retries(
3013 &mut self,
3014 _bootstrap: &std::sync::Arc<NostrDiscovery>,
3015 ) {
3016 let now_ms = Self::now_ms();
3017 let peer_configs = self
3018 .config
3019 .auto_connect_peers()
3020 .cloned()
3021 .collect::<Vec<_>>();
3022
3023 for peer_config in peer_configs {
3024 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
3025 continue;
3026 };
3027 let node_addr = *peer_identity.node_addr();
3028
3029 if self.retry_pending.contains_key(&node_addr)
3030 || !self.peers.contains_key(&node_addr)
3031 || self.is_connecting_to_peer(&node_addr)
3032 || !self.active_peer_should_keep_direct_retry(&node_addr, &peer_config)
3033 {
3034 continue;
3035 }
3036
3037 let mut state = super::retry::RetryState::new(peer_config.clone());
3038 state.reconnect = true;
3039 state.retry_after_ms = now_ms;
3040 self.retry_pending.insert(node_addr, state);
3041
3042 debug!(
3043 peer = %self.peer_display_name(&node_addr),
3044 "Queued direct-path retry for active fallback peer"
3045 );
3046 }
3047 }
3048
3049 pub(in crate::node) async fn run_open_discovery_sweep(
3060 &mut self,
3061 bootstrap: &std::sync::Arc<NostrDiscovery>,
3062 max_age_secs: Option<u64>,
3063 caller: &'static str,
3064 ) {
3065 if !self.config.node.discovery.nostr.enabled
3066 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3067 {
3068 return;
3069 }
3070
3071 let configured_npubs = self
3072 .config
3073 .peers()
3074 .iter()
3075 .map(|peer| peer.npub.clone())
3076 .collect::<HashSet<_>>();
3077 let now_ms = Self::now_ms();
3078 let now_secs = now_ms / 1000;
3079 let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
3080 if enqueue_budget == 0 {
3081 debug!(
3082 caller = %caller,
3083 "open-discovery sweep: enqueue budget is 0, skipping"
3084 );
3085 return;
3086 }
3087
3088 let candidates = bootstrap.cached_open_discovery_candidates(64).await;
3089 let cached_count = candidates.len();
3090 let mut enqueued = 0usize;
3091 let mut skipped_age = 0usize;
3092 let mut skipped_configured = 0usize;
3093 let mut skipped_self = 0usize;
3094 let mut skipped_connected = 0usize;
3095 let mut skipped_retry_pending = 0usize;
3096 let mut skipped_connecting = 0usize;
3097 let mut skipped_no_endpoints = 0usize;
3098 let mut skipped_invalid_npub = 0usize;
3099 let mut skipped_cooldown = 0usize;
3100
3101 for (npub, endpoints, created_at_secs) in candidates {
3102 if enqueue_budget == 0 {
3103 break;
3104 }
3105
3106 if let Some(max_age) = max_age_secs
3107 && now_secs.saturating_sub(created_at_secs) > max_age
3108 {
3109 skipped_age = skipped_age.saturating_add(1);
3110 continue;
3111 }
3112
3113 if configured_npubs.contains(&npub) {
3114 if let Ok(identity) = PeerIdentity::from_npub(&npub) {
3134 let configured_addr = *identity.node_addr();
3135 if bootstrap.cooldown_until(&npub, now_ms).is_some() {
3136 skipped_cooldown = skipped_cooldown.saturating_add(1);
3137 skipped_configured = skipped_configured.saturating_add(1);
3138 continue;
3139 }
3140 if let Some(state) = self.retry_pending.get_mut(&configured_addr)
3141 && state.retry_after_ms > now_ms
3142 {
3143 state.retry_after_ms = now_ms;
3144 debug!(
3145 caller = %caller,
3146 peer = %self.peer_display_name(&configured_addr),
3147 advert_age_secs = now_secs.saturating_sub(created_at_secs),
3148 "Expediting configured-peer retry after fresh overlay advert"
3149 );
3150 }
3151 }
3152 skipped_configured = skipped_configured.saturating_add(1);
3153 continue;
3154 }
3155
3156 let peer_identity = match PeerIdentity::from_npub(&npub) {
3157 Ok(identity) => identity,
3158 Err(_) => {
3159 skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
3160 continue;
3161 }
3162 };
3163 let node_addr = *peer_identity.node_addr();
3164 if node_addr == *self.identity.node_addr() {
3165 skipped_self = skipped_self.saturating_add(1);
3166 continue;
3167 }
3168 if self.peers.contains_key(&node_addr) {
3169 skipped_connected = skipped_connected.saturating_add(1);
3170 continue;
3171 }
3172 if self.retry_pending.contains_key(&node_addr) {
3173 skipped_retry_pending = skipped_retry_pending.saturating_add(1);
3174 continue;
3175 }
3176 if bootstrap.cooldown_until(&npub, now_ms).is_some() {
3177 skipped_cooldown = skipped_cooldown.saturating_add(1);
3178 continue;
3179 }
3180 let connecting = self.connections.values().any(|conn| {
3181 conn.expected_identity()
3182 .map(|id| id.node_addr() == &node_addr)
3183 .unwrap_or(false)
3184 });
3185 if connecting {
3186 skipped_connecting = skipped_connecting.saturating_add(1);
3187 continue;
3188 }
3189
3190 let mut addresses = Vec::new();
3191 let mut priority = 120u8;
3192 let seen_at_ms = Self::now_ms();
3193 for endpoint in endpoints {
3194 let Some(candidate) =
3195 Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
3196 else {
3197 continue;
3198 };
3199 if addresses.iter().any(|existing: &PeerAddress| {
3200 existing.transport == candidate.transport && existing.addr == candidate.addr
3201 }) {
3202 continue;
3203 }
3204 addresses.push(candidate);
3205 priority = priority.saturating_add(1);
3206 }
3207 if addresses.is_empty() {
3208 skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
3209 continue;
3210 }
3211
3212 self.peer_aliases
3213 .entry(node_addr)
3214 .or_insert_with(|| peer_identity.short_npub());
3215 self.register_identity(node_addr, peer_identity.pubkey_full());
3216
3217 let mut state = super::retry::RetryState::new(PeerConfig {
3218 npub: npub.clone(),
3219 alias: None,
3220 addresses,
3221 connect_policy: ConnectPolicy::AutoConnect,
3222 auto_reconnect: true,
3223 discovery_fallback_transit: false,
3224 });
3225 state.reconnect = false;
3226 state.retry_after_ms = now_ms;
3227 state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
3228 self.retry_pending.insert(node_addr, state);
3229 info!(
3230 caller = %caller,
3231 peer = %peer_identity.short_npub(),
3232 advert_age_secs = now_secs.saturating_sub(created_at_secs),
3233 "open-discovery sweep: queued retry for cached advert"
3234 );
3235 enqueue_budget = enqueue_budget.saturating_sub(1);
3236 enqueued = enqueued.saturating_add(1);
3237 }
3238
3239 let total_skipped = skipped_age
3243 + skipped_configured
3244 + skipped_self
3245 + skipped_connected
3246 + skipped_retry_pending
3247 + skipped_connecting
3248 + skipped_no_endpoints
3249 + skipped_invalid_npub
3250 + skipped_cooldown;
3251 let should_summarize = caller == "startup" || enqueued > 0;
3252 if should_summarize {
3253 info!(
3254 caller = %caller,
3255 cached = cached_count,
3256 queued = enqueued,
3257 skipped_age = skipped_age,
3258 skipped_configured = skipped_configured,
3259 skipped_self = skipped_self,
3260 skipped_connected = skipped_connected,
3261 skipped_retry_pending = skipped_retry_pending,
3262 skipped_connecting = skipped_connecting,
3263 skipped_no_endpoints = skipped_no_endpoints,
3264 skipped_invalid_npub = skipped_invalid_npub,
3265 skipped_cooldown = skipped_cooldown,
3266 skipped_total = total_skipped,
3267 "open-discovery sweep complete"
3268 );
3269 }
3270 }
3271
3272 async fn maybe_run_startup_open_discovery_sweep(
3280 &mut self,
3281 bootstrap: &std::sync::Arc<NostrDiscovery>,
3282 ) {
3283 if self.startup_open_discovery_sweep_done {
3284 return;
3285 }
3286 if !self.config.node.discovery.nostr.enabled
3287 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3288 {
3289 self.startup_open_discovery_sweep_done = true;
3291 return;
3292 }
3293 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
3294 return;
3295 };
3296 let now_ms = Self::now_ms();
3297 let delay_ms = self
3298 .config
3299 .node
3300 .discovery
3301 .nostr
3302 .startup_sweep_delay_secs
3303 .saturating_mul(1000);
3304 if now_ms < started_at_ms.saturating_add(delay_ms) {
3305 return;
3306 }
3307
3308 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
3309 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
3310 .await;
3311 self.startup_open_discovery_sweep_done = true;
3312 }
3313
3314 fn available_outbound_slots(&self) -> usize {
3315 let connection_used = self
3316 .connections
3317 .len()
3318 .saturating_add(self.pending_connects.len());
3319 let connection_slots = if self.max_connections == 0 {
3320 usize::MAX
3321 } else {
3322 self.max_connections.saturating_sub(connection_used)
3323 };
3324
3325 let peer_slots = if self.max_peers == 0 {
3326 usize::MAX
3327 } else {
3328 self.max_peers.saturating_sub(self.peers.len())
3329 };
3330
3331 let link_slots = if self.max_links == 0 {
3332 usize::MAX
3333 } else {
3334 self.max_links.saturating_sub(self.links.len())
3335 };
3336
3337 connection_slots.min(peer_slots).min(link_slots)
3338 }
3339
3340 pub(in crate::node) fn open_discovery_enqueue_budget(
3341 &self,
3342 configured_npubs: &HashSet<String>,
3343 ) -> usize {
3344 let current_open_discovery_active = self
3345 .peers
3346 .values()
3347 .filter(|peer| !configured_npubs.contains(&peer.npub()))
3348 .count();
3349 let current_open_discovery_pending = self
3350 .retry_pending
3351 .values()
3352 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3353 .count();
3354
3355 let cap_remaining = self
3356 .config
3357 .node
3358 .discovery
3359 .nostr
3360 .open_discovery_max_pending
3361 .saturating_sub(current_open_discovery_active)
3362 .saturating_sub(current_open_discovery_pending);
3363
3364 cap_remaining.min(self.available_outbound_slots())
3365 }
3366
3367 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3368 now_ms.saturating_add(
3369 self.config
3370 .node
3371 .discovery
3372 .nostr
3373 .advert_ttl_secs
3374 .saturating_mul(1000)
3375 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3376 )
3377 }
3378
    /// Build the overlay advert describing how other nodes can reach this one.
    ///
    /// Returns `None` when Nostr discovery is disabled, or when no operational
    /// transport yields an advertisable endpoint. Only transports whose config
    /// has `advertise_on_nostr()` enabled contribute:
    /// - UDP with `is_public()`: the configured external address if set.
    ///   Otherwise the bound address, if it is specific and routable.
    ///   Otherwise a STUN-learned address from `learn_public_udp_addr`, with a
    ///   warning if that lookup fails.
    /// - UDP without `is_public()`: a `"nat"` placeholder endpoint.
    /// - WebRTC: this node's full public key, hex-encoded.
    /// - TCP: the external address if set. Otherwise the bound address, if it
    ///   is specific and routable; if not, a warning is logged.
    /// - Tor: `<onion address>:<advertised port>`, when an onion address exists.
    ///
    /// Signal relays (`dm_relays`) and STUN servers are included only when a
    /// UDP `"nat"` or WebRTC endpoint was advertised.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // Track whether peers will need signaling relays / STUN to reach us.
        let mut has_udp_nat = false;
        let mut has_webrtc = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // An explicitly configured external address always wins.
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                // Bound to a specific, routable IP: advertise it.
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                // Wildcard or unroutable bind: fall back to the
                                // public address observed via STUN for this
                                // transport's port.
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                             or private and STUN observation failed; \
                                             advertising no UDP endpoint. Either set \
                                             transports.udp.external_addr, bind to a \
                                             specific *public* IP, or ensure \
                                             node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                // No local address reported: nothing to advertise.
                                None => {}
                            }
                        }
                    } else {
                        // Not publicly reachable: advertise the `nat` placeholder,
                        // which dialers treat as a request for Nostr traversal.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "webrtc" => {
                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // WebRTC is addressed by our public key rather than an IP.
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::WebRtc,
                        addr: hex::encode(self.identity.pubkey_full().serialize()),
                    });
                    has_webrtc = true;
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // An explicitly configured external address always wins.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            // Unlike UDP there is no STUN fallback for TCP.
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                     or private IP and no transports.tcp.external_addr set; \
                                     advertising no TCP endpoint. Either set external_addr \
                                     to the public IP (recommended for cloud 1:1-NAT setups) \
                                     or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                // Other transport types are never advertised.
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
3557
3558 async fn refresh_overlay_advert(
3559 &self,
3560 bootstrap: &std::sync::Arc<NostrDiscovery>,
3561 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3562 let advert = self.build_overlay_advert(bootstrap).await;
3563 bootstrap.update_local_advert(advert).await
3564 }
3565
3566 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3567 match (&self.config.transports.udp, transport_name) {
3568 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3569 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3570 _ => None,
3571 }
3572 }
3573
3574 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3575 match (&self.config.transports.tcp, transport_name) {
3576 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3577 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3578 _ => None,
3579 }
3580 }
3581
3582 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3583 match (&self.config.transports.tor, transport_name) {
3584 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3585 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3586 _ => None,
3587 }
3588 }
3589
3590 fn lookup_webrtc_config(
3591 &self,
3592 transport_name: Option<&str>,
3593 ) -> Option<&crate::config::WebRtcConfig> {
3594 match (&self.config.transports.webrtc, transport_name) {
3595 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3596 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3597 _ => None,
3598 }
3599 }
3600
3601 pub(in crate::node) async fn try_peer_addresses(
3602 &mut self,
3603 peer_config: &PeerConfig,
3604 peer_identity: PeerIdentity,
3605 allow_bootstrap_nat: bool,
3606 ) -> Result<(), NodeError> {
3607 let peer_node_addr = *peer_identity.node_addr();
3608 if self.peers.contains_key(&peer_node_addr) {
3609 debug!(
3610 npub = %peer_config.npub,
3611 "Peer already exists, skipping address attempts"
3612 );
3613 return Ok(());
3614 }
3615
3616 let candidates = self.peer_address_candidates(peer_config).await;
3617
3618 if candidates.is_empty() {
3619 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3620 return Ok(());
3621 }
3622 return Err(NodeError::NoTransportForType(format!(
3623 "no addresses known for {}",
3624 peer_config.npub
3625 )));
3626 }
3627
3628 if self
3629 .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3630 .await
3631 .is_ok()
3632 {
3633 if allow_bootstrap_nat {
3634 self.request_nostr_bootstrap(peer_config).await;
3635 }
3636 return Ok(());
3637 }
3638
3639 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3640 return Ok(());
3641 }
3642
3643 Err(NodeError::NoTransportForType(format!(
3644 "no operational transport for any of {}'s addresses",
3645 peer_config.npub
3646 )))
3647 }
3648
    /// Tries alternative paths for a peer that is already in `peers`.
    ///
    /// Candidates that match the peer's current path are filtered out, unless a
    /// same-path refresh is needed. A same-path refresh is needed when
    /// `allow_same_path_refresh` is set and the peer is idle past the staleness
    /// threshold or cannot currently send. In that case the peer's current UDP
    /// address is also added as a candidate if it is not already listed.
    ///
    /// Returns:
    /// - `Ok(true)` when the address attempts returned `Ok`, or a Nostr
    ///   bootstrap request was accepted;
    /// - `Ok(false)` when there was nothing to try.
    ///
    /// # Errors
    /// - `NodeError::NoTransportForType` when no candidate addresses exist and
    ///   no Nostr request was accepted.
    /// - Otherwise the error from `attempt_peer_address_list`, when no Nostr
    ///   request was made after it.
    async fn try_active_peer_alternative_addresses(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_same_path_refresh: bool,
    ) -> Result<bool, NodeError> {
        let peer_node_addr = *peer_identity.node_addr();
        let mut candidates = self.peer_address_candidates(peer_config).await;
        let same_path_refresh_needed = allow_same_path_refresh
            && (self.active_peer_needs_same_path_refresh(&peer_node_addr)
                || self
                    .peers
                    .get(&peer_node_addr)
                    .is_some_and(|peer| !peer.can_send()));
        // On a refresh, make sure the current UDP path is itself retried.
        if same_path_refresh_needed
            && let Some(candidate) = self.active_peer_current_udp_candidate(&peer_node_addr)
            && !candidates.iter().any(|existing| {
                existing.transport == candidate.transport && existing.addr == candidate.addr
            })
        {
            candidates.push(candidate);
            Self::sort_peer_address_candidates(&mut candidates);
        }
        let should_try_nostr =
            self.active_peer_should_keep_direct_retry(&peer_node_addr, peer_config);

        if candidates.is_empty() {
            if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
                return Ok(true);
            }
            return Err(NodeError::NoTransportForType(format!(
                "no addresses known for {}",
                peer_config.npub
            )));
        }

        // Without a refresh, skip candidates that are the path already in use.
        let alternatives: Vec<_> = candidates
            .into_iter()
            .filter(|addr| {
                same_path_refresh_needed
                    || !self.active_peer_matches_candidate(&peer_node_addr, addr)
            })
            .collect();

        if alternatives.is_empty() {
            if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
                return Ok(true);
            }
            return Ok(false);
        }

        // A `udp`/"nat" entry is passed to `attempt_peer_address_list` with NAT
        // bootstrap allowed. Presumably that already triggers Nostr traversal, so
        // a separate request is only made when no such entry exists.
        // TODO(review): confirm against attempt_peer_address_list.
        let needs_separate_nostr_attempt = should_try_nostr
            && !alternatives
                .iter()
                .any(|addr| addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat"));
        let address_result = self
            .attempt_peer_address_list(peer_config, peer_identity, true, &alternatives)
            .await;
        // The Nostr request is issued after the direct attempts, whatever their
        // outcome.
        let nostr_attempted =
            needs_separate_nostr_attempt && self.request_nostr_bootstrap(peer_config).await;

        match address_result {
            Ok(()) => Ok(true),
            // A queued Nostr traversal turns a direct-path failure into success.
            Err(err) if nostr_attempted => {
                debug!(
                    npub = %peer_config.npub,
                    error = %err,
                    "Static active-peer direct-path alternatives failed; Nostr traversal still queued"
                );
                Ok(true)
            }
            Err(err) => Err(err),
        }
    }
3723
3724 async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3725 let static_addresses = self.static_peer_addresses(peer_config);
3732 let overlay_addresses = self
3733 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3734 .await;
3735
3736 let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3737 for addr in overlay_addresses.into_iter().chain(static_addresses) {
3738 if !candidates.iter().any(|existing: &PeerAddress| {
3739 existing.transport == addr.transport && existing.addr == addr.addr
3740 }) {
3741 candidates.push(addr);
3742 }
3743 }
3744
3745 Self::sort_peer_address_candidates(&mut candidates);
3746
3747 candidates
3748 }
3749
3750 fn sort_peer_address_candidates(candidates: &mut [PeerAddress]) {
3751 candidates.sort_by(|a, b| {
3757 if a.priority != b.priority {
3758 return a.priority.cmp(&b.priority);
3759 }
3760 match (a.seen_at_ms, b.seen_at_ms) {
3761 (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3762 (Some(_), None) => std::cmp::Ordering::Less,
3763 (None, Some(_)) => std::cmp::Ordering::Greater,
3764 (None, None) => std::cmp::Ordering::Equal,
3765 }
3766 });
3767 }
3768
3769 fn active_peer_matches_any_candidate(
3770 &self,
3771 peer_node_addr: &NodeAddr,
3772 candidates: &[PeerAddress],
3773 ) -> bool {
3774 candidates
3775 .iter()
3776 .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3777 }
3778
3779 pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3780 &self,
3781 peer_node_addr: &NodeAddr,
3782 candidates: &[PeerAddress],
3783 ) -> bool {
3784 if !self
3785 .peers
3786 .get(peer_node_addr)
3787 .is_some_and(|peer| peer.can_send())
3788 {
3789 return false;
3790 }
3791 if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3792 return false;
3793 }
3794 !self.active_peer_needs_same_path_refresh(peer_node_addr)
3795 }
3796
3797 pub(in crate::node) fn active_peer_should_keep_direct_retry(
3798 &self,
3799 peer_node_addr: &NodeAddr,
3800 peer_config: &PeerConfig,
3801 ) -> bool {
3802 let Some(peer) = self.peers.get(peer_node_addr) else {
3803 return false;
3804 };
3805
3806 let static_addresses = self.static_peer_addresses(peer_config);
3807 if !static_addresses.is_empty() {
3808 return !self
3809 .active_peer_candidate_is_fresh_enough_to_skip(peer_node_addr, &static_addresses);
3810 }
3811
3812 if peer_config.npub.is_empty() {
3813 return false;
3814 }
3815
3816 if !self.config.node.discovery.nostr.enabled
3817 || self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Disabled
3818 {
3819 return false;
3820 }
3821
3822 let Some(transport_id) = peer.transport_id() else {
3823 return true;
3824 };
3825
3826 if self.bootstrap_transports.contains(&transport_id) {
3827 return self.active_peer_needs_same_path_refresh(peer_node_addr);
3828 }
3829
3830 let Some(transport) = self.transports.get(&transport_id) else {
3831 return true;
3832 };
3833
3834 if transport.transport_type().name != "udp" {
3835 return true;
3836 }
3837
3838 self.active_peer_needs_same_path_refresh(peer_node_addr)
3839 }
3840
3841 pub(in crate::node) fn clear_retry_unless_direct_refresh_needed(
3842 &mut self,
3843 peer_node_addr: &NodeAddr,
3844 ) {
3845 let keep_retry = self
3846 .retry_pending
3847 .get(peer_node_addr)
3848 .map(|state| state.peer_config.clone())
3849 .is_some_and(|peer_config| {
3850 self.active_peer_should_keep_direct_retry(peer_node_addr, &peer_config)
3851 });
3852
3853 if !keep_retry {
3854 self.retry_pending.remove(peer_node_addr);
3855 }
3856 }
3857
3858 fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3859 let Some(peer) = self.peers.get(peer_node_addr) else {
3860 return false;
3861 };
3862 let stale_after_ms = self
3863 .config
3864 .node
3865 .heartbeat_interval_secs
3866 .saturating_mul(1000)
3867 .max(1000);
3868 peer.idle_time(Self::now_ms()) > stale_after_ms
3869 }
3870
3871 pub(in crate::node) fn active_peer_current_udp_candidate(
3872 &self,
3873 peer_node_addr: &NodeAddr,
3874 ) -> Option<PeerAddress> {
3875 let peer = self.peers.get(peer_node_addr)?;
3876 let current_addr = peer.current_addr()?;
3877 if let Some(transport_id) = peer.transport_id() {
3878 if let Some(transport) = self.transports.get(&transport_id) {
3879 if transport.transport_type().name != "udp" {
3880 return None;
3881 }
3882 } else if !self
3883 .transports
3884 .values()
3885 .any(|transport| transport.transport_type().name == "udp")
3886 {
3887 return None;
3888 }
3889 } else if !self
3890 .transports
3891 .values()
3892 .any(|transport| transport.transport_type().name == "udp")
3893 {
3894 return None;
3895 }
3896 let socket_addr = current_addr.as_str()?.parse::<SocketAddr>().ok()?;
3897
3898 Some(
3899 PeerAddress::with_priority("udp", socket_addr.to_string(), 240)
3900 .with_seen_at_ms(Self::now_ms()),
3901 )
3902 }
3903
3904 pub(in crate::node) fn active_peer_matches_candidate(
3905 &self,
3906 peer_node_addr: &NodeAddr,
3907 candidate: &PeerAddress,
3908 ) -> bool {
3909 let Some(peer) = self.peers.get(peer_node_addr) else {
3910 return false;
3911 };
3912 let Some(current_addr) = peer.current_addr() else {
3913 return false;
3914 };
3915 if let Some(peer_transport_id) = peer.transport_id()
3916 && let Some((candidate_transport_id, candidate_addr)) =
3917 self.resolve_peer_address_for_match(candidate)
3918 {
3919 return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3920 }
3921 if peer
3922 .transport_id()
3923 .map(|id| self.bootstrap_transports.contains(&id))
3924 .unwrap_or(false)
3925 {
3926 return false;
3927 }
3928 let current_addr = current_addr.to_string();
3929 let current_transport = peer
3930 .transport_id()
3931 .and_then(|id| self.transports.get(&id))
3932 .map(|transport| transport.transport_type().name);
3933
3934 candidate.addr == current_addr
3935 && current_transport
3936 .map(|transport| transport == candidate.transport)
3937 .unwrap_or(true)
3938 }
3939
3940 fn configured_path_priority(
3941 &self,
3942 peer_node_addr: &NodeAddr,
3943 transport_id: TransportId,
3944 remote_addr: &TransportAddr,
3945 ) -> Option<u8> {
3946 self.configured_peer(peer_node_addr)?
3947 .addresses
3948 .iter()
3949 .filter_map(|candidate| {
3950 let (candidate_transport_id, candidate_addr) =
3951 self.resolve_peer_address_for_match(candidate)?;
3952 (candidate_transport_id == transport_id && &candidate_addr == remote_addr)
3953 .then_some(candidate.priority)
3954 })
3955 .min()
3956 }
3957
3958 pub(in crate::node) fn outbound_alternate_path_priority_allows_replace(
3959 &self,
3960 peer_node_addr: &NodeAddr,
3961 candidate_transport_id: TransportId,
3962 candidate_addr: &TransportAddr,
3963 ) -> bool {
3964 const UNKNOWN_PATH_PRIORITY: u16 = u8::MAX as u16 + 1;
3965
3966 let Some(peer) = self.peers.get(peer_node_addr) else {
3967 return true;
3968 };
3969 let Some(current_transport_id) = peer.transport_id() else {
3970 return true;
3971 };
3972 let Some(current_addr) = peer.current_addr() else {
3973 return true;
3974 };
3975
3976 let current_priority = self
3977 .configured_path_priority(peer_node_addr, current_transport_id, current_addr)
3978 .map(u16::from)
3979 .unwrap_or(UNKNOWN_PATH_PRIORITY);
3980 let candidate_priority = self
3981 .configured_path_priority(peer_node_addr, candidate_transport_id, candidate_addr)
3982 .map(u16::from)
3983 .unwrap_or(UNKNOWN_PATH_PRIORITY);
3984
3985 if candidate_priority <= current_priority {
3986 return true;
3987 }
3988
3989 debug!(
3990 peer = %self.peer_display_name(peer_node_addr),
3991 current_transport_id = %current_transport_id,
3992 current_addr = %current_addr,
3993 current_priority,
3994 candidate_transport_id = %candidate_transport_id,
3995 candidate_addr = %candidate_addr,
3996 candidate_priority,
3997 "Suppressing lower-priority outbound alternate path while current path remains healthy"
3998 );
3999 false
4000 }
4001
4002 pub(in crate::node) fn active_peer_uses_recent_endpoint_path(
4003 &self,
4004 peer_node_addr: &NodeAddr,
4005 peer_config: &PeerConfig,
4006 ) -> bool {
4007 peer_config.addresses.iter().any(|addr| {
4008 addr.seen_at_ms.is_some() && self.active_peer_matches_candidate(peer_node_addr, addr)
4009 })
4010 }
4011
4012 pub(in crate::node) fn active_peer_uses_traversal_path(
4013 &self,
4014 peer_node_addr: &NodeAddr,
4015 peer_config: &PeerConfig,
4016 ) -> bool {
4017 let via_bootstrap_transport = self
4018 .peers
4019 .get(peer_node_addr)
4020 .and_then(|peer| peer.transport_id())
4021 .map(|id| self.bootstrap_transports.contains(&id))
4022 .unwrap_or(false);
4023
4024 via_bootstrap_transport
4025 || self.active_peer_uses_recent_endpoint_path(peer_node_addr, peer_config)
4026 }
4027
4028 pub(crate) async fn api_connect(
4036 &mut self,
4037 npub: &str,
4038 address: &str,
4039 transport: &str,
4040 ) -> Result<serde_json::Value, String> {
4041 let peer_config = PeerConfig {
4042 npub: npub.to_string(),
4043 alias: None,
4044 addresses: vec![PeerAddress::new(transport, address)],
4045 connect_policy: ConnectPolicy::Manual,
4046 auto_reconnect: false,
4047 discovery_fallback_transit: true,
4048 };
4049
4050 if let Ok(identity) = PeerIdentity::from_npub(npub) {
4052 self.peer_aliases
4053 .insert(*identity.node_addr(), identity.short_npub());
4054 self.register_identity(*identity.node_addr(), identity.pubkey_full());
4055 }
4056
4057 self.initiate_peer_connection(&peer_config)
4058 .await
4059 .map(|()| {
4060 info!(
4061 npub = %npub,
4062 address = %address,
4063 transport = %transport,
4064 "API connect initiated"
4065 );
4066 serde_json::json!({
4067 "npub": npub,
4068 "address": address,
4069 "transport": transport,
4070 })
4071 })
4072 .map_err(|e| e.to_string())
4073 }
4074
4075 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
4079 let peer_identity =
4080 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
4081 let node_addr = *peer_identity.node_addr();
4082
4083 if !self.peers.contains_key(&node_addr) {
4084 return Err(format!("peer not found: {npub}"));
4085 }
4086
4087 self.remove_active_peer(&node_addr);
4089
4090 self.retry_pending.remove(&node_addr);
4092
4093 info!(npub = %npub, "API disconnect completed");
4094
4095 Ok(serde_json::json!({
4096 "npub": npub,
4097 "disconnected": true,
4098 }))
4099 }
4100
4101 pub async fn adopt_established_traversal(
4108 &mut self,
4109 traversal: EstablishedTraversal,
4110 ) -> Result<BootstrapHandoffResult, NodeError> {
4111 debug!(
4112 peer_npub = %traversal.peer_npub,
4113 session_id = %traversal.session_id,
4114 remote_addr = %traversal.remote_addr,
4115 "adopting established traversal socket"
4116 );
4117
4118 if !self.state.is_operational() {
4119 return Err(NodeError::NotStarted);
4120 }
4121
4122 let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
4123 let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
4124 NodeError::InvalidPeerNpub {
4125 npub: traversal.peer_npub.clone(),
4126 reason: e.to_string(),
4127 }
4128 })?;
4129 let peer_node_addr = *peer_identity.node_addr();
4130 if self.peers.contains_key(&peer_node_addr) {
4131 debug!(
4132 peer_npub = %traversal.peer_npub,
4133 "Adopting NAT traversal handoff as alternate path for already-connected peer"
4134 );
4135 }
4136
4137 self.peer_aliases
4138 .insert(peer_node_addr, peer_identity.short_npub());
4139 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
4140
4141 let transport_id = self.allocate_transport_id();
4142 let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
4162 let mut cfg = self
4163 .lookup_udp_config(traversal.transport_name.as_deref())
4164 .or_else(|| self.lookup_udp_config(None))
4165 .cloned()
4166 .unwrap_or_default();
4167 cfg.bind_addr = None;
4168 cfg.external_addr = None;
4169 cfg
4170 });
4171 let mut transport = crate::transport::udp::UdpTransport::new(
4172 transport_id,
4173 traversal.transport_name.clone(),
4174 inherited_config,
4175 packet_tx,
4176 );
4177
4178 transport
4179 .adopt_socket_async(traversal.socket)
4180 .await
4181 .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
4182
4183 let local_addr = transport.local_addr().ok_or_else(|| {
4184 NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
4185 })?;
4186
4187 self.transports.insert(
4188 transport_id,
4189 crate::transport::TransportHandle::Udp(transport),
4190 );
4191 self.bootstrap_transports.insert(transport_id);
4192 self.bootstrap_transport_npubs
4193 .insert(transport_id, traversal.peer_npub.clone());
4194
4195 let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
4196 if let Err(err) = self
4197 .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
4198 .await
4199 {
4200 self.bootstrap_transports.remove(&transport_id);
4201 self.bootstrap_transport_npubs.remove(&transport_id);
4202 if let Some(mut handle) = self.transports.remove(&transport_id) {
4203 let _ = handle.stop().await;
4204 }
4205 return Err(err);
4206 }
4207
4208 info!(
4209 peer = %self.peer_display_name(&peer_node_addr),
4210 transport_id = %transport_id,
4211 local_addr = %local_addr,
4212 remote_addr = %traversal.remote_addr,
4213 session_id = %traversal.session_id,
4214 "adopted NAT traversal socket; handshake initiated"
4215 );
4216
4217 Ok(BootstrapHandoffResult {
4218 transport_id,
4219 local_addr,
4220 remote_addr: traversal.remote_addr,
4221 peer_node_addr,
4222 session_id: traversal.session_id,
4223 })
4224 }
4225}