1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, MeshTraversalSignal, NostrDiscovery,
7 OverlayAdvert, OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason, SessionMessageType};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use secp256k1::PublicKey;
18use std::collections::{HashMap, HashSet};
19use std::net::{IpAddr, SocketAddr};
20use std::thread;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
/// Decision taken for a mesh traversal signal with respect to a session.
///
/// NOTE(review): the code that produces and consumes this enum is outside the
/// visible part of the file; the per-variant notes below are inferred from the
/// variant names only and should be confirmed against the call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MeshSignalSessionAction {
    /// Presumably: deliver the signal now.
    Send,
    /// Presumably: keep the signal and retry later.
    Defer,
    /// Presumably: discard the signal.
    Drop,
}
30
/// Appends one timestamped line to `nvpn-fips-endpoint-debug.log` in the
/// system temp directory (debug builds only).
///
/// Logging is strictly best effort: if the file cannot be opened or the write
/// fails, the error is silently ignored.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let Ok(mut sink) = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_path)
    else {
        return;
    };

    let stamp = std::time::SystemTime::now();
    let _ = writeln!(sink, "{:?} {}", stamp, message.as_ref());
}

/// Release-build stand-in for the debug logger: accepts the message and does
/// nothing.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
51
/// Returns `true` when `ip` belongs to an address class this check treats as
/// unroutable for an advertised endpoint.
///
/// IPv4: private, loopback, link-local, unspecified, multicast, broadcast,
/// documentation ranges, and the `100.64.0.0/10` shared address space.
///
/// IPv6: loopback, unspecified, multicast, unique-local (`fc00::/7`) and
/// link-local (`fe80::/10`). IPv4-mapped addresses (`::ffff:a.b.c.d`) are
/// judged by the IPv4 rules of the embedded address, so a mapped private
/// address cannot slip through as "routable".
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    /// IPv4 classification shared by plain and IPv4-mapped addresses.
    fn v4_is_unroutable(v4: std::net::Ipv4Addr) -> bool {
        let [first, second, ..] = v4.octets();
        v4.is_private()
            || v4.is_loopback()
            || v4.is_link_local()
            || v4.is_unspecified()
            || v4.is_multicast()
            || v4.is_broadcast()
            || v4.is_documentation()
            // 100.64.0.0/10: first octet 100, top two bits of the second are 01.
            || (first == 100 && (second & 0xc0) == 64)
    }

    match ip {
        IpAddr::V4(v4) => v4_is_unroutable(v4),
        IpAddr::V6(v6) => {
            if let Some(mapped) = v6.to_ipv4_mapped() {
                return v4_is_unroutable(mapped);
            }
            let head = v6.segments()[0];
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_multicast()
                // fc00::/7 (unique local)
                || (head & 0xfe00) == 0xfc00
                // fe80::/10 (link local)
                || (head & 0xffc0) == 0xfe80
        }
    }
}
83
/// Reports whether `local` and `remote` use the same IP family (both IPv4 or
/// both IPv6).
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    // A `SocketAddr` is either V4 or V6, so equal "is IPv4" answers mean the
    // two addresses share a family.
    local.is_ipv4() == remote.is_ipv4()
}
90
// NOTE(review): not referenced in the visible part of this file; presumably a
// multiplier applied to a retry lifetime for open discovery — confirm at the
// use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
// Upper bound on simultaneous in-flight handshakes plus pending transport
// connects towards a single peer (see `path_candidate_attempt_budget`).
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
// Upper bound on the value returned by `graph_session_warmup_budget`.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
// Upper bound on the value returned by `discovery_connect_budget`.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
95
96impl Node {
97 pub(super) async fn update_peers(
109 &mut self,
110 new_peers: Vec<crate::config::PeerConfig>,
111 ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
112 use std::collections::{HashMap, HashSet};
113
114 let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
115 HashMap::with_capacity(new_peers.len());
116 let mut new_order = Vec::with_capacity(new_peers.len());
117 for peer in new_peers {
118 let identity = match PeerIdentity::from_npub(&peer.npub) {
119 Ok(id) => id,
120 Err(e) => {
121 return Err(crate::node::NodeError::InvalidPeerNpub {
122 npub: peer.npub.clone(),
123 reason: e.to_string(),
124 });
125 }
126 };
127 let node_addr = *identity.node_addr();
131 if !new_by_addr.contains_key(&node_addr) {
132 new_order.push(node_addr);
133 }
134 new_by_addr.insert(node_addr, peer);
135 }
136
137 let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
138 .config
139 .peers()
140 .iter()
141 .filter_map(|pc| {
142 PeerIdentity::from_npub(&pc.npub)
143 .ok()
144 .map(|id| (*id.node_addr(), pc.clone()))
145 })
146 .collect();
147
148 let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
149 let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
150
151 let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
152 let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
153 let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
154
155 let mut outcome = crate::node::UpdatePeersOutcome::default();
156
157 for node_addr in &removed {
158 if self.retry_pending.remove(node_addr).is_some() {
159 debug!(
160 peer = %self.peer_display_name(node_addr),
161 "Dropping retry entry for peer removed from runtime peer list"
162 );
163 }
164 self.peer_aliases.remove(node_addr);
165 self.set_discovery_fallback_transit_allowed(*node_addr, false);
166 outcome.removed += 1;
167 }
168
169 let mut auto_connect_refresh_configs = Vec::new();
170 for node_addr in &kept {
171 let new_pc = &new_by_addr[node_addr];
172 let current_pc = ¤t_by_addr[node_addr];
173 if new_pc.addresses != current_pc.addresses
174 || new_pc.alias != current_pc.alias
175 || new_pc.connect_policy != current_pc.connect_policy
176 || new_pc.auto_reconnect != current_pc.auto_reconnect
177 || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
178 {
179 outcome.updated += 1;
180 self.set_discovery_fallback_transit_allowed(
181 *node_addr,
182 new_pc.discovery_fallback_transit,
183 );
184 if let Some(state) = self.retry_pending.get_mut(node_addr) {
185 state.peer_config = new_pc.clone();
186 state.retry_after_ms = Self::now_ms();
187 }
188 if let Some(alias) = new_pc.alias.clone() {
189 self.peer_aliases.insert(*node_addr, alias);
190 }
191 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
192 auto_connect_refresh_configs.push(new_pc.clone());
193 }
194 } else {
195 outcome.unchanged += 1;
196 self.set_discovery_fallback_transit_allowed(
197 *node_addr,
198 new_pc.discovery_fallback_transit,
199 );
200 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
201 auto_connect_refresh_configs.push(new_pc.clone());
202 }
203 }
204 }
205
206 let added_configs: Vec<crate::config::PeerConfig> = new_order
207 .iter()
208 .filter(|addr| added.contains(addr))
209 .map(|addr| new_by_addr[addr].clone())
210 .collect();
211
212 self.config.peers = new_order
216 .iter()
217 .filter_map(|addr| new_by_addr.get(addr).cloned())
218 .collect();
219 self.configured_peer_send_weights = Self::configured_peer_send_weights(&self.config);
220
221 for peer_config in added_configs {
222 outcome.added += 1;
223 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
224 continue;
225 };
226 let name = peer_config
227 .alias
228 .clone()
229 .unwrap_or_else(|| identity.short_npub());
230 self.peer_aliases.insert(*identity.node_addr(), name);
231 self.set_discovery_fallback_transit_allowed(
232 *identity.node_addr(),
233 peer_config.discovery_fallback_transit,
234 );
235 self.register_identity(*identity.node_addr(), identity.pubkey_full());
236
237 if !peer_config.is_auto_connect() {
238 continue;
239 }
240
241 match self
242 .try_auto_connect_graph_session(&peer_config, identity)
243 .await
244 {
245 Ok(true) => continue,
246 Ok(false) => {}
247 Err(err) => {
248 debug!(
249 npub = %peer_config.npub,
250 error = %err,
251 "Existing FIPS graph did not warm newly added peer"
252 );
253 }
254 }
255
256 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
257 warn!(
258 npub = %peer_config.npub,
259 error = %e,
260 "Failed to initiate connection for newly added peer"
261 );
262 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
263 self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
264 }
265 if matches!(e, crate::node::NodeError::NoTransportForType(_))
266 && let Some(bootstrap) = self.nostr_discovery.clone()
267 {
268 bootstrap
269 .request_advert_stale_check(peer_config.npub.clone())
270 .await;
271 }
272 }
273 }
274
275 for peer_config in auto_connect_refresh_configs {
276 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
277 continue;
278 };
279 let node_addr = *peer_identity.node_addr();
280
281 if self.peers.contains_key(&node_addr) {
282 match self
283 .initiate_active_peer_alternative_connection(&peer_config)
284 .await
285 {
286 Ok(attempted) => {
287 if attempted {
288 debug!(
289 peer = %self.peer_display_name(&node_addr),
290 "Started non-disruptive alternate-path handshake for active peer"
291 );
292 }
293 }
294 Err(e) => {
295 debug!(
296 npub = %peer_config.npub,
297 error = %e,
298 "Active peer alternate-path refresh did not start"
299 );
300 }
301 }
302 continue;
303 }
304
305 match self
306 .try_auto_connect_graph_session(&peer_config, peer_identity)
307 .await
308 {
309 Ok(true) => continue,
310 Ok(false) => {}
311 Err(err) => {
312 debug!(
313 npub = %peer_config.npub,
314 error = %err,
315 "Existing FIPS graph did not warm refreshed peer"
316 );
317 }
318 }
319
320 match self.initiate_peer_connection(&peer_config).await {
321 Ok(()) => {
322 let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
323 if let Some(state) = self.retry_pending.get_mut(&node_addr) {
324 state.peer_config = peer_config;
325 state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
326 }
327 }
328 Err(e) => {
329 debug!(
330 npub = %peer_config.npub,
331 error = %e,
332 "Refreshed peer addresses did not initiate a direct connection"
333 );
334 self.schedule_retry_after_error(node_addr, Self::now_ms(), &e);
335 }
336 }
337 }
338
339 self.warm_auto_connect_graph_sessions().await;
340
341 Ok(outcome)
342 }
343
344 pub(super) async fn initiate_peer_connections(&mut self) {
345 let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
351 .config
352 .peers()
353 .iter()
354 .filter_map(|pc| {
355 PeerIdentity::from_npub(&pc.npub)
356 .ok()
357 .map(|id| (id, pc.alias.clone()))
358 })
359 .collect();
360
361 for (identity, alias) in peer_identities {
362 let name = alias.unwrap_or_else(|| identity.short_npub());
363 self.peer_aliases.insert(*identity.node_addr(), name);
364 self.register_identity(*identity.node_addr(), identity.pubkey_full());
368 }
369
370 let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
372
373 if peer_configs.is_empty() {
374 debug!("No static peers configured");
375 return;
376 }
377
378 debug!(
379 count = peer_configs.len(),
380 "Initiating static peer connections"
381 );
382
383 for peer_config in peer_configs {
384 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
385 Ok(identity) => identity,
386 Err(_) => continue,
387 };
388 match self
389 .try_auto_connect_graph_session(&peer_config, peer_identity)
390 .await
391 {
392 Ok(true) => continue,
393 Ok(false) => {}
394 Err(err) => {
395 debug!(
396 npub = %peer_config.npub,
397 error = %err,
398 "Existing FIPS graph did not warm auto-connect peer"
399 );
400 }
401 }
402 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
403 warn!(
404 npub = %peer_config.npub,
405 alias = ?peer_config.alias,
406 error = %e,
407 "Failed to initiate peer connection"
408 );
409 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
413 self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
414 }
415 if matches!(e, crate::node::NodeError::NoTransportForType(_))
421 && let Some(bootstrap) = self.nostr_discovery.clone()
422 {
423 bootstrap
424 .request_advert_stale_check(peer_config.npub.clone())
425 .await;
426 }
427 }
428 }
429
430 self.warm_auto_connect_graph_sessions().await;
431 }
432
433 pub(in crate::node) async fn try_auto_connect_graph_session(
434 &mut self,
435 peer_config: &PeerConfig,
436 peer_identity: PeerIdentity,
437 ) -> Result<bool, NodeError> {
438 if !peer_config.is_auto_connect() {
439 return Ok(false);
440 }
441
442 let peer_node_addr = *peer_identity.node_addr();
443 if self
444 .peers
445 .get(&peer_node_addr)
446 .is_some_and(|peer| peer.can_send())
447 {
448 return Ok(false);
449 }
450 if self.auto_connect_should_race_direct_path(peer_config) {
451 return Ok(false);
452 }
453 if self
454 .sessions
455 .get(&peer_node_addr)
456 .is_some_and(|entry| entry.is_established() || entry.is_initiating())
457 {
458 return Ok(true);
459 }
460 if self.find_next_hop(&peer_node_addr).is_none() {
461 return Ok(false);
462 }
463
464 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
465 match self
466 .initiate_session(peer_node_addr, peer_identity.pubkey_full())
467 .await
468 {
469 Ok(()) => {
470 debug!(
471 peer = %self.peer_display_name(&peer_node_addr),
472 "Warmed auto-connect peer session over existing FIPS graph"
473 );
474 Ok(true)
475 }
476 Err(NodeError::SendFailed { node_addr, reason })
477 if node_addr == peer_node_addr && reason == "no route to destination" =>
478 {
479 self.maybe_initiate_lookup(&peer_node_addr).await;
480 Ok(false)
481 }
482 Err(err) => Err(err),
483 }
484 }
485
486 fn auto_connect_should_race_direct_path(&self, peer_config: &PeerConfig) -> bool {
487 !peer_config.addresses.is_empty() || self.config.node.discovery.nostr.enabled
488 }
489
    /// Starts a connection attempt to the peer described by `peer_config`.
    ///
    /// Thin wrapper that forwards to `initiate_peer_connection_inner` and
    /// returns its result unchanged.
    pub(super) async fn initiate_peer_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }
499
    /// Retry entry point for connecting to the peer described by `peer_config`.
    ///
    /// Currently identical to `initiate_peer_connection`: it forwards to
    /// `initiate_peer_connection_inner` and returns its result unchanged.
    pub(super) async fn initiate_peer_retry_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }
511
    /// Attempts an additional connection path for the given peer.
    ///
    /// Forwards to `initiate_active_peer_alternative_connection_inner` with
    /// `allow_same_path_refresh` set to `false` and returns its result.
    pub(in crate::node) async fn initiate_active_peer_alternative_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<bool, NodeError> {
        self.initiate_active_peer_alternative_connection_inner(peer_config, false)
            .await
    }
519
    /// Attempts a connection refresh for the given peer, allowing the same
    /// path to be used again.
    ///
    /// Forwards to `initiate_active_peer_alternative_connection_inner` with
    /// `allow_same_path_refresh` set to `true` and returns its result.
    pub(in crate::node) async fn initiate_active_peer_direct_refresh_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<bool, NodeError> {
        self.initiate_active_peer_alternative_connection_inner(peer_config, true)
            .await
    }
527
528 async fn initiate_active_peer_alternative_connection_inner(
529 &mut self,
530 peer_config: &crate::config::PeerConfig,
531 allow_same_path_refresh: bool,
532 ) -> Result<bool, NodeError> {
533 let peer_identity =
534 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
535 npub: peer_config.npub.clone(),
536 reason: e.to_string(),
537 })?;
538 let peer_node_addr = *peer_identity.node_addr();
539
540 if !self.peers.contains_key(&peer_node_addr) {
541 self.initiate_peer_connection(peer_config).await?;
542 return Ok(true);
543 }
544
545 self.try_active_peer_alternative_addresses(
551 peer_config,
552 peer_identity,
553 allow_same_path_refresh,
554 )
555 .await
556 }
557
558 async fn initiate_peer_connection_inner(
559 &mut self,
560 peer_config: &crate::config::PeerConfig,
561 ) -> Result<(), NodeError> {
562 let peer_identity =
564 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
565 npub: peer_config.npub.clone(),
566 reason: e.to_string(),
567 })?;
568
569 let peer_node_addr = *peer_identity.node_addr();
570
571 if self.peers.contains_key(&peer_node_addr) {
573 debug!(
574 npub = %peer_config.npub,
575 "Peer already exists, skipping"
576 );
577 return Ok(());
578 }
579
580 self.try_peer_addresses(peer_config, peer_identity, true)
581 .await
582 }
583
584 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
585 self.connections.values().any(|conn| {
586 conn.expected_identity()
587 .map(|id| id.node_addr() == peer_node_addr)
588 .unwrap_or(false)
589 })
590 }
591
592 fn is_connecting_to_peer_on_path(
593 &self,
594 peer_node_addr: &NodeAddr,
595 transport_id: TransportId,
596 remote_addr: &TransportAddr,
597 ) -> bool {
598 self.connections.values().any(|conn| {
599 conn.expected_identity()
600 .map(|id| id.node_addr() == peer_node_addr)
601 .unwrap_or(false)
602 && conn.transport_id() == Some(transport_id)
603 && conn.source_addr() == Some(remote_addr)
604 }) || self.pending_connects.iter().any(|pending| {
605 pending.peer_identity.node_addr() == peer_node_addr
606 && pending.transport_id == transport_id
607 && &pending.remote_addr == remote_addr
608 })
609 }
610
611 pub(in crate::node) fn should_warm_auto_connect_session(
612 &self,
613 peer_node_addr: &NodeAddr,
614 ) -> bool {
615 if self
616 .peers
617 .get(peer_node_addr)
618 .is_some_and(|peer| peer.can_send())
619 || self
620 .sessions
621 .get(peer_node_addr)
622 .is_some_and(|entry| entry.is_established())
623 {
624 return false;
625 }
626
627 self.config.peers().iter().any(|peer| {
628 peer.is_auto_connect()
629 && PeerIdentity::from_npub(&peer.npub)
630 .map(|identity| identity.node_addr() == peer_node_addr)
631 .unwrap_or(false)
632 })
633 }
634
635 pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
636 if !self.peers.values().any(|peer| peer.can_send()) {
637 return 0;
638 }
639
640 let mut budget = self.graph_session_warmup_budget();
641 if budget == 0 {
642 return 0;
643 }
644
645 let peer_identities: Vec<_> = self
646 .config
647 .auto_connect_peers()
648 .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
649 .collect();
650
651 let mut warmed = 0;
652 for identity in peer_identities {
653 if budget == 0 {
654 break;
655 }
656
657 let peer_node_addr = *identity.node_addr();
658 if peer_node_addr == *self.identity.node_addr()
659 || !self.should_warm_auto_connect_session(&peer_node_addr)
660 || self
661 .sessions
662 .get(&peer_node_addr)
663 .is_some_and(|entry| entry.is_initiating())
664 {
665 continue;
666 }
667
668 self.register_identity(peer_node_addr, identity.pubkey_full());
669
670 if self.find_next_hop(&peer_node_addr).is_some() {
671 match self
672 .initiate_session(peer_node_addr, identity.pubkey_full())
673 .await
674 {
675 Ok(()) => {
676 warmed += 1;
677 budget = budget.saturating_sub(1);
678 debug!(
679 peer = %self.peer_display_name(&peer_node_addr),
680 "Warmed auto-connect peer session over existing FIPS graph"
681 );
682 }
683 Err(NodeError::SendFailed { node_addr, reason })
684 if node_addr == peer_node_addr && reason == "no route to destination" =>
685 {
686 self.maybe_initiate_lookup(&peer_node_addr).await;
687 warmed += 1;
688 budget = budget.saturating_sub(1);
689 }
690 Err(err) => {
691 debug!(
692 peer = %self.peer_display_name(&peer_node_addr),
693 error = %err,
694 "Failed to warm auto-connect peer session"
695 );
696 }
697 }
698 } else {
699 self.maybe_initiate_lookup(&peer_node_addr).await;
700 warmed += 1;
701 budget = budget.saturating_sub(1);
702 }
703 }
704
705 warmed
706 }
707
708 pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
709 let max_destinations = self.config.node.session.pending_max_destinations;
710 if max_destinations == 0 {
711 return 0;
712 }
713
714 let pending_sessions = self
715 .sessions
716 .values()
717 .filter(|entry| !entry.is_established())
718 .count();
719 let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
720 max_destinations
721 .saturating_sub(pending_total)
722 .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
723 }
724
725 fn outbound_handshake_slots(&self) -> usize {
726 let used = self
727 .connections
728 .len()
729 .saturating_add(self.pending_connects.len());
730 if self.max_connections == 0 {
731 usize::MAX
732 } else {
733 self.max_connections.saturating_sub(used)
734 }
735 }
736
737 fn outbound_link_slots(&self) -> usize {
738 if self.max_links == 0 {
739 usize::MAX
740 } else {
741 self.max_links.saturating_sub(self.links.len())
742 }
743 }
744
745 fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
746 if !self.peers.contains_key(peer_node_addr)
747 && self.max_peers > 0
748 && self.peers.len() >= self.max_peers
749 {
750 return 0;
751 }
752
753 let in_flight_for_peer = self
754 .connections
755 .values()
756 .filter(|conn| {
757 conn.expected_identity()
758 .map(|id| id.node_addr() == peer_node_addr)
759 .unwrap_or(false)
760 })
761 .count()
762 .saturating_add(
763 self.pending_connects
764 .iter()
765 .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
766 .count(),
767 );
768
769 self.outbound_handshake_slots()
770 .min(self.outbound_link_slots())
771 .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
772 }
773
774 fn discovery_connect_budget(&self) -> usize {
775 self.outbound_handshake_slots()
776 .min(self.outbound_link_slots())
777 .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
778 }
779
780 fn find_udp_transport_for_remote_addr(
787 &self,
788 remote_addr: SocketAddr,
789 ) -> Option<(TransportId, SocketAddr)> {
790 self.transports
791 .iter()
792 .filter(|(id, handle)| {
793 handle.transport_type().name == "udp"
794 && handle.is_operational()
795 && !self.bootstrap_transports.contains(id)
796 })
797 .filter_map(|(id, handle)| {
798 let local_addr = handle.local_addr()?;
799 socket_addr_families_compatible(local_addr, remote_addr)
800 .then_some((*id, local_addr))
801 })
802 .min_by_key(|(id, _)| id.as_u32())
803 }
804
805 pub(super) fn transport_discovery_candidate(
806 &self,
807 discovered_transport_id: TransportId,
808 discovered_addr: TransportAddr,
809 ) -> Option<(TransportId, TransportAddr, &'static str)> {
810 let transport = self.transports.get(&discovered_transport_id)?;
811 let transport_name = transport.transport_type().name;
812
813 if transport_name != "udp" {
814 return Some((discovered_transport_id, discovered_addr, transport_name));
815 }
816
817 let Some(remote_socket_addr) = discovered_addr
818 .as_str()
819 .and_then(|addr| addr.parse::<SocketAddr>().ok())
820 else {
821 if self.bootstrap_transports.contains(&discovered_transport_id) {
822 debug!(
823 transport_id = %discovered_transport_id,
824 remote_addr = %discovered_addr,
825 "transport discovery: skip non-numeric UDP address from bootstrap transport"
826 );
827 return None;
828 }
829 return Some((discovered_transport_id, discovered_addr, transport_name));
830 };
831
832 let Some((transport_id, local_addr)) =
833 self.find_udp_transport_for_remote_addr(remote_socket_addr)
834 else {
835 debug!(
836 transport_id = %discovered_transport_id,
837 remote_addr = %discovered_addr,
838 "transport discovery: skip UDP peer with no compatible local socket"
839 );
840 return None;
841 };
842
843 if transport_id != discovered_transport_id {
844 debug!(
845 discovered_transport_id = %discovered_transport_id,
846 selected_transport_id = %transport_id,
847 local_addr = %local_addr,
848 remote_addr = %remote_socket_addr,
849 "transport discovery: selected compatible UDP transport"
850 );
851 }
852
853 Some((
854 transport_id,
855 TransportAddr::from_socket_addr(remote_socket_addr),
856 transport_name,
857 ))
858 }
859
    /// Renders `remote_addr` as the address string used for a `PeerAddress`
    /// candidate.
    ///
    /// On Linux and macOS, a 6-byte address on an `"ethernet"` transport whose
    /// interface name is known is rendered as `<interface>/<mac>`. In every
    /// other case the result is `remote_addr.to_string()`.
    fn peer_address_string_for_transport_candidate(
        &self,
        transport_id: TransportId,
        transport_name: &str,
        remote_addr: &TransportAddr,
    ) -> String {
        // On other targets the ethernet branch below is compiled out, so the
        // two parameters would otherwise be reported as unused.
        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
        let _ = (transport_id, transport_name);

        #[cfg(any(target_os = "linux", target_os = "macos"))]
        if transport_name == "ethernet"
            && remote_addr.as_bytes().len() == 6
            && let Some(interface) = self
                .transports
                .get(&transport_id)
                .and_then(|transport| transport.interface_name())
        {
            // The length check above guarantees `copy_from_slice` gets exactly
            // six bytes and cannot panic.
            let mut mac = [0u8; 6];
            mac.copy_from_slice(remote_addr.as_bytes());
            return format!(
                "{interface}/{}",
                crate::transport::ethernet::format_mac(&mac)
            );
        }

        remote_addr.to_string()
    }
887
888 fn resolve_peer_address_for_match(
889 &self,
890 candidate: &PeerAddress,
891 ) -> Option<(TransportId, TransportAddr)> {
892 if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
893 return None;
894 }
895
896 if candidate.transport == "ethernet" {
897 return self.resolve_ethernet_addr(&candidate.addr).ok();
898 }
899
900 if candidate.transport == "ble" {
901 #[cfg(bluer_available)]
902 {
903 return self.resolve_ble_addr(&candidate.addr).ok();
904 }
905 #[cfg(not(bluer_available))]
906 {
907 return None;
908 }
909 }
910
911 let transport_id = if candidate.transport == "udp"
912 && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
913 {
914 self.find_udp_transport_for_remote_addr(remote_socket_addr)
915 .map(|(id, _)| id)?
916 } else {
917 self.find_transport_for_type(&candidate.transport)?
918 };
919
920 Some((transport_id, TransportAddr::from_string(&candidate.addr)))
921 }
922
923 pub(super) async fn initiate_connection(
934 &mut self,
935 transport_id: TransportId,
936 remote_addr: TransportAddr,
937 peer_identity: PeerIdentity,
938 ) -> Result<(), NodeError> {
939 let peer_node_addr = *peer_identity.node_addr();
940
941 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
942 debug!(
943 peer = %self.peer_display_name(&peer_node_addr),
944 transport_id = %transport_id,
945 remote_addr = %remote_addr,
946 "Connection already in progress for candidate path"
947 );
948 return Ok(());
949 }
950
951 if self.outbound_handshake_slots() == 0 {
952 return Err(NodeError::MaxConnectionsExceeded {
953 max: self.max_connections,
954 });
955 }
956
957 if self.outbound_link_slots() == 0 {
958 return Err(NodeError::MaxLinksExceeded {
959 max: self.max_links,
960 });
961 }
962
963 if !self.peers.contains_key(&peer_node_addr)
964 && self.max_peers > 0
965 && self.peers.len() >= self.max_peers
966 {
967 return Err(NodeError::MaxPeersExceeded {
968 max: self.max_peers,
969 });
970 }
971
972 self.authorize_peer(
973 &peer_identity,
974 PeerAclContext::OutboundConnect,
975 transport_id,
976 &remote_addr,
977 )?;
978
979 let is_connection_oriented = self
980 .transports
981 .get(&transport_id)
982 .map(|t| t.transport_type().connection_oriented)
983 .unwrap_or(false);
984
985 let link_id = self.allocate_link_id();
987
988 let link = if is_connection_oriented {
989 Link::new(
990 link_id,
991 transport_id,
992 remote_addr.clone(),
993 LinkDirection::Outbound,
994 Duration::from_millis(self.config.node.base_rtt_ms),
995 )
996 } else {
997 Link::connectionless(
998 link_id,
999 transport_id,
1000 remote_addr.clone(),
1001 LinkDirection::Outbound,
1002 Duration::from_millis(self.config.node.base_rtt_ms),
1003 )
1004 };
1005
1006 self.links.insert(link_id, link);
1007
1008 self.addr_to_link
1010 .insert((transport_id, remote_addr.clone()), link_id);
1011
1012 if is_connection_oriented {
1013 if let Some(transport) = self.transports.get(&transport_id) {
1015 match transport.connect(&remote_addr).await {
1016 Ok(()) => {
1017 debug!(
1018 peer = %self.peer_display_name(&peer_node_addr),
1019 transport_id = %transport_id,
1020 remote_addr = %remote_addr,
1021 link_id = %link_id,
1022 "Transport connect initiated (non-blocking)"
1023 );
1024 self.pending_connects.push(super::PendingConnect {
1025 link_id,
1026 transport_id,
1027 remote_addr,
1028 peer_identity,
1029 });
1030 }
1031 Err(e) => {
1032 self.links.remove(&link_id);
1034 self.addr_to_link.remove(&(transport_id, remote_addr));
1035 return Err(NodeError::from_transport_error(e));
1036 }
1037 }
1038 }
1039 Ok(())
1040 } else {
1041 self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
1043 .await
1044 }
1045 }
1046
1047 pub(super) async fn start_handshake(
1052 &mut self,
1053 link_id: LinkId,
1054 transport_id: TransportId,
1055 remote_addr: TransportAddr,
1056 peer_identity: PeerIdentity,
1057 ) -> Result<(), NodeError> {
1058 let peer_node_addr = *peer_identity.node_addr();
1059
1060 let current_time_ms = Self::now_ms();
1062 let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);
1063
1064 let our_index = match self.index_allocator.allocate() {
1066 Ok(idx) => idx,
1067 Err(e) => {
1068 self.links.remove(&link_id);
1070 self.addr_to_link.remove(&(transport_id, remote_addr));
1071 return Err(NodeError::IndexAllocationFailed(e.to_string()));
1072 }
1073 };
1074
1075 let our_keypair = self.identity.keypair();
1077 let noise_msg1 =
1078 match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
1079 Ok(msg) => msg,
1080 Err(e) => {
1081 let _ = self.index_allocator.free(our_index);
1083 self.links.remove(&link_id);
1084 self.addr_to_link.remove(&(transport_id, remote_addr));
1085 return Err(NodeError::HandshakeFailed(e.to_string()));
1086 }
1087 };
1088
1089 connection.set_our_index(our_index);
1091 connection.set_transport_id(transport_id);
1092 connection.set_source_addr(remote_addr.clone());
1093
1094 let wire_msg1 = build_msg1(our_index, &noise_msg1);
1096
1097 debug!(
1098 peer = %self.peer_display_name(&peer_node_addr),
1099 transport_id = %transport_id,
1100 remote_addr = %remote_addr,
1101 link_id = %link_id,
1102 our_index = %our_index,
1103 "Connection initiated"
1104 );
1105
1106 let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
1108 connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);
1109
1110 self.pending_outbound
1112 .insert((transport_id, our_index.as_u32()), link_id);
1113 self.connections.insert(link_id, connection);
1114
1115 let send_result = match self.transports.get(&transport_id) {
1120 Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
1121 None => None,
1122 };
1123 match send_result {
1124 Some(send_result) => {
1125 self.note_local_send_outcome(&send_result);
1126 match send_result {
1127 Ok(bytes) => {
1128 debug!(
1129 link_id = %link_id,
1130 our_index = %our_index,
1131 bytes,
1132 "Sent Noise handshake message 1 (wire format)"
1133 );
1134 }
1135 Err(e) => {
1136 warn!(
1137 link_id = %link_id,
1138 transport_id = %transport_id,
1139 remote_addr = %remote_addr,
1140 our_index = %our_index,
1141 error = %e,
1142 "Failed to send handshake message"
1143 );
1144 self.pending_outbound
1145 .remove(&(transport_id, our_index.as_u32()));
1146 self.connections.remove(&link_id);
1147 self.links.remove(&link_id);
1148 self.addr_to_link
1149 .remove(&(transport_id, remote_addr.clone()));
1150 let _ = self.index_allocator.free(our_index);
1151 return Err(NodeError::from_transport_error(e));
1152 }
1153 }
1154 }
1155 None => {
1156 self.pending_outbound
1157 .remove(&(transport_id, our_index.as_u32()));
1158 self.connections.remove(&link_id);
1159 self.links.remove(&link_id);
1160 self.addr_to_link
1161 .remove(&(transport_id, remote_addr.clone()));
1162 let _ = self.index_allocator.free(our_index);
1163 return Err(NodeError::TransportError(format!(
1164 "transport {transport_id} disappeared before first handshake send"
1165 )));
1166 }
1167 }
1168
1169 Ok(())
1170 }
1171
1172 pub(super) async fn poll_transport_discovery(&mut self) {
1178 let mut to_connect = Vec::new();
1180 let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1181 let mut connect_budget = self.discovery_connect_budget();
1182 let mut skipped_budget = 0usize;
1183
1184 for transport in self.transports.values() {
1185 if !transport.is_operational() {
1186 continue;
1187 }
1188 if !transport.auto_connect() {
1189 let _ = transport.discover();
1191 continue;
1192 }
1193 let discovered = match transport.discover() {
1194 Ok(peers) => peers,
1195 Err(_) => continue,
1196 };
1197 for peer in discovered {
1198 let discovered_transport_id = peer.transport_id;
1199 let pubkey = match peer.pubkey_hint {
1200 Some(pk) => pk,
1201 None => continue,
1202 };
1203 let identity = PeerIdentity::from_pubkey(pubkey);
1204 let node_addr = *identity.node_addr();
1205
1206 if node_addr == *self.identity.node_addr() {
1208 continue;
1209 }
1210
1211 let Some((candidate_transport_id, remote_addr, transport_name)) =
1212 self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1213 else {
1214 continue;
1215 };
1216
1217 if self.peers.contains_key(&node_addr) {
1218 let candidate = PeerAddress::new(
1219 transport_name,
1220 self.peer_address_string_for_transport_candidate(
1221 candidate_transport_id,
1222 transport_name,
1223 &remote_addr,
1224 ),
1225 );
1226 if self.active_peer_candidate_is_fresh_enough_to_skip(
1227 &node_addr,
1228 std::slice::from_ref(&candidate),
1229 ) {
1230 continue;
1231 }
1232 if self.is_connecting_to_peer_on_path(
1233 &node_addr,
1234 candidate_transport_id,
1235 &remote_addr,
1236 ) {
1237 continue;
1238 }
1239 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1240 if connect_budget == 0
1241 || self
1242 .path_candidate_attempt_budget(&node_addr)
1243 .saturating_sub(queued_for_peer)
1244 == 0
1245 {
1246 skipped_budget = skipped_budget.saturating_add(1);
1247 continue;
1248 }
1249 to_connect.push((candidate_transport_id, remote_addr, identity, true));
1250 *queued_per_peer.entry(node_addr).or_default() += 1;
1251 connect_budget = connect_budget.saturating_sub(1);
1252 continue;
1253 }
1254
1255 if self.is_connecting_to_peer_on_path(
1256 &node_addr,
1257 candidate_transport_id,
1258 &remote_addr,
1259 ) {
1260 continue;
1261 }
1262
1263 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1264 if connect_budget == 0
1265 || self
1266 .path_candidate_attempt_budget(&node_addr)
1267 .saturating_sub(queued_for_peer)
1268 == 0
1269 {
1270 skipped_budget = skipped_budget.saturating_add(1);
1271 continue;
1272 }
1273 to_connect.push((candidate_transport_id, remote_addr, identity, false));
1274 *queued_per_peer.entry(node_addr).or_default() += 1;
1275 connect_budget = connect_budget.saturating_sub(1);
1276 }
1277 }
1278
1279 if skipped_budget > 0 {
1280 debug!(
1281 skipped = skipped_budget,
1282 queued = to_connect.len(),
1283 "Transport discovery connect budget exhausted"
1284 );
1285 }
1286
1287 for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1288 info!(
1289 peer = %self.peer_display_name(identity.node_addr()),
1290 transport_id = %transport_id,
1291 remote_addr = %remote_addr,
1292 active_refresh,
1293 "Auto-connecting to discovered peer"
1294 );
1295 if let Err(e) = self
1296 .initiate_connection(transport_id, remote_addr, identity)
1297 .await
1298 {
1299 warn!(error = %e, "Failed to auto-connect to discovered peer");
1300 }
1301 }
1302 }
1303
1304 pub(super) async fn poll_nostr_discovery(&mut self) {
1305 let Some(bootstrap) = self.nostr_discovery.clone() else {
1306 return;
1307 };
1308
1309 bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
1310 bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
1311
1312 if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
1313 debug!(error = %err, "Failed to refresh local Nostr overlay advert");
1314 }
1315
1316 self.drain_nostr_mesh_signals(&bootstrap).await;
1317
1318 for event in bootstrap.drain_events().await {
1319 match event {
1320 BootstrapEvent::Established { traversal } => {
1321 let active_refresh = PeerIdentity::from_npub(&traversal.peer_npub)
1322 .ok()
1323 .is_some_and(|identity| self.peers.contains_key(identity.node_addr()));
1324 let admission_allowed = if active_refresh {
1325 self.outbound_direct_refresh_admission_check()
1326 } else {
1327 self.outbound_admission_check()
1328 };
1329 if !admission_allowed {
1330 debug!(
1331 peer_npub = %traversal.peer_npub,
1332 peers = self.peers.len(),
1333 max_peers = self.max_peers,
1334 active_refresh,
1335 "Dropping established NAT traversal: at capacity"
1336 );
1337 continue;
1338 }
1339 let peer_npub = traversal.peer_npub.clone();
1340 match self.adopt_established_traversal(traversal).await {
1341 Ok(_) => {
1342 info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
1343 }
1344 Err(err) => {
1345 warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
1346 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
1347 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
1348 }
1349 }
1350 }
1351 }
1352 BootstrapEvent::Failed {
1353 peer_config,
1354 reason,
1355 } => {
1356 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
1357 Ok(identity) => identity,
1358 Err(_) => continue,
1359 };
1360 let node_addr = *peer_identity.node_addr();
1361 let now_ms = Self::now_ms();
1362 if self.peers.contains_key(&node_addr) {
1363 if self.active_peer_should_keep_direct_retry(&node_addr, &peer_config) {
1364 let decision =
1365 bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
1366 if decision.should_warn {
1367 warn!(
1368 npub = %peer_config.npub,
1369 error = %reason,
1370 consecutive_failures = decision.consecutive_failures,
1371 cooldown_secs = decision
1372 .cooldown_until_ms
1373 .map(|t| t.saturating_sub(now_ms) / 1000),
1374 "Direct-path NAT traversal upgrade failed"
1375 );
1376 } else {
1377 debug!(
1378 npub = %peer_config.npub,
1379 error = %reason,
1380 consecutive_failures = decision.consecutive_failures,
1381 "Direct-path NAT traversal upgrade failed (suppressed by warn-rate-limit)"
1382 );
1383 }
1384 if decision.crossed_threshold {
1385 bootstrap
1386 .request_advert_stale_check(peer_config.npub.clone())
1387 .await;
1388 }
1389 self.schedule_link_dead_reprobe(node_addr, now_ms);
1390 } else {
1391 debug!(
1392 npub = %peer_config.npub,
1393 error = %reason,
1394 "Ignoring failed NAT traversal for already-connected peer on fresh direct path"
1395 );
1396 }
1397 continue;
1398 }
1399 if self.is_connecting_to_peer(&node_addr) {
1400 debug!(
1401 npub = %peer_config.npub,
1402 error = %reason,
1403 "Ignoring failed NAT traversal while peer handshake is already in progress"
1404 );
1405 continue;
1406 }
1407
1408 let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
1409 if decision.should_warn {
1410 warn!(
1411 npub = %peer_config.npub,
1412 error = %reason,
1413 consecutive_failures = decision.consecutive_failures,
1414 cooldown_secs = decision
1415 .cooldown_until_ms
1416 .map(|t| t.saturating_sub(now_ms) / 1000),
1417 "NAT traversal failed"
1418 );
1419 } else {
1420 debug!(
1421 npub = %peer_config.npub,
1422 error = %reason,
1423 consecutive_failures = decision.consecutive_failures,
1424 "NAT traversal failed (suppressed by warn-rate-limit)"
1425 );
1426 }
1427
1428 if decision.crossed_threshold {
1432 bootstrap
1433 .request_advert_stale_check(peer_config.npub.clone())
1434 .await;
1435 }
1436
1437 if self
1438 .try_peer_addresses(&peer_config, peer_identity, false)
1439 .await
1440 .is_ok()
1441 {
1442 continue;
1443 }
1444
1445 self.schedule_retry(node_addr, now_ms);
1446 if self.nostr_cooldown_applies_to_peer_config(&peer_config)
1447 && let Some(cooldown_until_ms) = decision.cooldown_until_ms
1448 && let Some(state) = self.retry_pending.get_mut(&node_addr)
1449 {
1450 state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
1454 }
1455 }
1456 }
1457 }
1458
1459 self.maybe_run_startup_open_discovery_sweep(&bootstrap)
1460 .await;
1461 self.queue_open_discovery_retries(&bootstrap).await;
1462 self.queue_active_fallback_direct_retries(&bootstrap);
1463 }
1464
1465 async fn drain_nostr_mesh_signals(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1466 let mut deferred = Vec::new();
1467
1468 for signal in bootstrap.drain_mesh_signals().await {
1469 let (peer_npub, msg_type, payload) = match &signal {
1470 MeshTraversalSignal::Offer { peer_npub, offer } => {
1471 let payload = match serde_json::to_vec(&offer) {
1472 Ok(payload) => payload,
1473 Err(error) => {
1474 debug!(
1475 peer = %peer_npub,
1476 error = %error,
1477 "Failed to encode mesh traversal offer"
1478 );
1479 continue;
1480 }
1481 };
1482 (
1483 peer_npub.clone(),
1484 SessionMessageType::TraversalOffer.to_byte(),
1485 payload,
1486 )
1487 }
1488 MeshTraversalSignal::Answer { peer_npub, answer } => {
1489 let payload = match serde_json::to_vec(&answer) {
1490 Ok(payload) => payload,
1491 Err(error) => {
1492 debug!(
1493 peer = %peer_npub,
1494 error = %error,
1495 "Failed to encode mesh traversal answer"
1496 );
1497 continue;
1498 }
1499 };
1500 (
1501 peer_npub.clone(),
1502 SessionMessageType::TraversalAnswer.to_byte(),
1503 payload,
1504 )
1505 }
1506 };
1507
1508 let peer_identity = match PeerIdentity::from_npub(&peer_npub) {
1509 Ok(identity) => identity,
1510 Err(error) => {
1511 debug!(
1512 peer = %peer_npub,
1513 error = %error,
1514 "Cannot send mesh traversal signal to invalid peer npub"
1515 );
1516 continue;
1517 }
1518 };
1519 let peer_addr = *peer_identity.node_addr();
1520 match self
1521 .mesh_signal_session_action(peer_addr, peer_identity.pubkey_full())
1522 .await
1523 {
1524 MeshSignalSessionAction::Send => {}
1525 MeshSignalSessionAction::Defer => {
1526 deferred.push(signal);
1527 continue;
1528 }
1529 MeshSignalSessionAction::Drop => continue,
1530 }
1531
1532 if let Err(error) = self.send_session_msg(&peer_addr, msg_type, &payload).await {
1533 debug!(
1534 peer = %self.peer_display_name(&peer_addr),
1535 error = %error,
1536 "Failed to send mesh traversal signal"
1537 );
1538 }
1539 }
1540
1541 for signal in deferred {
1542 bootstrap.requeue_mesh_signal(signal);
1543 }
1544 }
1545
1546 async fn mesh_signal_session_action(
1547 &mut self,
1548 peer_addr: NodeAddr,
1549 peer_pubkey: PublicKey,
1550 ) -> MeshSignalSessionAction {
1551 if let Some(entry) = self.sessions.get(&peer_addr) {
1552 if entry.is_established() {
1553 return MeshSignalSessionAction::Send;
1554 }
1555 if entry.is_initiating() || entry.is_awaiting_msg3() {
1556 debug!(
1557 peer = %self.peer_display_name(&peer_addr),
1558 "Deferring mesh traversal signal until end-to-end session is established"
1559 );
1560 return MeshSignalSessionAction::Defer;
1561 }
1562 }
1563
1564 if self.find_next_hop(&peer_addr).is_none() {
1565 debug!(
1566 peer = %self.peer_display_name(&peer_addr),
1567 "Cannot warm mesh traversal signal session without a FIPS route"
1568 );
1569 self.maybe_initiate_lookup(&peer_addr).await;
1570 return MeshSignalSessionAction::Drop;
1571 }
1572
1573 self.register_identity(peer_addr, peer_pubkey);
1574 match self.initiate_session(peer_addr, peer_pubkey).await {
1575 Ok(()) => {
1576 debug!(
1577 peer = %self.peer_display_name(&peer_addr),
1578 "Warming end-to-end session for mesh traversal signal"
1579 );
1580 MeshSignalSessionAction::Defer
1581 }
1582 Err(NodeError::SendFailed { node_addr, reason })
1583 if node_addr == peer_addr && reason == "no route to destination" =>
1584 {
1585 debug!(
1586 peer = %self.peer_display_name(&peer_addr),
1587 "Cannot warm mesh traversal signal session without a FIPS route"
1588 );
1589 self.maybe_initiate_lookup(&peer_addr).await;
1590 MeshSignalSessionAction::Drop
1591 }
1592 Err(error) => {
1593 debug!(
1594 peer = %self.peer_display_name(&peer_addr),
1595 error = %error,
1596 "Failed to warm end-to-end session for mesh traversal signal"
1597 );
1598 MeshSignalSessionAction::Drop
1599 }
1600 }
1601 }
1602
1603 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1609 if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1610 let scope = scope.trim();
1611 if !scope.is_empty() {
1612 return Some(scope.to_string());
1613 }
1614 }
1615
1616 let app = self.config.node.discovery.nostr.app.trim();
1617 if app.is_empty() {
1618 return None;
1619 }
1620 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1621 let scope = rest.trim();
1622 if scope.is_empty() {
1623 None
1624 } else {
1625 Some(scope.to_string())
1626 }
1627 } else {
1628 Some(app.to_string())
1629 }
1630 }
1631
1632 pub(super) fn start_local_instance_discovery(&mut self) {
1633 if !self.config.node.discovery.local.enabled {
1634 return;
1635 }
1636 let Some(scope) = self.lan_discovery_scope() else {
1637 debug!("local instance discovery not started: no discovery scope");
1638 return;
1639 };
1640 let now_ms = Self::now_ms();
1641 match crate::discovery::local::LocalInstanceRegistry::new(
1642 self.identity.npub(),
1643 scope,
1644 &self.config.node.discovery.local,
1645 now_ms,
1646 ) {
1647 Ok(registry) => {
1648 self.local_instance_registry = Some(registry);
1649 self.local_instance_started_at_ms = Some(now_ms);
1650 self.last_local_instance_publish_ms = None;
1651 self.last_local_instance_scan_ms = None;
1652 self.publish_local_instance_record(now_ms);
1653 info!("Same-host FIPS instance discovery enabled");
1654 }
1655 Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1656 debug!("same-host FIPS instance discovery disabled");
1657 }
1658 Err(err) => {
1659 debug!(error = %err, "same-host FIPS instance discovery not started");
1660 }
1661 }
1662 }
1663
1664 fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1665 let mut contacts = Vec::new();
1666 for handle in self.transports.values() {
1667 if !handle.is_operational() || !handle.accept_connections() {
1668 continue;
1669 }
1670 let transport = handle.transport_type().name;
1671 if transport != "udp" && transport != "tcp" {
1672 continue;
1673 }
1674 let Some(local_addr) = handle.local_addr() else {
1675 continue;
1676 };
1677 let Some(contact) =
1678 crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1679 else {
1680 continue;
1681 };
1682 if contacts
1683 .iter()
1684 .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1685 existing.transport == contact.transport && existing.addr == contact.addr
1686 })
1687 {
1688 continue;
1689 }
1690 contacts.push(contact);
1691 }
1692 contacts
1693 }
1694
1695 fn publish_local_instance_record(&mut self, now_ms: u64) {
1696 let Some(registry) = self.local_instance_registry.clone() else {
1697 return;
1698 };
1699 let contacts = self.local_instance_contacts();
1700 match registry.publish(contacts, now_ms) {
1701 Ok(()) => {
1702 self.last_local_instance_publish_ms = Some(now_ms);
1703 }
1704 Err(err) => {
1705 debug!(error = %err, "failed to publish same-host FIPS instance record");
1706 }
1707 }
1708 }
1709
1710 fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1711 if self.local_instance_registry.is_none() {
1712 return;
1713 }
1714 let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1715 let due = self
1716 .last_local_instance_publish_ms
1717 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1718 .unwrap_or(true);
1719 if due {
1720 self.publish_local_instance_record(now_ms);
1721 }
1722 }
1723
1724 fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1725 if self.local_instance_registry.is_none() {
1726 return false;
1727 }
1728 let cfg = &self.config.node.discovery.local;
1729 let interval_ms = if self
1730 .local_instance_started_at_ms
1731 .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1732 .unwrap_or(false)
1733 {
1734 cfg.startup_scan_interval_ms()
1735 } else {
1736 cfg.scan_interval_ms()
1737 };
1738 self.last_local_instance_scan_ms
1739 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1740 .unwrap_or(true)
1741 }
1742
1743 fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1744 if self.config.peers().iter().any(|peer| {
1745 PeerIdentity::from_npub(&peer.npub)
1746 .map(|configured| configured.node_addr() == identity.node_addr())
1747 .unwrap_or(false)
1748 }) {
1749 return true;
1750 }
1751 self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1752 }
1753
1754 fn local_instance_peer_addresses(
1755 &self,
1756 record: &crate::discovery::local::LocalInstanceRecord,
1757 ) -> Vec<PeerAddress> {
1758 let mut addresses = Vec::new();
1759 for contact in &record.contacts {
1760 if contact.transport != "udp" && contact.transport != "tcp" {
1761 continue;
1762 }
1763 let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1764 debug!(
1765 npub = %record.npub,
1766 transport = %contact.transport,
1767 addr = %contact.addr,
1768 "local instance discovery: skip non-socket contact"
1769 );
1770 continue;
1771 };
1772 if !socket_addr.ip().is_loopback() {
1773 debug!(
1774 npub = %record.npub,
1775 addr = %contact.addr,
1776 "local instance discovery: skip non-loopback contact"
1777 );
1778 continue;
1779 }
1780 let address =
1781 PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1782 .with_seen_at_ms(record.updated_at_ms);
1783 if addresses.iter().any(|existing: &PeerAddress| {
1784 existing.transport == address.transport && existing.addr == address.addr
1785 }) {
1786 continue;
1787 }
1788 addresses.push(address);
1789 }
1790 addresses
1791 }
1792
1793 pub(super) async fn poll_local_instance_discovery(&mut self) {
1797 let Some(registry) = self.local_instance_registry.clone() else {
1798 return;
1799 };
1800 let now_ms = Self::now_ms();
1801 self.maybe_publish_local_instance_record(now_ms);
1802 if !self.local_instance_scan_due(now_ms) {
1803 return;
1804 }
1805 self.last_local_instance_scan_ms = Some(now_ms);
1806
1807 let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
1808 {
1809 Ok(records) => records,
1810 Err(err) => {
1811 debug!(error = %err, "same-host FIPS instance scan failed");
1812 return;
1813 }
1814 };
1815 if records.is_empty() {
1816 return;
1817 }
1818
1819 let mut connect_budget = self.discovery_connect_budget();
1820 let mut skipped_budget = 0usize;
1821 for record in records {
1822 let identity = match PeerIdentity::from_npub(&record.npub) {
1823 Ok(identity) => identity,
1824 Err(err) => {
1825 debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
1826 continue;
1827 }
1828 };
1829 let peer_node_addr = *identity.node_addr();
1830 if peer_node_addr == *self.identity.node_addr() {
1831 continue;
1832 }
1833 if !self.local_instance_peer_allowed(&identity) {
1834 debug!(
1835 npub = %identity.short_npub(),
1836 "local instance discovery: skip unconfigured peer"
1837 );
1838 continue;
1839 }
1840
1841 let addresses = self.local_instance_peer_addresses(&record);
1842 if addresses.is_empty() {
1843 continue;
1844 }
1845
1846 if self.peers.contains_key(&peer_node_addr)
1847 && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
1848 {
1849 continue;
1850 }
1851
1852 for address in addresses {
1853 let Some((transport_id, remote_addr)) =
1854 self.resolve_peer_address_for_match(&address)
1855 else {
1856 continue;
1857 };
1858 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1859 continue;
1860 }
1861 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1862 skipped_budget = skipped_budget.saturating_add(1);
1863 continue;
1864 }
1865 info!(
1866 npub = %identity.short_npub(),
1867 transport = %address.transport,
1868 addr = %address.addr,
1869 "same-host FIPS instance discovery: initiating handshake"
1870 );
1871 if let Err(err) = self
1872 .initiate_connection(transport_id, remote_addr, identity)
1873 .await
1874 {
1875 debug!(
1876 npub = %record.npub,
1877 error = %err,
1878 "same-host FIPS instance discovery: failed to initiate connection"
1879 );
1880 }
1881 connect_budget = connect_budget.saturating_sub(1);
1882 }
1883 }
1884 if skipped_budget > 0 {
1885 debug!(
1886 skipped = skipped_budget,
1887 "same-host FIPS instance discovery connect budget exhausted"
1888 );
1889 }
1890 }
1891
1892 pub(super) async fn poll_lan_discovery(&mut self) {
1899 let Some(runtime) = self.lan_discovery.clone() else {
1900 return;
1901 };
1902 let events = runtime.drain_events().await;
1903 if events.is_empty() {
1904 return;
1905 }
1906 let mut connect_budget = self.discovery_connect_budget();
1907 let mut skipped_budget = 0usize;
1908 for event in events {
1909 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1910 let Some((transport_id, local_addr)) =
1911 self.find_udp_transport_for_remote_addr(peer.addr)
1912 else {
1913 debug!(
1914 addr = %peer.addr,
1915 "lan: skip discovered peer with no compatible UDP transport"
1916 );
1917 continue;
1918 };
1919 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1920 Ok(id) => id,
1921 Err(err) => {
1922 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1923 continue;
1924 }
1925 };
1926 let peer_node_addr = *identity.node_addr();
1927 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1928 if self.peers.contains_key(&peer_node_addr) {
1929 let candidate = PeerAddress::new("udp", peer.addr.to_string());
1930 if self.active_peer_candidate_is_fresh_enough_to_skip(
1931 &peer_node_addr,
1932 std::slice::from_ref(&candidate),
1933 ) {
1934 continue;
1935 }
1936 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1937 continue;
1938 }
1939 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1940 skipped_budget = skipped_budget.saturating_add(1);
1941 continue;
1942 }
1943 info!(
1944 npub = %identity.short_npub(),
1945 addr = %peer.addr,
1946 local_addr = %local_addr,
1947 "lan: initiating alternate-path handshake to active peer"
1948 );
1949 if let Err(err) = self
1950 .initiate_connection(transport_id, remote_addr, identity)
1951 .await
1952 {
1953 debug!(
1954 npub = %peer.npub,
1955 error = %err,
1956 "lan: failed to initiate active peer alternate-path handshake"
1957 );
1958 }
1959 connect_budget = connect_budget.saturating_sub(1);
1960 continue;
1961 }
1962 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1963 continue;
1964 }
1965 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1966 skipped_budget = skipped_budget.saturating_add(1);
1967 continue;
1968 }
1969 info!(
1970 npub = %identity.short_npub(),
1971 addr = %peer.addr,
1972 local_addr = %local_addr,
1973 "lan: initiating handshake to discovered peer"
1974 );
1975 if let Err(err) = self
1976 .initiate_connection(transport_id, remote_addr, identity)
1977 .await
1978 {
1979 debug!(
1980 npub = %peer.npub,
1981 error = %err,
1982 "lan: failed to initiate connection to discovered peer"
1983 );
1984 }
1985 connect_budget = connect_budget.saturating_sub(1);
1986 }
1987 if skipped_budget > 0 {
1988 debug!(
1989 skipped = skipped_budget,
1990 "lan: discovery connect budget exhausted"
1991 );
1992 }
1993 }
1994
1995 pub(super) async fn poll_pending_connects(&mut self) {
2002 if self.pending_connects.is_empty() {
2003 return;
2004 }
2005
2006 let mut completed = Vec::new();
2007
2008 for (i, pending) in self.pending_connects.iter().enumerate() {
2009 let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
2010 transport.connection_state(&pending.remote_addr)
2011 } else {
2012 crate::transport::ConnectionState::Failed("transport removed".into())
2013 };
2014
2015 match state {
2016 crate::transport::ConnectionState::Connected => {
2017 completed.push((i, true, None));
2018 }
2019 crate::transport::ConnectionState::Failed(reason) => {
2020 completed.push((i, false, Some(reason)));
2021 }
2022 crate::transport::ConnectionState::Connecting => {
2023 }
2025 crate::transport::ConnectionState::None => {
2026 completed.push((i, false, Some("no connection attempt found".into())));
2028 }
2029 }
2030 }
2031
2032 for (i, success, reason) in completed.into_iter().rev() {
2034 let pending = self.pending_connects.remove(i);
2035
2036 if success {
2037 if let Some(link) = self.links.get_mut(&pending.link_id) {
2039 link.set_connected();
2040 }
2041
2042 debug!(
2043 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2044 transport_id = %pending.transport_id,
2045 remote_addr = %pending.remote_addr,
2046 link_id = %pending.link_id,
2047 "Transport connected, starting handshake"
2048 );
2049
2050 if let Err(e) = self
2052 .start_handshake(
2053 pending.link_id,
2054 pending.transport_id,
2055 pending.remote_addr.clone(),
2056 pending.peer_identity,
2057 )
2058 .await
2059 {
2060 warn!(
2061 link_id = %pending.link_id,
2062 error = %e,
2063 "Failed to start handshake after transport connect"
2064 );
2065 self.remove_link(&pending.link_id);
2067 }
2068 } else {
2069 let reason = reason.unwrap_or_default();
2070 warn!(
2071 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2072 transport_id = %pending.transport_id,
2073 remote_addr = %pending.remote_addr,
2074 link_id = %pending.link_id,
2075 reason = %reason,
2076 "Transport connect failed"
2077 );
2078
2079 self.remove_link(&pending.link_id);
2081 self.links.remove(&pending.link_id);
2082 self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
2083 }
2084 }
2085 }
2086
2087 pub async fn start(&mut self) -> Result<(), NodeError> {
2094 node_start_debug_log("Node::start begin");
2095 if !self.state.can_start() {
2096 return Err(NodeError::AlreadyStarted);
2097 }
2098 self.state = NodeState::Starting;
2099 node_start_debug_log("Node::start state set to starting");
2100
2101 let packet_buffer_size = self.config.node.buffers.packet_channel;
2103 let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
2104 self.packet_tx = Some(packet_tx.clone());
2105 self.packet_rx = Some(packet_rx);
2106 node_start_debug_log("Node::start packet channel created");
2107
2108 node_start_debug_log("Node::start create transports begin");
2110 let transport_handles = self.create_transports(&packet_tx).await;
2111 node_start_debug_log(format!(
2112 "Node::start create transports complete count={}",
2113 transport_handles.len()
2114 ));
2115
2116 for mut handle in transport_handles {
2117 let transport_id = handle.transport_id();
2118 let transport_type = handle.transport_type().name;
2119 let name = handle.name().map(|s| s.to_string());
2120
2121 node_start_debug_log(format!(
2122 "Node::start transport start begin id={} type={} name={:?}",
2123 transport_id, transport_type, name
2124 ));
2125 match handle.start().await {
2126 Ok(()) => {
2127 node_start_debug_log(format!(
2128 "Node::start transport start ok id={} type={}",
2129 transport_id, transport_type
2130 ));
2131 self.transports.insert(transport_id, handle);
2132 }
2133 Err(e) => {
2134 node_start_debug_log(format!(
2135 "Node::start transport start error id={} type={} error={}",
2136 transport_id, transport_type, e
2137 ));
2138 if let Some(ref n) = name {
2139 warn!(transport_type, name = %n, error = %e, "Transport failed to start");
2140 } else {
2141 warn!(transport_type, error = %e, "Transport failed to start");
2142 }
2143 }
2144 }
2145 }
2146
2147 if !self.transports.is_empty() {
2148 info!(count = self.transports.len(), "Transports initialized");
2149 }
2150
2151 #[cfg(unix)]
2167 {
2168 if self.config.node.worker_pools_enabled {
2169 node_start_debug_log("Node::start worker pools begin");
2170 let cpu_default = std::thread::available_parallelism()
2171 .map(|n| n.get())
2172 .unwrap_or(1)
2173 .max(1);
2174 let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
2175 .ok()
2176 .and_then(|s| s.parse().ok())
2177 .unwrap_or(cpu_default)
2178 .max(1);
2179 self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
2180 encrypt_worker_count,
2181 ));
2182 info!(
2183 workers = encrypt_worker_count,
2184 "Spawned FMP-encrypt worker pool"
2185 );
2186
2187 let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
2196 .ok()
2197 .and_then(|s| s.parse().ok())
2198 .unwrap_or(cpu_default);
2199 if decrypt_worker_count == 0 {
2200 info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
2201 } else {
2202 self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
2203 decrypt_worker_count,
2204 ));
2205 info!(
2206 workers = decrypt_worker_count,
2207 "Spawned FMP+FSP-decrypt worker pool"
2208 );
2209 }
2210 node_start_debug_log("Node::start worker pools complete");
2211 } else {
2212 node_start_debug_log("Node::start worker pools disabled");
2213 info!("FIPS worker pools disabled; using in-line crypto/send path");
2214 }
2215 }
2216
2217 if self.config.node.discovery.nostr.enabled {
2218 node_start_debug_log("Node::start nostr discovery start begin");
2219 match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
2220 .await
2221 {
2222 Ok(runtime) => {
2223 node_start_debug_log("Node::start nostr discovery runtime created");
2224 if let Err(err) = self.refresh_overlay_advert(&runtime).await {
2225 warn!(error = %err, "Failed to publish initial Nostr overlay advert");
2226 }
2227 node_start_debug_log("Node::start nostr overlay advert refreshed");
2228 self.nostr_discovery = Some(runtime);
2229 self.nostr_discovery_started_at_ms = Some(Self::now_ms());
2230 info!("Nostr overlay discovery enabled");
2231 }
2232 Err(err) => {
2233 node_start_debug_log(format!(
2234 "Node::start nostr discovery start error error={}",
2235 err
2236 ));
2237 warn!(error = %err, "Failed to start Nostr overlay discovery");
2238 }
2239 }
2240 }
2241
2242 if self.config.node.discovery.lan.enabled {
2246 node_start_debug_log("Node::start lan discovery start begin");
2247 let advertised_udp_port = self
2248 .transports
2249 .values()
2250 .filter(|h| h.is_operational())
2251 .filter(|h| h.transport_type().name == "udp")
2252 .find_map(|h| h.local_addr().map(|addr| addr.port()))
2253 .unwrap_or(0);
2254 let scope = self.lan_discovery_scope();
2255 match crate::discovery::lan::LanDiscovery::start(
2256 &self.identity,
2257 scope,
2258 advertised_udp_port,
2259 self.config.node.discovery.lan.clone(),
2260 )
2261 .await
2262 {
2263 Ok(runtime) => {
2264 node_start_debug_log("Node::start lan discovery start ok");
2265 self.lan_discovery = Some(runtime);
2266 info!("LAN mDNS discovery enabled");
2267 }
2268 Err(err) => {
2269 node_start_debug_log(format!(
2270 "Node::start lan discovery start error error={}",
2271 err
2272 ));
2273 debug!(error = %err, "LAN mDNS discovery not started");
2274 }
2275 }
2276 }
2277
2278 self.start_local_instance_discovery();
2279 self.poll_local_instance_discovery().await;
2280
2281 node_start_debug_log("Node::start initiate peer connections begin");
2284 self.initiate_peer_connections().await;
2285 node_start_debug_log("Node::start initiate peer connections complete");
2286
2287 if self.config.tun.enabled {
2289 node_start_debug_log("Node::start tun init begin");
2290 let address = *self.identity.address();
2291 match TunDevice::create(&self.config.tun, address).await {
2292 Ok(device) => {
2293 let mtu = device.mtu();
2294 let name = device.name().to_string();
2295 let our_addr = *device.address();
2296
2297 info!("TUN device active:");
2298 info!(" name: {}", name);
2299 info!(" address: {}", device.address());
2300 info!(" mtu: {}", mtu);
2301
2302 let effective_mtu = self.effective_ipv6_mtu();
2304 let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); info!("effective MTU: {} bytes", effective_mtu);
2307 debug!(" max TCP MSS: {} bytes", max_mss);
2308
2309 #[cfg(target_os = "macos")]
2313 let (shutdown_read_fd, shutdown_write_fd) = {
2314 let mut fds = [0i32; 2];
2315 if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
2316 return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
2317 "failed to create shutdown pipe".into(),
2318 )));
2319 }
2320 (fds[0], fds[1])
2321 };
2322
2323 let (writer, tun_tx) =
2327 device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
2328
2329 let writer_handle = thread::spawn(move || {
2331 writer.run();
2332 });
2333
2334 let reader_tun_tx = tun_tx.clone();
2336
2337 let tun_channel_size = self.config.node.buffers.tun_channel;
2339 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
2340
2341 let transport_mtu = self.transport_mtu();
2343 let path_mtu_lookup = self.path_mtu_lookup.clone();
2344 #[cfg(target_os = "macos")]
2345 let reader_handle = thread::spawn(move || {
2346 run_tun_reader(
2347 device,
2348 mtu,
2349 our_addr,
2350 reader_tun_tx,
2351 outbound_tx,
2352 transport_mtu,
2353 path_mtu_lookup,
2354 shutdown_read_fd,
2355 );
2356 });
2357 #[cfg(not(target_os = "macos"))]
2358 let reader_handle = thread::spawn(move || {
2359 run_tun_reader(
2360 device,
2361 mtu,
2362 our_addr,
2363 reader_tun_tx,
2364 outbound_tx,
2365 transport_mtu,
2366 path_mtu_lookup,
2367 );
2368 });
2369
2370 self.tun_state = TunState::Active;
2371 self.tun_name = Some(name);
2372 self.tun_tx = Some(tun_tx);
2373 self.tun_outbound_rx = Some(outbound_rx);
2374 self.tun_reader_handle = Some(reader_handle);
2375 self.tun_writer_handle = Some(writer_handle);
2376 #[cfg(target_os = "macos")]
2377 {
2378 self.tun_shutdown_fd = Some(shutdown_write_fd);
2379 }
2380 }
2381 Err(e) => {
2382 self.tun_state = TunState::Failed;
2383 warn!(error = %e, "Failed to initialize TUN, continuing without it");
2384 }
2385 }
2386 node_start_debug_log("Node::start tun init complete");
2387 }
2388
2389 if self.config.dns.enabled {
2406 node_start_debug_log("Node::start dns init begin");
2407 let addr_str = self.config.dns.bind_addr();
2408 match addr_str.parse::<std::net::IpAddr>() {
2409 Ok(ip) => {
2410 let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
2411 match Self::bind_dns_socket(bind) {
2412 Ok(socket) => {
2413 let dns_channel_size = self.config.node.buffers.dns_channel;
2414 let (identity_tx, identity_rx) =
2415 tokio::sync::mpsc::channel(dns_channel_size);
2416 let dns_ttl = self.config.dns.ttl();
2417 let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
2418 self.config.peers(),
2419 );
2420 let reloader = if self.config.node.system_files_enabled {
2421 let hosts_path = std::path::PathBuf::from(
2422 crate::upper::hosts::DEFAULT_HOSTS_PATH,
2423 );
2424 crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
2425 } else {
2426 crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
2427 };
2428 let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
2436 info!(
2437 bind = %bind,
2438 hosts = reloader.hosts().len(),
2439 mesh_ifindex = ?mesh_ifindex,
2440 "DNS responder started for .fips domain (auto-reload enabled)"
2441 );
2442 let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
2443 socket,
2444 identity_tx,
2445 dns_ttl,
2446 reloader,
2447 mesh_ifindex,
2448 ));
2449 self.dns_identity_rx = Some(identity_rx);
2450 self.dns_task = Some(handle);
2451 }
2452 Err(e) => {
2453 warn!(bind = %bind, error = %e, "Failed to start DNS responder");
2454 }
2455 }
2456 }
2457 Err(e) => {
2458 warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
2459 }
2460 }
2461 node_start_debug_log("Node::start dns init complete");
2462 }
2463
2464 self.state = NodeState::Running;
2465 node_start_debug_log("Node::start running");
2466 info!("Node started:");
2467 info!(" state: {}", self.state);
2468 info!(" transports: {}", self.transports.len());
2469 info!(" connections: {}", self.connections.len());
2470 Ok(())
2471 }
2472
2473 fn bind_dns_socket(
2486 addr: std::net::SocketAddr,
2487 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2488 use socket2::{Domain, Protocol, Socket, Type};
2489 let domain = if addr.is_ipv4() {
2490 Domain::IPV4
2491 } else {
2492 Domain::IPV6
2493 };
2494 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2495 if addr.is_ipv6() {
2496 sock.set_only_v6(false)?;
2497 #[cfg(unix)]
2498 Self::set_recv_pktinfo_v6(&sock)?;
2499 }
2500 sock.set_nonblocking(true)?;
2501 sock.bind(&addr.into())?;
2502 tokio::net::UdpSocket::from_std(sock.into())
2503 }
2504
2505 #[cfg(unix)]
2511 fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2512 use std::os::fd::AsRawFd;
2513 let enable: libc::c_int = 1;
2514 let ret = unsafe {
2515 libc::setsockopt(
2516 sock.as_raw_fd(),
2517 libc::IPPROTO_IPV6,
2518 libc::IPV6_RECVPKTINFO,
2519 &enable as *const _ as *const libc::c_void,
2520 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2521 )
2522 };
2523 if ret < 0 {
2524 return Err(std::io::Error::last_os_error());
2525 }
2526 Ok(())
2527 }
2528
2529 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2536 #[cfg(unix)]
2537 {
2538 let c_name = std::ffi::CString::new(name).ok()?;
2539 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2540 if idx == 0 { None } else { Some(idx) }
2541 }
2542 #[cfg(not(unix))]
2543 {
2544 let _ = name;
2545 None
2546 }
2547 }
2548
/// Stops the node and releases the resources that were set up while it ran.
///
/// Shutdown proceeds in a fixed order: the DNS responder task is aborted,
/// every sendable peer is sent a `Disconnect` with reason `Shutdown`, the
/// Nostr and LAN discovery runtimes are shut down, the same-host instance
/// record is removed, each transport is stopped, the packet channel endpoints
/// are dropped, and finally the TUN interface is shut down and its reader and
/// writer threads are joined.
///
/// Failures of individual steps are logged and do not prevent the remaining
/// steps from running.
///
/// # Errors
///
/// Returns [`NodeError::NotStarted`] when `self.state.can_stop()` is false.
pub async fn stop(&mut self) -> Result<(), NodeError> {
    if !self.state.can_stop() {
        return Err(NodeError::NotStarted);
    }
    self.state = NodeState::Stopping;
    info!(state = %self.state, "Node stopping");

    // Abort the DNS responder task, if one was spawned.
    if let Some(handle) = self.dns_task.take() {
        handle.abort();
        debug!("DNS responder stopped");
    }

    // Notify peers before the transports below are stopped.
    self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
        .await;

    // Discovery runtimes are taken out of `self` so they cannot be reused
    // after shutdown.
    if let Some(bootstrap) = self.nostr_discovery.take()
        && let Err(e) = bootstrap.shutdown().await
    {
        warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
    }

    if let Some(lan) = self.lan_discovery.take() {
        lan.shutdown().await;
    }

    if let Some(registry) = self.local_instance_registry.take()
        && let Err(err) = registry.remove()
    {
        debug!(error = %err, "failed to remove same-host FIPS instance record");
    }

    // Collect the ids first so the map can be mutated while iterating.
    let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
    for transport_id in transport_ids {
        if let Some(mut handle) = self.transports.remove(&transport_id) {
            let transport_type = handle.transport_type().name;
            match handle.stop().await {
                Ok(()) => {
                    info!(transport_id = %transport_id, transport_type, "Transport stopped");
                }
                Err(e) => {
                    warn!(
                        transport_id = %transport_id,
                        transport_type,
                        error = %e,
                        "Transport stop failed"
                    );
                }
            }
        }
    }

    // Drop both ends of the packet channel.
    self.packet_tx.take();
    self.packet_rx.take();

    // TUN teardown only runs when a TUN device was activated (its name is set).
    if let Some(name) = self.tun_name.take() {
        info!(name = %name, "Shutting down TUN interface");

        // Drop our sender to the TUN writer.
        // NOTE(review): presumably this lets the writer thread's loop end once
        // all senders are gone - confirm against the writer implementation.
        self.tun_tx.take();

        if let Err(e) = shutdown_tun_interface(&name).await {
            warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
        }

        // macOS only: write one byte to the shutdown pipe whose read end was
        // handed to the TUN reader thread at start, then close the write end.
        // NOTE(review): the result of `write` is ignored - confirm that is the
        // intended best-effort behaviour.
        #[cfg(target_os = "macos")]
        if let Some(fd) = self.tun_shutdown_fd.take() {
            // SAFETY: `fd` was taken out of `self`, so it is written to and
            // closed at most once here; the buffer is a 1-byte static slice.
            unsafe {
                libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                libc::close(fd);
            }
        }

        // Wait for both worker threads; a panicked thread is ignored.
        // NOTE(review): `join` blocks the calling thread inside an async fn.
        if let Some(handle) = self.tun_reader_handle.take() {
            let _ = handle.join();
        }
        if let Some(handle) = self.tun_writer_handle.take() {
            let _ = handle.join();
        }

        self.tun_state = TunState::Disabled;
    }

    self.state = NodeState::Stopped;
    info!(state = %self.state, "Node stopped");
    Ok(())
}
2652
2653 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2658 let disconnect = Disconnect::new(reason);
2659 let plaintext = disconnect.encode();
2660
2661 let peer_addrs: Vec<NodeAddr> = self
2663 .peers
2664 .iter()
2665 .filter(|(_, peer)| peer.can_send() && peer.has_session())
2666 .map(|(addr, _)| *addr)
2667 .collect();
2668
2669 if peer_addrs.is_empty() {
2670 debug!(
2671 total_peers = self.peers.len(),
2672 "No sendable peers for disconnect notification"
2673 );
2674 return;
2675 }
2676
2677 let mut sent = 0usize;
2678 for node_addr in &peer_addrs {
2679 match self
2680 .send_encrypted_link_message(node_addr, &plaintext)
2681 .await
2682 {
2683 Ok(()) => sent += 1,
2684 Err(e) => {
2685 debug!(
2686 peer = %self.peer_display_name(node_addr),
2687 error = %e,
2688 "Failed to send disconnect (transport may be down)"
2689 );
2690 }
2691 }
2692 }
2693
2694 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2695 }
2696
2697 pub(in crate::node) fn static_peer_addresses(
2698 &self,
2699 peer_config: &PeerConfig,
2700 ) -> Vec<PeerAddress> {
2701 peer_config
2702 .addresses_by_priority()
2703 .into_iter()
2704 .cloned()
2705 .collect()
2706 }
2707
2708 async fn nostr_peer_fallback_addresses(
2709 &self,
2710 peer_config: &PeerConfig,
2711 existing: &[PeerAddress],
2712 ) -> Vec<PeerAddress> {
2713 if !self.config.node.discovery.nostr.enabled
2714 || self.config.node.discovery.nostr.policy
2715 == crate::config::NostrDiscoveryPolicy::Disabled
2716 {
2717 return Vec::new();
2718 }
2719
2720 let Some(bootstrap) = self.nostr_discovery.clone() else {
2721 return Vec::new();
2722 };
2723 if self.nostr_cooldown_applies_to_peer_config(peer_config)
2724 && bootstrap
2725 .cooldown_until(&peer_config.npub, Self::now_ms())
2726 .is_some()
2727 {
2728 debug!(
2729 npub = %peer_config.npub,
2730 "Skipping cached Nostr fallback endpoints while peer is in traversal cooldown"
2731 );
2732 return Vec::new();
2733 }
2734 let endpoints = match bootstrap
2735 .cached_advert_endpoints_for_peer(&peer_config.npub)
2736 .await
2737 {
2738 Some(endpoints) => endpoints,
2739 None => {
2740 debug!(
2741 npub = %peer_config.npub,
2742 "No cached Nostr advert endpoints for configured peer"
2743 );
2744 return Vec::new();
2745 }
2746 };
2747
2748 let mut fallback = Vec::new();
2749 let mut next_priority = existing
2750 .iter()
2751 .map(|addr| addr.priority)
2752 .max()
2753 .unwrap_or(100)
2754 .saturating_add(1);
2755 let seen_at_ms = Self::now_ms();
2761 for endpoint in endpoints {
2762 let Some(candidate) =
2763 Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2764 else {
2765 continue;
2766 };
2767 if existing
2768 .iter()
2769 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2770 || fallback.iter().any(|addr: &PeerAddress| {
2771 addr.transport == candidate.transport && addr.addr == candidate.addr
2772 })
2773 {
2774 continue;
2775 }
2776 fallback.push(candidate);
2777 next_priority = next_priority.saturating_add(1);
2778 }
2779 fallback
2780 }
2781
2782 pub(in crate::node) async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2783 if !self.config.node.discovery.nostr.enabled
2784 || self.config.node.discovery.nostr.policy
2785 == crate::config::NostrDiscoveryPolicy::Disabled
2786 {
2787 return false;
2788 }
2789 let Some(bootstrap) = self.nostr_discovery.clone() else {
2790 return false;
2791 };
2792 let now_ms = Self::now_ms();
2793 if self.nostr_cooldown_applies_to_peer_config(peer_config)
2794 && let Some(cooldown_until_ms) = bootstrap.cooldown_until(&peer_config.npub, now_ms)
2795 {
2796 debug!(
2797 npub = %peer_config.npub,
2798 cooldown_secs = cooldown_until_ms.saturating_sub(now_ms) / 1000,
2799 "Skipping Nostr traversal request while peer is in cooldown"
2800 );
2801 return false;
2802 }
2803 bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
2804 bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
2805 let mesh_signaling_allowed = self.mesh_signaling_allowed_for_peer(peer_config);
2806 let started = bootstrap
2807 .request_connect_with_mesh_signaling(peer_config.clone(), mesh_signaling_allowed)
2808 .await;
2809 if started {
2810 info!(
2811 npub = %peer_config.npub,
2812 mesh_signaling_allowed,
2813 "Started background UDP NAT traversal attempt"
2814 );
2815 } else {
2816 debug!(
2817 npub = %peer_config.npub,
2818 mesh_signaling_allowed,
2819 "Background UDP NAT traversal attempt already in progress"
2820 );
2821 }
2822 true
2823 }
2824
2825 fn nostr_cooldown_applies_to_peer_config(&self, peer_config: &PeerConfig) -> bool {
2826 !self.mesh_signaling_allowed_for_peer(peer_config)
2827 }
2828
2829 pub(in crate::node) fn mesh_signaling_allowed_for_peer(
2830 &self,
2831 peer_config: &PeerConfig,
2832 ) -> bool {
2833 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
2834 return false;
2835 };
2836 let peer_addr = identity.node_addr();
2837 self.configured_peer(peer_addr).is_some()
2838 }
2839
2840 fn overlay_endpoint_to_peer_address(
2841 endpoint: &OverlayEndpointAdvert,
2842 priority: u8,
2843 seen_at_ms: u64,
2844 ) -> Option<PeerAddress> {
2845 let transport = match endpoint.transport {
2846 OverlayTransportKind::Udp => "udp",
2847 OverlayTransportKind::Tcp => "tcp",
2848 OverlayTransportKind::Tor => "tor",
2849 OverlayTransportKind::WebRtc => "webrtc",
2850 };
2851 Some(
2852 PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2853 .with_seen_at_ms(seen_at_ms),
2854 )
2855 }
2856
/// Walks `addresses` in order and starts connection attempts toward the peer.
///
/// For each address the matching transport and remote address are resolved,
/// and `initiate_connection` is called until the per-peer candidate budget
/// (from `path_candidate_attempt_budget`) is used up. The special address
/// `udp:nat` does not name a concrete endpoint; it requests a Nostr-assisted
/// traversal instead, and only when `allow_bootstrap_nat` is set.
///
/// # Returns
///
/// `Ok(())` when at least one attempt was started, a traversal request was
/// accepted, or a matching attempt was already in flight.
///
/// # Errors
///
/// * [`NodeError::AccessDenied`] is returned immediately when
///   `initiate_connection` reports it; no further addresses are tried.
/// * [`NodeError::LocalRouteUnavailable`] when nothing was attempted and at
///   least one failure satisfied `is_local_route_unavailable()` (the first
///   such error's text is carried).
/// * [`NodeError::NoTransportForType`] when nothing was attempted for any
///   other reason.
async fn attempt_peer_address_list(
    &mut self,
    peer_config: &PeerConfig,
    peer_identity: PeerIdentity,
    allow_bootstrap_nat: bool,
    addresses: &[PeerAddress],
) -> Result<(), NodeError> {
    // `attempted` becomes true as soon as any path is started or found in flight.
    let mut attempted = false;
    // Text of the first local-route failure, kept for the final error.
    let mut local_route_error = None;
    let peer_node_addr = *peer_identity.node_addr();
    // Number of concrete (non-`udp:nat`) attempts still allowed for this peer.
    let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

    for addr in addresses {
        // `udp:nat` is a placeholder that triggers Nostr-assisted traversal.
        if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
            if !allow_bootstrap_nat {
                continue;
            }
            if self.request_nostr_bootstrap(peer_config).await {
                attempted = true;
                continue;
            }
            debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
            continue;
        }

        // Resolve the transport instance and the transport-level remote address.
        let (transport_id, remote_addr) = if addr.transport == "ethernet" {
            match self.resolve_ethernet_addr(&addr.addr) {
                Ok(result) => result,
                Err(e) => {
                    debug!(
                        transport = %addr.transport,
                        addr = %addr.addr,
                        error = %e,
                        "Failed to resolve Ethernet address"
                    );
                    continue;
                }
            }
        } else if addr.transport == "ble" {
            // BLE resolution is only compiled in when `bluer_available` is set.
            #[cfg(bluer_available)]
            {
                match self.resolve_ble_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve BLE address"
                        );
                        continue;
                    }
                }
            }
            #[cfg(not(bluer_available))]
            {
                debug!(transport = %addr.transport, "BLE transport not available on this build");
                continue;
            }
        } else {
            // UDP addresses that parse as socket addresses are matched against
            // a UDP transport compatible with that remote; everything else is
            // looked up by transport type name.
            let tid = if addr.transport == "udp"
                && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
            {
                match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                    Some((id, _)) => id,
                    None => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            "No compatible operational UDP transport for address"
                        );
                        continue;
                    }
                }
            } else {
                match self.find_transport_for_type(&addr.transport) {
                    Some(id) => id,
                    None => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            "No operational transport for address type"
                        );
                        continue;
                    }
                }
            };
            (tid, TransportAddr::from_string(&addr.addr))
        };

        // An identical in-flight attempt counts as attempted but does not
        // consume budget.
        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
            attempted = true;
            debug!(
                npub = %peer_config.npub,
                transport_id = %transport_id,
                remote_addr = %remote_addr,
                "Skipping duplicate in-flight candidate path"
            );
            continue;
        }

        // Stop scanning once the budget is spent; remaining addresses are not
        // examined at all.
        if concrete_budget == 0 {
            debug!(
                npub = %peer_config.npub,
                max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                "Path candidate race budget exhausted"
            );
            break;
        }

        match self
            .initiate_connection(transport_id, remote_addr, peer_identity)
            .await
        {
            Ok(()) => {
                attempted = true;
                concrete_budget = concrete_budget.saturating_sub(1);
            }
            // Access denial ends the scan and is propagated unchanged.
            Err(e @ NodeError::AccessDenied(_)) => return Err(e),
            Err(e) => {
                // Remember only the first local-route failure.
                if e.is_local_route_unavailable() && local_route_error.is_none() {
                    local_route_error = Some(e.to_string());
                }
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    error = %e,
                    "Connection attempt failed, trying next address"
                );
            }
        }
    }

    if attempted {
        return Ok(());
    }

    if let Some(error) = local_route_error {
        return Err(NodeError::LocalRouteUnavailable(error));
    }

    Err(NodeError::NoTransportForType(format!(
        "no operational transport for any of {}'s addresses",
        peer_config.npub
    )))
}
3003
3004 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
3005 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
3006 .await;
3007 }
3008
3009 pub(in crate::node) fn queue_active_fallback_direct_retries(
3010 &mut self,
3011 _bootstrap: &std::sync::Arc<NostrDiscovery>,
3012 ) {
3013 let now_ms = Self::now_ms();
3014 let peer_configs = self
3015 .config
3016 .auto_connect_peers()
3017 .cloned()
3018 .collect::<Vec<_>>();
3019
3020 for peer_config in peer_configs {
3021 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
3022 continue;
3023 };
3024 let node_addr = *peer_identity.node_addr();
3025
3026 if self.retry_pending.contains_key(&node_addr)
3027 || !self.peers.contains_key(&node_addr)
3028 || self.is_connecting_to_peer(&node_addr)
3029 || !self.active_peer_should_keep_direct_retry(&node_addr, &peer_config)
3030 {
3031 continue;
3032 }
3033
3034 let mut state = super::retry::RetryState::new(peer_config.clone());
3035 state.reconnect = true;
3036 state.retry_after_ms = now_ms;
3037 self.retry_pending.insert(node_addr, state);
3038
3039 debug!(
3040 peer = %self.peer_display_name(&node_addr),
3041 "Queued direct-path retry for active fallback peer"
3042 );
3043 }
3044 }
3045
/// Scans cached overlay adverts and queues connection retries for peers that
/// are not configured locally.
///
/// The sweep does nothing unless Nostr discovery is enabled with the `Open`
/// policy and `open_discovery_enqueue_budget` is non-zero. Up to 64 cached
/// candidates are requested from `bootstrap`; each one is either skipped
/// (with a per-reason counter) or turned into a `RetryState` inserted into
/// `retry_pending`, until the budget is used up.
///
/// For candidates that ARE configured peers, no new retry is queued; instead
/// an existing pending retry whose `retry_after_ms` lies in the future is
/// pulled forward to now (unless the peer is in cooldown).
///
/// # Parameters
///
/// * `bootstrap` - discovery runtime providing cached adverts and cooldowns.
/// * `max_age_secs` - when set, candidates whose advert is older than this
///   many seconds are skipped.
/// * `caller` - label used in log output; the summary is always logged for
///   `"startup"` and otherwise only when something was queued.
pub(in crate::node) async fn run_open_discovery_sweep(
    &mut self,
    bootstrap: &std::sync::Arc<NostrDiscovery>,
    max_age_secs: Option<u64>,
    caller: &'static str,
) {
    if !self.config.node.discovery.nostr.enabled
        || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
    {
        return;
    }

    // Npubs of all locally configured peers, used to tell configured peers
    // apart from open-discovery candidates.
    let configured_npubs = self
        .config
        .peers()
        .iter()
        .map(|peer| peer.npub.clone())
        .collect::<HashSet<_>>();
    let now_ms = Self::now_ms();
    let now_secs = now_ms / 1000;
    let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
    if enqueue_budget == 0 {
        debug!(
            caller = %caller,
            "open-discovery sweep: enqueue budget is 0, skipping"
        );
        return;
    }

    let candidates = bootstrap.cached_open_discovery_candidates(64).await;
    let cached_count = candidates.len();
    // Per-reason counters, reported in the summary log below.
    let mut enqueued = 0usize;
    let mut skipped_age = 0usize;
    let mut skipped_configured = 0usize;
    let mut skipped_self = 0usize;
    let mut skipped_connected = 0usize;
    let mut skipped_retry_pending = 0usize;
    let mut skipped_connecting = 0usize;
    let mut skipped_no_endpoints = 0usize;
    let mut skipped_invalid_npub = 0usize;
    let mut skipped_cooldown = 0usize;

    for (npub, endpoints, created_at_secs) in candidates {
        if enqueue_budget == 0 {
            break;
        }

        // Age filter applies only when the caller supplied a limit.
        if let Some(max_age) = max_age_secs
            && now_secs.saturating_sub(created_at_secs) > max_age
        {
            skipped_age = skipped_age.saturating_add(1);
            continue;
        }

        // Configured peers are never queued here; at most an existing retry
        // is expedited.
        if configured_npubs.contains(&npub) {
            if let Ok(identity) = PeerIdentity::from_npub(&npub) {
                let configured_addr = *identity.node_addr();
                // NOTE(review): this branch increments both `skipped_cooldown`
                // and `skipped_configured` for one candidate, so
                // `skipped_total` can exceed the number of candidates -
                // confirm whether that double count is intended.
                if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                    skipped_cooldown = skipped_cooldown.saturating_add(1);
                    skipped_configured = skipped_configured.saturating_add(1);
                    continue;
                }
                if let Some(state) = self.retry_pending.get_mut(&configured_addr)
                    && state.retry_after_ms > now_ms
                {
                    state.retry_after_ms = now_ms;
                    debug!(
                        caller = %caller,
                        peer = %self.peer_display_name(&configured_addr),
                        advert_age_secs = now_secs.saturating_sub(created_at_secs),
                        "Expediting configured-peer retry after fresh overlay advert"
                    );
                }
            }
            skipped_configured = skipped_configured.saturating_add(1);
            continue;
        }

        let peer_identity = match PeerIdentity::from_npub(&npub) {
            Ok(identity) => identity,
            Err(_) => {
                skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                continue;
            }
        };
        let node_addr = *peer_identity.node_addr();
        // Never queue a connection to ourselves.
        if node_addr == *self.identity.node_addr() {
            skipped_self = skipped_self.saturating_add(1);
            continue;
        }
        if self.peers.contains_key(&node_addr) {
            skipped_connected = skipped_connected.saturating_add(1);
            continue;
        }
        if self.retry_pending.contains_key(&node_addr) {
            skipped_retry_pending = skipped_retry_pending.saturating_add(1);
            continue;
        }
        if bootstrap.cooldown_until(&npub, now_ms).is_some() {
            skipped_cooldown = skipped_cooldown.saturating_add(1);
            continue;
        }
        // Skip peers that already have a connection whose expected identity
        // matches this node address.
        let connecting = self.connections.values().any(|conn| {
            conn.expected_identity()
                .map(|id| id.node_addr() == &node_addr)
                .unwrap_or(false)
        });
        if connecting {
            skipped_connecting = skipped_connecting.saturating_add(1);
            continue;
        }

        // Convert advertised endpoints into de-duplicated peer addresses with
        // ascending priorities starting at 120.
        let mut addresses = Vec::new();
        let mut priority = 120u8;
        let seen_at_ms = Self::now_ms();
        for endpoint in endpoints {
            let Some(candidate) =
                Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
            else {
                continue;
            };
            if addresses.iter().any(|existing: &PeerAddress| {
                existing.transport == candidate.transport && existing.addr == candidate.addr
            }) {
                continue;
            }
            addresses.push(candidate);
            priority = priority.saturating_add(1);
        }
        if addresses.is_empty() {
            skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
            continue;
        }

        // Record a display alias (unless one exists) and the peer's identity.
        self.peer_aliases
            .entry(node_addr)
            .or_insert_with(|| peer_identity.short_npub());
        self.register_identity(node_addr, peer_identity.pubkey_full());

        // Synthesize a peer config for the discovered peer and queue it for
        // an immediate attempt with a bounded lifetime.
        let mut state = super::retry::RetryState::new(PeerConfig {
            npub: npub.clone(),
            alias: None,
            addresses,
            connect_policy: ConnectPolicy::AutoConnect,
            auto_reconnect: true,
            discovery_fallback_transit: false,
        });
        state.reconnect = false;
        state.retry_after_ms = now_ms;
        state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
        self.retry_pending.insert(node_addr, state);
        info!(
            caller = %caller,
            peer = %peer_identity.short_npub(),
            advert_age_secs = now_secs.saturating_sub(created_at_secs),
            "open-discovery sweep: queued retry for cached advert"
        );
        enqueue_budget = enqueue_budget.saturating_sub(1);
        enqueued = enqueued.saturating_add(1);
    }

    let total_skipped = skipped_age
        + skipped_configured
        + skipped_self
        + skipped_connected
        + skipped_retry_pending
        + skipped_connecting
        + skipped_no_endpoints
        + skipped_invalid_npub
        + skipped_cooldown;
    // Summaries are limited to the startup sweep and to sweeps that queued
    // at least one retry.
    let should_summarize = caller == "startup" || enqueued > 0;
    if should_summarize {
        info!(
            caller = %caller,
            cached = cached_count,
            queued = enqueued,
            skipped_age = skipped_age,
            skipped_configured = skipped_configured,
            skipped_self = skipped_self,
            skipped_connected = skipped_connected,
            skipped_retry_pending = skipped_retry_pending,
            skipped_connecting = skipped_connecting,
            skipped_no_endpoints = skipped_no_endpoints,
            skipped_invalid_npub = skipped_invalid_npub,
            skipped_cooldown = skipped_cooldown,
            skipped_total = total_skipped,
            "open-discovery sweep complete"
        );
    }
}
3268
3269 async fn maybe_run_startup_open_discovery_sweep(
3277 &mut self,
3278 bootstrap: &std::sync::Arc<NostrDiscovery>,
3279 ) {
3280 if self.startup_open_discovery_sweep_done {
3281 return;
3282 }
3283 if !self.config.node.discovery.nostr.enabled
3284 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3285 {
3286 self.startup_open_discovery_sweep_done = true;
3288 return;
3289 }
3290 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
3291 return;
3292 };
3293 let now_ms = Self::now_ms();
3294 let delay_ms = self
3295 .config
3296 .node
3297 .discovery
3298 .nostr
3299 .startup_sweep_delay_secs
3300 .saturating_mul(1000);
3301 if now_ms < started_at_ms.saturating_add(delay_ms) {
3302 return;
3303 }
3304
3305 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
3306 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
3307 .await;
3308 self.startup_open_discovery_sweep_done = true;
3309 }
3310
3311 fn available_outbound_slots(&self) -> usize {
3312 let connection_used = self
3313 .connections
3314 .len()
3315 .saturating_add(self.pending_connects.len());
3316 let connection_slots = if self.max_connections == 0 {
3317 usize::MAX
3318 } else {
3319 self.max_connections.saturating_sub(connection_used)
3320 };
3321
3322 let peer_slots = if self.max_peers == 0 {
3323 usize::MAX
3324 } else {
3325 self.max_peers.saturating_sub(self.peers.len())
3326 };
3327
3328 let link_slots = if self.max_links == 0 {
3329 usize::MAX
3330 } else {
3331 self.max_links.saturating_sub(self.links.len())
3332 };
3333
3334 connection_slots.min(peer_slots).min(link_slots)
3335 }
3336
3337 pub(in crate::node) fn open_discovery_enqueue_budget(
3338 &self,
3339 configured_npubs: &HashSet<String>,
3340 ) -> usize {
3341 let current_open_discovery_active = self
3342 .peers
3343 .values()
3344 .filter(|peer| !configured_npubs.contains(&peer.npub()))
3345 .count();
3346 let current_open_discovery_pending = self
3347 .retry_pending
3348 .values()
3349 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3350 .count();
3351
3352 let cap_remaining = self
3353 .config
3354 .node
3355 .discovery
3356 .nostr
3357 .open_discovery_max_pending
3358 .saturating_sub(current_open_discovery_active)
3359 .saturating_sub(current_open_discovery_pending);
3360
3361 cap_remaining.min(self.available_outbound_slots())
3362 }
3363
3364 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3365 now_ms.saturating_add(
3366 self.config
3367 .node
3368 .discovery
3369 .nostr
3370 .advert_ttl_secs
3371 .saturating_mul(1000)
3372 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3373 )
3374 }
3375
/// Assembles the overlay advert describing how this node can be reached.
///
/// Every operational transport is inspected and, if its configuration opts
/// in via `advertise_on_nostr()`, contributes at most one endpoint:
///
/// * **udp** - when the transport is marked public: the explicit external
///   address if configured, else the local bind address if it is routable,
///   else an address obtained from `bootstrap.learn_public_udp_addr`; if none
///   of these is available a warning is logged and no UDP endpoint is added.
///   When the transport is not public, the placeholder address `"nat"` is
///   advertised.
/// * **webrtc** - the hex encoding of this node's serialized full public key.
/// * **tcp** - the explicit external address if configured, else the local
///   bind address if it is routable; otherwise a warning is logged.
/// * **tor** - the onion address combined with the configured advertised port.
///
/// Returns `None` when Nostr discovery is disabled or no endpoint was
/// collected. `signal_relays` and `stun_servers` are only populated when a
/// `udp:nat` or WebRTC endpoint is present.
async fn build_overlay_advert(
    &self,
    bootstrap: &std::sync::Arc<NostrDiscovery>,
) -> Option<OverlayAdvert> {
    if !self.config.node.discovery.nostr.enabled {
        return None;
    }

    let mut endpoints = Vec::new();
    // Track endpoint kinds that cause relay/STUN details to be included in
    // the advert below.
    let mut has_udp_nat = false;
    let mut has_webrtc = false;

    for handle in self.transports.values() {
        if !handle.is_operational() {
            continue;
        }

        match handle.transport_type().name {
            "udp" => {
                let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                    continue;
                };
                if !cfg.advertise_on_nostr() {
                    continue;
                }
                if cfg.is_public() {
                    // An explicitly configured external address always wins.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            // Bound to a routable address: advertise it as is.
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Udp,
                                    addr: addr.to_string(),
                                });
                            }
                            // Wildcard or unroutable bind: ask the discovery
                            // runtime for the publicly observed address.
                            Some(addr) => {
                                let key = handle.transport_id().as_u32();
                                let port = addr.port();
                                if let Some(public) =
                                    bootstrap.learn_public_udp_addr(key, port).await
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: public.to_string(),
                                    });
                                } else {
                                    warn!(
                                        transport_id = key,
                                        bind_addr = %addr,
                                        "advert: udp public=true but bind is wildcard \
                                         or private and STUN observation failed; \
                                         advertising no UDP endpoint. Either set \
                                         transports.udp.external_addr, bind to a \
                                         specific *public* IP, or ensure \
                                         node.discovery.nostr.stun_servers is reachable"
                                    );
                                }
                            }
                            None => {}
                        }
                    }
                } else {
                    // Non-public UDP is advertised with the `nat` placeholder.
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::Udp,
                        addr: "nat".to_string(),
                    });
                    has_udp_nat = true;
                }
            }
            "webrtc" => {
                let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                    continue;
                };
                if !cfg.advertise_on_nostr() {
                    continue;
                }
                // The WebRTC endpoint address is this node's public key in hex.
                endpoints.push(OverlayEndpointAdvert {
                    transport: OverlayTransportKind::WebRtc,
                    addr: hex::encode(self.identity.pubkey_full().serialize()),
                });
                has_webrtc = true;
            }
            "tcp" => {
                let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                    continue;
                };
                if !cfg.advertise_on_nostr() {
                    continue;
                }
                if let Some(explicit) = cfg.external_advert_addr() {
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::Tcp,
                        addr: explicit.to_string(),
                    });
                } else {
                    match handle.local_addr() {
                        Some(addr)
                            if !addr.ip().is_unspecified()
                                && !is_unroutable_advert_ip(addr.ip()) =>
                        {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Tcp,
                                addr: addr.to_string(),
                            });
                        }
                        // Unlike UDP, no observed-address lookup is attempted
                        // for TCP; the endpoint is simply omitted.
                        Some(addr) => {
                            warn!(
                                bind_addr = %addr,
                                "advert: tcp advertise_on_nostr=true bound to wildcard \
                                 or private IP and no transports.tcp.external_addr set; \
                                 advertising no TCP endpoint. Either set external_addr \
                                 to the public IP (recommended for cloud 1:1-NAT setups) \
                                 or bind explicitly to the public IP"
                            );
                        }
                        None => {}
                    }
                }
            }
            "tor" => {
                let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                    continue;
                };
                if !cfg.advertise_on_nostr() {
                    continue;
                }
                // Advertised only when the handle reports an onion address.
                if let Some(addr) = handle.onion_address() {
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::Tor,
                        addr: format!("{}:{}", addr, cfg.advertised_port()),
                    });
                }
            }
            // Other transport types are never advertised.
            _ => {}
        }
    }

    if endpoints.is_empty() {
        return None;
    }

    Some(OverlayAdvert {
        identifier: ADVERT_IDENTIFIER.to_string(),
        version: ADVERT_VERSION,
        endpoints,
        signal_relays: (has_udp_nat || has_webrtc)
            .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
        stun_servers: (has_udp_nat || has_webrtc)
            .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
    })
}
3554
3555 async fn refresh_overlay_advert(
3556 &self,
3557 bootstrap: &std::sync::Arc<NostrDiscovery>,
3558 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3559 let advert = self.build_overlay_advert(bootstrap).await;
3560 bootstrap.update_local_advert(advert).await
3561 }
3562
3563 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3564 match (&self.config.transports.udp, transport_name) {
3565 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3566 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3567 _ => None,
3568 }
3569 }
3570
3571 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3572 match (&self.config.transports.tcp, transport_name) {
3573 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3574 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3575 _ => None,
3576 }
3577 }
3578
3579 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3580 match (&self.config.transports.tor, transport_name) {
3581 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3582 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3583 _ => None,
3584 }
3585 }
3586
3587 fn lookup_webrtc_config(
3588 &self,
3589 transport_name: Option<&str>,
3590 ) -> Option<&crate::config::WebRtcConfig> {
3591 match (&self.config.transports.webrtc, transport_name) {
3592 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3593 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3594 _ => None,
3595 }
3596 }
3597
3598 pub(in crate::node) async fn try_peer_addresses(
3599 &mut self,
3600 peer_config: &PeerConfig,
3601 peer_identity: PeerIdentity,
3602 allow_bootstrap_nat: bool,
3603 ) -> Result<(), NodeError> {
3604 let peer_node_addr = *peer_identity.node_addr();
3605 if self.peers.contains_key(&peer_node_addr) {
3606 debug!(
3607 npub = %peer_config.npub,
3608 "Peer already exists, skipping address attempts"
3609 );
3610 return Ok(());
3611 }
3612
3613 let candidates = self.peer_address_candidates(peer_config).await;
3614
3615 if candidates.is_empty() {
3616 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3617 return Ok(());
3618 }
3619 return Err(NodeError::NoTransportForType(format!(
3620 "no addresses known for {}",
3621 peer_config.npub
3622 )));
3623 }
3624
3625 if self
3626 .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3627 .await
3628 .is_ok()
3629 {
3630 if allow_bootstrap_nat {
3631 self.request_nostr_bootstrap(peer_config).await;
3632 }
3633 return Ok(());
3634 }
3635
3636 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3637 return Ok(());
3638 }
3639
3640 Err(NodeError::NoTransportForType(format!(
3641 "no operational transport for any of {}'s addresses",
3642 peer_config.npub
3643 )))
3644 }
3645
3646 async fn try_active_peer_alternative_addresses(
3647 &mut self,
3648 peer_config: &PeerConfig,
3649 peer_identity: PeerIdentity,
3650 allow_same_path_refresh: bool,
3651 ) -> Result<bool, NodeError> {
3652 let peer_node_addr = *peer_identity.node_addr();
3653 let mut candidates = self.peer_address_candidates(peer_config).await;
3654 let same_path_refresh_needed = allow_same_path_refresh
3655 && (self.active_peer_needs_same_path_refresh(&peer_node_addr)
3656 || self
3657 .peers
3658 .get(&peer_node_addr)
3659 .is_some_and(|peer| !peer.can_send()));
3660 if same_path_refresh_needed
3661 && let Some(candidate) = self.active_peer_current_udp_candidate(&peer_node_addr)
3662 && !candidates.iter().any(|existing| {
3663 existing.transport == candidate.transport && existing.addr == candidate.addr
3664 })
3665 {
3666 candidates.push(candidate);
3667 Self::sort_peer_address_candidates(&mut candidates);
3668 }
3669 let should_try_nostr =
3670 self.active_peer_should_keep_direct_retry(&peer_node_addr, peer_config);
3671
3672 if candidates.is_empty() {
3673 if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3674 return Ok(true);
3675 }
3676 return Err(NodeError::NoTransportForType(format!(
3677 "no addresses known for {}",
3678 peer_config.npub
3679 )));
3680 }
3681
3682 let alternatives: Vec<_> = candidates
3683 .into_iter()
3684 .filter(|addr| {
3685 same_path_refresh_needed
3686 || !self.active_peer_matches_candidate(&peer_node_addr, addr)
3687 })
3688 .collect();
3689
3690 if alternatives.is_empty() {
3691 if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3692 return Ok(true);
3693 }
3694 return Ok(false);
3695 }
3696
3697 let needs_separate_nostr_attempt = should_try_nostr
3698 && !alternatives
3699 .iter()
3700 .any(|addr| addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat"));
3701 let address_result = self
3702 .attempt_peer_address_list(peer_config, peer_identity, true, &alternatives)
3703 .await;
3704 let nostr_attempted =
3705 needs_separate_nostr_attempt && self.request_nostr_bootstrap(peer_config).await;
3706
3707 match address_result {
3708 Ok(()) => Ok(true),
3709 Err(err) if nostr_attempted => {
3710 debug!(
3711 npub = %peer_config.npub,
3712 error = %err,
3713 "Static active-peer direct-path alternatives failed; Nostr traversal still queued"
3714 );
3715 Ok(true)
3716 }
3717 Err(err) => Err(err),
3718 }
3719 }
3720
3721 async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3722 let static_addresses = self.static_peer_addresses(peer_config);
3729 let overlay_addresses = self
3730 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3731 .await;
3732
3733 let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3734 for addr in overlay_addresses.into_iter().chain(static_addresses) {
3735 if !candidates.iter().any(|existing: &PeerAddress| {
3736 existing.transport == addr.transport && existing.addr == addr.addr
3737 }) {
3738 candidates.push(addr);
3739 }
3740 }
3741
3742 Self::sort_peer_address_candidates(&mut candidates);
3743
3744 candidates
3745 }
3746
3747 fn sort_peer_address_candidates(candidates: &mut [PeerAddress]) {
3748 candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
3754 (Some(_), None) => std::cmp::Ordering::Less,
3755 (None, Some(_)) => std::cmp::Ordering::Greater,
3756 _ if a.priority != b.priority => a.priority.cmp(&b.priority),
3757 (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3758 (None, None) => std::cmp::Ordering::Equal,
3759 });
3760 }
3761
3762 fn active_peer_matches_any_candidate(
3763 &self,
3764 peer_node_addr: &NodeAddr,
3765 candidates: &[PeerAddress],
3766 ) -> bool {
3767 candidates
3768 .iter()
3769 .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3770 }
3771
3772 pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3773 &self,
3774 peer_node_addr: &NodeAddr,
3775 candidates: &[PeerAddress],
3776 ) -> bool {
3777 if !self
3778 .peers
3779 .get(peer_node_addr)
3780 .is_some_and(|peer| peer.can_send())
3781 {
3782 return false;
3783 }
3784 if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3785 return false;
3786 }
3787 !self.active_peer_needs_same_path_refresh(peer_node_addr)
3788 }
3789
3790 pub(in crate::node) fn active_peer_should_keep_direct_retry(
3791 &self,
3792 peer_node_addr: &NodeAddr,
3793 peer_config: &PeerConfig,
3794 ) -> bool {
3795 let Some(peer) = self.peers.get(peer_node_addr) else {
3796 return false;
3797 };
3798
3799 let static_addresses = self.static_peer_addresses(peer_config);
3800 if !static_addresses.is_empty() {
3801 return !self
3802 .active_peer_candidate_is_fresh_enough_to_skip(peer_node_addr, &static_addresses);
3803 }
3804
3805 if peer_config.npub.is_empty() {
3806 return false;
3807 }
3808
3809 if !self.config.node.discovery.nostr.enabled
3810 || self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Disabled
3811 {
3812 return false;
3813 }
3814
3815 let Some(transport_id) = peer.transport_id() else {
3816 return true;
3817 };
3818
3819 if self.bootstrap_transports.contains(&transport_id) {
3820 return self.active_peer_needs_same_path_refresh(peer_node_addr);
3821 }
3822
3823 let Some(transport) = self.transports.get(&transport_id) else {
3824 return true;
3825 };
3826
3827 if transport.transport_type().name != "udp" {
3828 return true;
3829 }
3830
3831 self.active_peer_needs_same_path_refresh(peer_node_addr)
3832 }
3833
3834 pub(in crate::node) fn clear_retry_unless_direct_refresh_needed(
3835 &mut self,
3836 peer_node_addr: &NodeAddr,
3837 ) {
3838 let keep_retry = self
3839 .retry_pending
3840 .get(peer_node_addr)
3841 .map(|state| state.peer_config.clone())
3842 .is_some_and(|peer_config| {
3843 self.active_peer_should_keep_direct_retry(peer_node_addr, &peer_config)
3844 });
3845
3846 if !keep_retry {
3847 self.retry_pending.remove(peer_node_addr);
3848 }
3849 }
3850
3851 fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3852 let Some(peer) = self.peers.get(peer_node_addr) else {
3853 return false;
3854 };
3855 let stale_after_ms = self
3856 .config
3857 .node
3858 .heartbeat_interval_secs
3859 .saturating_mul(1000)
3860 .max(1000);
3861 peer.idle_time(Self::now_ms()) > stale_after_ms
3862 }
3863
3864 fn active_peer_current_udp_candidate(&self, peer_node_addr: &NodeAddr) -> Option<PeerAddress> {
3865 let peer = self.peers.get(peer_node_addr)?;
3866 let transport_id = peer.transport_id()?;
3867 let current_addr = peer.current_addr()?;
3868 let transport = self.transports.get(&transport_id)?;
3869 if transport.transport_type().name != "udp" {
3870 return None;
3871 }
3872 let socket_addr = current_addr.as_str()?.parse::<SocketAddr>().ok()?;
3873
3874 Some(
3875 PeerAddress::with_priority("udp", socket_addr.to_string(), 240)
3876 .with_seen_at_ms(Self::now_ms()),
3877 )
3878 }
3879
3880 pub(in crate::node) fn active_peer_matches_candidate(
3881 &self,
3882 peer_node_addr: &NodeAddr,
3883 candidate: &PeerAddress,
3884 ) -> bool {
3885 let Some(peer) = self.peers.get(peer_node_addr) else {
3886 return false;
3887 };
3888 let Some(current_addr) = peer.current_addr() else {
3889 return false;
3890 };
3891 if let Some(peer_transport_id) = peer.transport_id()
3892 && let Some((candidate_transport_id, candidate_addr)) =
3893 self.resolve_peer_address_for_match(candidate)
3894 {
3895 return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3896 }
3897 if peer
3898 .transport_id()
3899 .map(|id| self.bootstrap_transports.contains(&id))
3900 .unwrap_or(false)
3901 {
3902 return false;
3903 }
3904 let current_addr = current_addr.to_string();
3905 let current_transport = peer
3906 .transport_id()
3907 .and_then(|id| self.transports.get(&id))
3908 .map(|transport| transport.transport_type().name);
3909
3910 candidate.addr == current_addr
3911 && current_transport
3912 .map(|transport| transport == candidate.transport)
3913 .unwrap_or(true)
3914 }
3915
3916 pub(in crate::node) fn active_peer_uses_recent_endpoint_path(
3917 &self,
3918 peer_node_addr: &NodeAddr,
3919 peer_config: &PeerConfig,
3920 ) -> bool {
3921 peer_config.addresses.iter().any(|addr| {
3922 addr.seen_at_ms.is_some() && self.active_peer_matches_candidate(peer_node_addr, addr)
3923 })
3924 }
3925
3926 pub(in crate::node) fn active_peer_uses_traversal_path(
3927 &self,
3928 peer_node_addr: &NodeAddr,
3929 peer_config: &PeerConfig,
3930 ) -> bool {
3931 let via_bootstrap_transport = self
3932 .peers
3933 .get(peer_node_addr)
3934 .and_then(|peer| peer.transport_id())
3935 .map(|id| self.bootstrap_transports.contains(&id))
3936 .unwrap_or(false);
3937
3938 via_bootstrap_transport
3939 || self.active_peer_uses_recent_endpoint_path(peer_node_addr, peer_config)
3940 }
3941
3942 pub(crate) async fn api_connect(
3950 &mut self,
3951 npub: &str,
3952 address: &str,
3953 transport: &str,
3954 ) -> Result<serde_json::Value, String> {
3955 let peer_config = PeerConfig {
3956 npub: npub.to_string(),
3957 alias: None,
3958 addresses: vec![PeerAddress::new(transport, address)],
3959 connect_policy: ConnectPolicy::Manual,
3960 auto_reconnect: false,
3961 discovery_fallback_transit: true,
3962 };
3963
3964 if let Ok(identity) = PeerIdentity::from_npub(npub) {
3966 self.peer_aliases
3967 .insert(*identity.node_addr(), identity.short_npub());
3968 self.register_identity(*identity.node_addr(), identity.pubkey_full());
3969 }
3970
3971 self.initiate_peer_connection(&peer_config)
3972 .await
3973 .map(|()| {
3974 info!(
3975 npub = %npub,
3976 address = %address,
3977 transport = %transport,
3978 "API connect initiated"
3979 );
3980 serde_json::json!({
3981 "npub": npub,
3982 "address": address,
3983 "transport": transport,
3984 })
3985 })
3986 .map_err(|e| e.to_string())
3987 }
3988
3989 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
3993 let peer_identity =
3994 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
3995 let node_addr = *peer_identity.node_addr();
3996
3997 if !self.peers.contains_key(&node_addr) {
3998 return Err(format!("peer not found: {npub}"));
3999 }
4000
4001 self.remove_active_peer(&node_addr);
4003
4004 self.retry_pending.remove(&node_addr);
4006
4007 info!(npub = %npub, "API disconnect completed");
4008
4009 Ok(serde_json::json!({
4010 "npub": npub,
4011 "disconnected": true,
4012 }))
4013 }
4014
4015 pub async fn adopt_established_traversal(
4022 &mut self,
4023 traversal: EstablishedTraversal,
4024 ) -> Result<BootstrapHandoffResult, NodeError> {
4025 debug!(
4026 peer_npub = %traversal.peer_npub,
4027 session_id = %traversal.session_id,
4028 remote_addr = %traversal.remote_addr,
4029 "adopting established traversal socket"
4030 );
4031
4032 if !self.state.is_operational() {
4033 return Err(NodeError::NotStarted);
4034 }
4035
4036 let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
4037 let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
4038 NodeError::InvalidPeerNpub {
4039 npub: traversal.peer_npub.clone(),
4040 reason: e.to_string(),
4041 }
4042 })?;
4043 let peer_node_addr = *peer_identity.node_addr();
4044 if self.peers.contains_key(&peer_node_addr) {
4045 debug!(
4046 peer_npub = %traversal.peer_npub,
4047 "Adopting NAT traversal handoff as alternate path for already-connected peer"
4048 );
4049 }
4050
4051 self.peer_aliases
4052 .insert(peer_node_addr, peer_identity.short_npub());
4053 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
4054
4055 let transport_id = self.allocate_transport_id();
4056 let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
4076 let mut cfg = self
4077 .lookup_udp_config(traversal.transport_name.as_deref())
4078 .or_else(|| self.lookup_udp_config(None))
4079 .cloned()
4080 .unwrap_or_default();
4081 cfg.bind_addr = None;
4082 cfg.external_addr = None;
4083 cfg
4084 });
4085 let mut transport = crate::transport::udp::UdpTransport::new(
4086 transport_id,
4087 traversal.transport_name.clone(),
4088 inherited_config,
4089 packet_tx,
4090 );
4091
4092 transport
4093 .adopt_socket_async(traversal.socket)
4094 .await
4095 .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
4096
4097 let local_addr = transport.local_addr().ok_or_else(|| {
4098 NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
4099 })?;
4100
4101 self.transports.insert(
4102 transport_id,
4103 crate::transport::TransportHandle::Udp(transport),
4104 );
4105 self.bootstrap_transports.insert(transport_id);
4106 self.bootstrap_transport_npubs
4107 .insert(transport_id, traversal.peer_npub.clone());
4108
4109 let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
4110 if let Err(err) = self
4111 .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
4112 .await
4113 {
4114 self.bootstrap_transports.remove(&transport_id);
4115 self.bootstrap_transport_npubs.remove(&transport_id);
4116 if let Some(mut handle) = self.transports.remove(&transport_id) {
4117 let _ = handle.stop().await;
4118 }
4119 return Err(err);
4120 }
4121
4122 info!(
4123 peer = %self.peer_display_name(&peer_node_addr),
4124 transport_id = %transport_id,
4125 local_addr = %local_addr,
4126 remote_addr = %traversal.remote_addr,
4127 session_id = %traversal.session_id,
4128 "adopted NAT traversal socket; handshake initiated"
4129 );
4130
4131 Ok(BootstrapHandoffResult {
4132 transport_id,
4133 local_addr,
4134 remote_addr: traversal.remote_addr,
4135 peer_node_addr,
4136 session_id: traversal.session_id,
4137 })
4138 }
4139}