1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, MeshTraversalSignal, NostrDiscovery,
7 OverlayAdvert, OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason, SessionMessageType};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use secp256k1::PublicKey;
18use std::collections::{HashMap, HashSet};
19use std::net::{IpAddr, SocketAddr};
20use std::thread;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
/// Three-way decision attached to a mesh signal for a peer session.
///
/// NOTE(review): the code that produces and consumes this enum is not visible
/// in this part of the file; the per-variant notes are inferred from the
/// variant names only — confirm against the call site.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MeshSignalSessionAction {
    /// Presumably: deliver the signal now.
    Send,
    /// Presumably: keep the signal for a later attempt.
    Defer,
    /// Presumably: discard the signal.
    Drop,
}
30
/// Appends one timestamped line to `nvpn-fips-endpoint-debug.log` in the
/// system temporary directory. Only compiled into debug builds.
///
/// Strictly best-effort: if the file cannot be opened or written the message
/// is silently dropped, so callers never need to handle a logging failure.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let opened = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(log_path);
    let Ok(mut file) = opened else {
        return;
    };

    let timestamp = std::time::SystemTime::now();
    // A failed write is deliberately ignored.
    let _ = writeln!(file, "{:?} {}", timestamp, message.as_ref());
}

/// Release-build counterpart of the debug logger: accepts the same argument
/// and does nothing.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
51
/// Returns `true` when `ip` belongs to a range that cannot be reached across
/// the public internet.
///
/// IPv4 ranges rejected: private (RFC 1918), loopback, link-local,
/// unspecified, multicast, broadcast, documentation, and the shared
/// address space `100.64.0.0/10` (carrier-grade NAT).
///
/// IPv6 ranges rejected: loopback, unspecified, unique-local (`fc00::/7`),
/// multicast, and link-local unicast (`fe80::/10`).
///
/// IPv4-mapped IPv6 addresses (`::ffff:a.b.c.d`) are first converted to their
/// IPv4 form so that, for example, `::ffff:10.0.0.1` is classified exactly
/// like `10.0.0.1` instead of slipping through the IPv6 checks.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip.to_canonical() {
        IpAddr::V4(v4) => {
            let [first, second, ..] = v4.octets();
            // 100.64.0.0/10: first octet 100, top two bits of the second are `01`.
            let is_shared_address_space = first == 100 && (second & 0xc0) == 64;

            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                || is_shared_address_space
        }
        IpAddr::V6(v6) => {
            // fe80::/10: only the top ten bits of the first segment matter.
            let is_link_local_unicast = (v6.segments()[0] & 0xffc0) == 0xfe80;

            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                || is_link_local_unicast
        }
    }
}
83
/// Reports whether `local` and `remote` use the same IP family, i.e. both are
/// IPv4 or both are IPv6.
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    // `SocketAddr` has exactly two variants, so equal "is IPv4" answers mean
    // equal families.
    local.is_ipv4() == remote.is_ipv4()
}
90
/// Multiplier applied to an open-discovery retry lifetime.
///
/// NOTE(review): no use of this constant is visible in this part of the file;
/// the description is inferred from the name — confirm at the use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;

/// Upper bound on simultaneously in-flight connection attempts (handshaking
/// connections plus pending transport connects) towards a single peer.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;

/// Cap applied to the graph-session warm-up budget returned for one pass.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;

/// Cap on outbound connects queued by one transport-discovery poll.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
95
96impl Node {
97 pub(super) async fn update_peers(
109 &mut self,
110 new_peers: Vec<crate::config::PeerConfig>,
111 ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
112 use std::collections::{HashMap, HashSet};
113
114 let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
115 HashMap::with_capacity(new_peers.len());
116 let mut new_order = Vec::with_capacity(new_peers.len());
117 for peer in new_peers {
118 let identity = match PeerIdentity::from_npub(&peer.npub) {
119 Ok(id) => id,
120 Err(e) => {
121 return Err(crate::node::NodeError::InvalidPeerNpub {
122 npub: peer.npub.clone(),
123 reason: e.to_string(),
124 });
125 }
126 };
127 let node_addr = *identity.node_addr();
131 if !new_by_addr.contains_key(&node_addr) {
132 new_order.push(node_addr);
133 }
134 new_by_addr.insert(node_addr, peer);
135 }
136
137 let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
138 .config
139 .peers()
140 .iter()
141 .filter_map(|pc| {
142 PeerIdentity::from_npub(&pc.npub)
143 .ok()
144 .map(|id| (*id.node_addr(), pc.clone()))
145 })
146 .collect();
147
148 let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
149 let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
150
151 let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
152 let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
153 let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
154
155 let mut outcome = crate::node::UpdatePeersOutcome::default();
156
157 for node_addr in &removed {
158 if self.retry_pending.remove(node_addr).is_some() {
159 debug!(
160 peer = %self.peer_display_name(node_addr),
161 "Dropping retry entry for peer removed from runtime peer list"
162 );
163 }
164 self.peer_aliases.remove(node_addr);
165 self.set_discovery_fallback_transit_allowed(*node_addr, false);
166 outcome.removed += 1;
167 }
168
169 let mut auto_connect_refresh_configs = Vec::new();
170 for node_addr in &kept {
171 let new_pc = &new_by_addr[node_addr];
172 let current_pc = ¤t_by_addr[node_addr];
173 if new_pc.addresses != current_pc.addresses
174 || new_pc.alias != current_pc.alias
175 || new_pc.connect_policy != current_pc.connect_policy
176 || new_pc.auto_reconnect != current_pc.auto_reconnect
177 || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
178 {
179 outcome.updated += 1;
180 self.set_discovery_fallback_transit_allowed(
181 *node_addr,
182 new_pc.discovery_fallback_transit,
183 );
184 if let Some(state) = self.retry_pending.get_mut(node_addr) {
185 state.peer_config = new_pc.clone();
186 state.retry_after_ms = Self::now_ms();
187 }
188 if let Some(alias) = new_pc.alias.clone() {
189 self.peer_aliases.insert(*node_addr, alias);
190 }
191 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
192 auto_connect_refresh_configs.push(new_pc.clone());
193 }
194 } else {
195 outcome.unchanged += 1;
196 self.set_discovery_fallback_transit_allowed(
197 *node_addr,
198 new_pc.discovery_fallback_transit,
199 );
200 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
201 auto_connect_refresh_configs.push(new_pc.clone());
202 }
203 }
204 }
205
206 let added_configs: Vec<crate::config::PeerConfig> = new_order
207 .iter()
208 .filter(|addr| added.contains(addr))
209 .map(|addr| new_by_addr[addr].clone())
210 .collect();
211
212 self.config.peers = new_order
216 .iter()
217 .filter_map(|addr| new_by_addr.get(addr).cloned())
218 .collect();
219 self.configured_peer_send_weights = Self::configured_peer_send_weights(&self.config);
220
221 for peer_config in added_configs {
222 outcome.added += 1;
223 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
224 continue;
225 };
226 let name = peer_config
227 .alias
228 .clone()
229 .unwrap_or_else(|| identity.short_npub());
230 self.peer_aliases.insert(*identity.node_addr(), name);
231 self.set_discovery_fallback_transit_allowed(
232 *identity.node_addr(),
233 peer_config.discovery_fallback_transit,
234 );
235 self.register_identity(*identity.node_addr(), identity.pubkey_full());
236
237 if !peer_config.is_auto_connect() {
238 continue;
239 }
240
241 match self
242 .try_auto_connect_graph_session(&peer_config, identity)
243 .await
244 {
245 Ok(true) => continue,
246 Ok(false) => {}
247 Err(err) => {
248 debug!(
249 npub = %peer_config.npub,
250 error = %err,
251 "Existing FIPS graph did not warm newly added peer"
252 );
253 }
254 }
255
256 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
257 warn!(
258 npub = %peer_config.npub,
259 error = %e,
260 "Failed to initiate connection for newly added peer"
261 );
262 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
263 self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
264 }
265 if matches!(e, crate::node::NodeError::NoTransportForType(_))
266 && let Some(bootstrap) = self.nostr_discovery.clone()
267 {
268 bootstrap
269 .request_advert_stale_check(peer_config.npub.clone())
270 .await;
271 }
272 }
273 }
274
275 for peer_config in auto_connect_refresh_configs {
276 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
277 continue;
278 };
279 let node_addr = *peer_identity.node_addr();
280
281 if self.peers.contains_key(&node_addr) {
282 match self
283 .initiate_active_peer_alternative_connection(&peer_config)
284 .await
285 {
286 Ok(attempted) => {
287 if attempted {
288 debug!(
289 peer = %self.peer_display_name(&node_addr),
290 "Started non-disruptive alternate-path handshake for active peer"
291 );
292 }
293 }
294 Err(e) => {
295 debug!(
296 npub = %peer_config.npub,
297 error = %e,
298 "Active peer alternate-path refresh did not start"
299 );
300 }
301 }
302 continue;
303 }
304
305 match self
306 .try_auto_connect_graph_session(&peer_config, peer_identity)
307 .await
308 {
309 Ok(true) => continue,
310 Ok(false) => {}
311 Err(err) => {
312 debug!(
313 npub = %peer_config.npub,
314 error = %err,
315 "Existing FIPS graph did not warm refreshed peer"
316 );
317 }
318 }
319
320 match self.initiate_peer_connection(&peer_config).await {
321 Ok(()) => {
322 let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
323 if let Some(state) = self.retry_pending.get_mut(&node_addr) {
324 state.peer_config = peer_config;
325 state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
326 }
327 }
328 Err(e) => {
329 debug!(
330 npub = %peer_config.npub,
331 error = %e,
332 "Refreshed peer addresses did not initiate a direct connection"
333 );
334 self.schedule_retry_after_error(node_addr, Self::now_ms(), &e);
335 }
336 }
337 }
338
339 self.warm_auto_connect_graph_sessions().await;
340
341 Ok(outcome)
342 }
343
344 pub(super) async fn initiate_peer_connections(&mut self) {
345 let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
351 .config
352 .peers()
353 .iter()
354 .filter_map(|pc| {
355 PeerIdentity::from_npub(&pc.npub)
356 .ok()
357 .map(|id| (id, pc.alias.clone()))
358 })
359 .collect();
360
361 for (identity, alias) in peer_identities {
362 let name = alias.unwrap_or_else(|| identity.short_npub());
363 self.peer_aliases.insert(*identity.node_addr(), name);
364 self.register_identity(*identity.node_addr(), identity.pubkey_full());
368 }
369
370 let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
372
373 if peer_configs.is_empty() {
374 debug!("No static peers configured");
375 return;
376 }
377
378 debug!(
379 count = peer_configs.len(),
380 "Initiating static peer connections"
381 );
382
383 for peer_config in peer_configs {
384 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
385 Ok(identity) => identity,
386 Err(_) => continue,
387 };
388 match self
389 .try_auto_connect_graph_session(&peer_config, peer_identity)
390 .await
391 {
392 Ok(true) => continue,
393 Ok(false) => {}
394 Err(err) => {
395 debug!(
396 npub = %peer_config.npub,
397 error = %err,
398 "Existing FIPS graph did not warm auto-connect peer"
399 );
400 }
401 }
402 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
403 warn!(
404 npub = %peer_config.npub,
405 alias = ?peer_config.alias,
406 error = %e,
407 "Failed to initiate peer connection"
408 );
409 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
413 self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
414 }
415 if matches!(e, crate::node::NodeError::NoTransportForType(_))
421 && let Some(bootstrap) = self.nostr_discovery.clone()
422 {
423 bootstrap
424 .request_advert_stale_check(peer_config.npub.clone())
425 .await;
426 }
427 }
428 }
429
430 self.warm_auto_connect_graph_sessions().await;
431 }
432
433 pub(in crate::node) async fn try_auto_connect_graph_session(
434 &mut self,
435 peer_config: &PeerConfig,
436 peer_identity: PeerIdentity,
437 ) -> Result<bool, NodeError> {
438 if !peer_config.is_auto_connect() {
439 return Ok(false);
440 }
441
442 let peer_node_addr = *peer_identity.node_addr();
443 if self.peers.contains_key(&peer_node_addr) {
444 return Ok(false);
445 }
446 if self.auto_connect_should_race_direct_path(peer_config) {
447 return Ok(false);
448 }
449 if self
450 .sessions
451 .get(&peer_node_addr)
452 .is_some_and(|entry| entry.is_established() || entry.is_initiating())
453 {
454 return Ok(true);
455 }
456 if self.find_next_hop(&peer_node_addr).is_none() {
457 return Ok(false);
458 }
459
460 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
461 match self
462 .initiate_session(peer_node_addr, peer_identity.pubkey_full())
463 .await
464 {
465 Ok(()) => {
466 debug!(
467 peer = %self.peer_display_name(&peer_node_addr),
468 "Warmed auto-connect peer session over existing FIPS graph"
469 );
470 Ok(true)
471 }
472 Err(NodeError::SendFailed { node_addr, reason })
473 if node_addr == peer_node_addr && reason == "no route to destination" =>
474 {
475 self.maybe_initiate_lookup(&peer_node_addr).await;
476 Ok(false)
477 }
478 Err(err) => Err(err),
479 }
480 }
481
482 fn auto_connect_should_race_direct_path(&self, peer_config: &PeerConfig) -> bool {
483 !peer_config.addresses.is_empty() || self.config.node.discovery.nostr.enabled
484 }
485
486 pub(super) async fn initiate_peer_connection(
490 &mut self,
491 peer_config: &crate::config::PeerConfig,
492 ) -> Result<(), NodeError> {
493 self.initiate_peer_connection_inner(peer_config).await
494 }
495
496 pub(super) async fn initiate_peer_retry_connection(
502 &mut self,
503 peer_config: &crate::config::PeerConfig,
504 ) -> Result<(), NodeError> {
505 self.initiate_peer_connection_inner(peer_config).await
506 }
507
508 pub(in crate::node) async fn initiate_active_peer_alternative_connection(
509 &mut self,
510 peer_config: &crate::config::PeerConfig,
511 ) -> Result<bool, NodeError> {
512 let peer_identity =
513 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
514 npub: peer_config.npub.clone(),
515 reason: e.to_string(),
516 })?;
517 let peer_node_addr = *peer_identity.node_addr();
518
519 if !self.peers.contains_key(&peer_node_addr) {
520 self.initiate_peer_connection(peer_config).await?;
521 return Ok(true);
522 }
523
524 self.try_active_peer_alternative_addresses(peer_config, peer_identity)
530 .await
531 }
532
533 async fn initiate_peer_connection_inner(
534 &mut self,
535 peer_config: &crate::config::PeerConfig,
536 ) -> Result<(), NodeError> {
537 let peer_identity =
539 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
540 npub: peer_config.npub.clone(),
541 reason: e.to_string(),
542 })?;
543
544 let peer_node_addr = *peer_identity.node_addr();
545
546 if self.peers.contains_key(&peer_node_addr) {
548 debug!(
549 npub = %peer_config.npub,
550 "Peer already exists, skipping"
551 );
552 return Ok(());
553 }
554
555 self.try_peer_addresses(peer_config, peer_identity, true)
556 .await
557 }
558
559 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
560 self.connections.values().any(|conn| {
561 conn.expected_identity()
562 .map(|id| id.node_addr() == peer_node_addr)
563 .unwrap_or(false)
564 })
565 }
566
567 fn is_connecting_to_peer_on_path(
568 &self,
569 peer_node_addr: &NodeAddr,
570 transport_id: TransportId,
571 remote_addr: &TransportAddr,
572 ) -> bool {
573 self.connections.values().any(|conn| {
574 conn.expected_identity()
575 .map(|id| id.node_addr() == peer_node_addr)
576 .unwrap_or(false)
577 && conn.transport_id() == Some(transport_id)
578 && conn.source_addr() == Some(remote_addr)
579 }) || self.pending_connects.iter().any(|pending| {
580 pending.peer_identity.node_addr() == peer_node_addr
581 && pending.transport_id == transport_id
582 && &pending.remote_addr == remote_addr
583 })
584 }
585
586 pub(in crate::node) fn should_warm_auto_connect_session(
587 &self,
588 peer_node_addr: &NodeAddr,
589 ) -> bool {
590 if self.peers.contains_key(peer_node_addr)
591 || self
592 .sessions
593 .get(peer_node_addr)
594 .is_some_and(|entry| entry.is_established())
595 {
596 return false;
597 }
598
599 self.config.peers().iter().any(|peer| {
600 peer.is_auto_connect()
601 && PeerIdentity::from_npub(&peer.npub)
602 .map(|identity| identity.node_addr() == peer_node_addr)
603 .unwrap_or(false)
604 })
605 }
606
607 pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
608 if !self.peers.values().any(|peer| peer.can_send()) {
609 return 0;
610 }
611
612 let mut budget = self.graph_session_warmup_budget();
613 if budget == 0 {
614 return 0;
615 }
616
617 let peer_identities: Vec<_> = self
618 .config
619 .auto_connect_peers()
620 .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
621 .collect();
622
623 let mut warmed = 0;
624 for identity in peer_identities {
625 if budget == 0 {
626 break;
627 }
628
629 let peer_node_addr = *identity.node_addr();
630 if peer_node_addr == *self.identity.node_addr()
631 || !self.should_warm_auto_connect_session(&peer_node_addr)
632 || self
633 .sessions
634 .get(&peer_node_addr)
635 .is_some_and(|entry| entry.is_initiating())
636 {
637 continue;
638 }
639
640 self.register_identity(peer_node_addr, identity.pubkey_full());
641
642 if self.find_next_hop(&peer_node_addr).is_some() {
643 match self
644 .initiate_session(peer_node_addr, identity.pubkey_full())
645 .await
646 {
647 Ok(()) => {
648 warmed += 1;
649 budget = budget.saturating_sub(1);
650 debug!(
651 peer = %self.peer_display_name(&peer_node_addr),
652 "Warmed auto-connect peer session over existing FIPS graph"
653 );
654 }
655 Err(NodeError::SendFailed { node_addr, reason })
656 if node_addr == peer_node_addr && reason == "no route to destination" =>
657 {
658 self.maybe_initiate_lookup(&peer_node_addr).await;
659 warmed += 1;
660 budget = budget.saturating_sub(1);
661 }
662 Err(err) => {
663 debug!(
664 peer = %self.peer_display_name(&peer_node_addr),
665 error = %err,
666 "Failed to warm auto-connect peer session"
667 );
668 }
669 }
670 } else {
671 self.maybe_initiate_lookup(&peer_node_addr).await;
672 warmed += 1;
673 budget = budget.saturating_sub(1);
674 }
675 }
676
677 warmed
678 }
679
680 pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
681 let max_destinations = self.config.node.session.pending_max_destinations;
682 if max_destinations == 0 {
683 return 0;
684 }
685
686 let pending_sessions = self
687 .sessions
688 .values()
689 .filter(|entry| !entry.is_established())
690 .count();
691 let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
692 max_destinations
693 .saturating_sub(pending_total)
694 .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
695 }
696
697 fn outbound_handshake_slots(&self) -> usize {
698 let used = self
699 .connections
700 .len()
701 .saturating_add(self.pending_connects.len());
702 if self.max_connections == 0 {
703 usize::MAX
704 } else {
705 self.max_connections.saturating_sub(used)
706 }
707 }
708
709 fn outbound_link_slots(&self) -> usize {
710 if self.max_links == 0 {
711 usize::MAX
712 } else {
713 self.max_links.saturating_sub(self.links.len())
714 }
715 }
716
717 fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
718 if !self.peers.contains_key(peer_node_addr)
719 && self.max_peers > 0
720 && self.peers.len() >= self.max_peers
721 {
722 return 0;
723 }
724
725 let in_flight_for_peer = self
726 .connections
727 .values()
728 .filter(|conn| {
729 conn.expected_identity()
730 .map(|id| id.node_addr() == peer_node_addr)
731 .unwrap_or(false)
732 })
733 .count()
734 .saturating_add(
735 self.pending_connects
736 .iter()
737 .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
738 .count(),
739 );
740
741 self.outbound_handshake_slots()
742 .min(self.outbound_link_slots())
743 .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
744 }
745
746 fn discovery_connect_budget(&self) -> usize {
747 self.outbound_handshake_slots()
748 .min(self.outbound_link_slots())
749 .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
750 }
751
752 fn find_udp_transport_for_remote_addr(
759 &self,
760 remote_addr: SocketAddr,
761 ) -> Option<(TransportId, SocketAddr)> {
762 self.transports
763 .iter()
764 .filter(|(id, handle)| {
765 handle.transport_type().name == "udp"
766 && handle.is_operational()
767 && !self.bootstrap_transports.contains(id)
768 })
769 .filter_map(|(id, handle)| {
770 let local_addr = handle.local_addr()?;
771 socket_addr_families_compatible(local_addr, remote_addr)
772 .then_some((*id, local_addr))
773 })
774 .min_by_key(|(id, _)| id.as_u32())
775 }
776
777 pub(super) fn transport_discovery_candidate(
778 &self,
779 discovered_transport_id: TransportId,
780 discovered_addr: TransportAddr,
781 ) -> Option<(TransportId, TransportAddr, &'static str)> {
782 let transport = self.transports.get(&discovered_transport_id)?;
783 let transport_name = transport.transport_type().name;
784
785 if transport_name != "udp" {
786 return Some((discovered_transport_id, discovered_addr, transport_name));
787 }
788
789 let Some(remote_socket_addr) = discovered_addr
790 .as_str()
791 .and_then(|addr| addr.parse::<SocketAddr>().ok())
792 else {
793 if self.bootstrap_transports.contains(&discovered_transport_id) {
794 debug!(
795 transport_id = %discovered_transport_id,
796 remote_addr = %discovered_addr,
797 "transport discovery: skip non-numeric UDP address from bootstrap transport"
798 );
799 return None;
800 }
801 return Some((discovered_transport_id, discovered_addr, transport_name));
802 };
803
804 let Some((transport_id, local_addr)) =
805 self.find_udp_transport_for_remote_addr(remote_socket_addr)
806 else {
807 debug!(
808 transport_id = %discovered_transport_id,
809 remote_addr = %discovered_addr,
810 "transport discovery: skip UDP peer with no compatible local socket"
811 );
812 return None;
813 };
814
815 if transport_id != discovered_transport_id {
816 debug!(
817 discovered_transport_id = %discovered_transport_id,
818 selected_transport_id = %transport_id,
819 local_addr = %local_addr,
820 remote_addr = %remote_socket_addr,
821 "transport discovery: selected compatible UDP transport"
822 );
823 }
824
825 Some((
826 transport_id,
827 TransportAddr::from_socket_addr(remote_socket_addr),
828 transport_name,
829 ))
830 }
831
832 fn peer_address_string_for_transport_candidate(
833 &self,
834 transport_id: TransportId,
835 transport_name: &str,
836 remote_addr: &TransportAddr,
837 ) -> String {
838 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
839 let _ = (transport_id, transport_name);
840
841 #[cfg(any(target_os = "linux", target_os = "macos"))]
842 if transport_name == "ethernet"
843 && remote_addr.as_bytes().len() == 6
844 && let Some(interface) = self
845 .transports
846 .get(&transport_id)
847 .and_then(|transport| transport.interface_name())
848 {
849 let mut mac = [0u8; 6];
850 mac.copy_from_slice(remote_addr.as_bytes());
851 return format!(
852 "{interface}/{}",
853 crate::transport::ethernet::format_mac(&mac)
854 );
855 }
856
857 remote_addr.to_string()
858 }
859
860 fn resolve_peer_address_for_match(
861 &self,
862 candidate: &PeerAddress,
863 ) -> Option<(TransportId, TransportAddr)> {
864 if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
865 return None;
866 }
867
868 if candidate.transport == "ethernet" {
869 return self.resolve_ethernet_addr(&candidate.addr).ok();
870 }
871
872 if candidate.transport == "ble" {
873 #[cfg(bluer_available)]
874 {
875 return self.resolve_ble_addr(&candidate.addr).ok();
876 }
877 #[cfg(not(bluer_available))]
878 {
879 return None;
880 }
881 }
882
883 let transport_id = if candidate.transport == "udp"
884 && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
885 {
886 self.find_udp_transport_for_remote_addr(remote_socket_addr)
887 .map(|(id, _)| id)?
888 } else {
889 self.find_transport_for_type(&candidate.transport)?
890 };
891
892 Some((transport_id, TransportAddr::from_string(&candidate.addr)))
893 }
894
895 pub(super) async fn initiate_connection(
906 &mut self,
907 transport_id: TransportId,
908 remote_addr: TransportAddr,
909 peer_identity: PeerIdentity,
910 ) -> Result<(), NodeError> {
911 let peer_node_addr = *peer_identity.node_addr();
912
913 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
914 debug!(
915 peer = %self.peer_display_name(&peer_node_addr),
916 transport_id = %transport_id,
917 remote_addr = %remote_addr,
918 "Connection already in progress for candidate path"
919 );
920 return Ok(());
921 }
922
923 if self.outbound_handshake_slots() == 0 {
924 return Err(NodeError::MaxConnectionsExceeded {
925 max: self.max_connections,
926 });
927 }
928
929 if self.outbound_link_slots() == 0 {
930 return Err(NodeError::MaxLinksExceeded {
931 max: self.max_links,
932 });
933 }
934
935 if !self.peers.contains_key(&peer_node_addr)
936 && self.max_peers > 0
937 && self.peers.len() >= self.max_peers
938 {
939 return Err(NodeError::MaxPeersExceeded {
940 max: self.max_peers,
941 });
942 }
943
944 self.authorize_peer(
945 &peer_identity,
946 PeerAclContext::OutboundConnect,
947 transport_id,
948 &remote_addr,
949 )?;
950
951 let is_connection_oriented = self
952 .transports
953 .get(&transport_id)
954 .map(|t| t.transport_type().connection_oriented)
955 .unwrap_or(false);
956
957 let link_id = self.allocate_link_id();
959
960 let link = if is_connection_oriented {
961 Link::new(
962 link_id,
963 transport_id,
964 remote_addr.clone(),
965 LinkDirection::Outbound,
966 Duration::from_millis(self.config.node.base_rtt_ms),
967 )
968 } else {
969 Link::connectionless(
970 link_id,
971 transport_id,
972 remote_addr.clone(),
973 LinkDirection::Outbound,
974 Duration::from_millis(self.config.node.base_rtt_ms),
975 )
976 };
977
978 self.links.insert(link_id, link);
979
980 self.addr_to_link
982 .insert((transport_id, remote_addr.clone()), link_id);
983
984 if is_connection_oriented {
985 if let Some(transport) = self.transports.get(&transport_id) {
987 match transport.connect(&remote_addr).await {
988 Ok(()) => {
989 debug!(
990 peer = %self.peer_display_name(&peer_node_addr),
991 transport_id = %transport_id,
992 remote_addr = %remote_addr,
993 link_id = %link_id,
994 "Transport connect initiated (non-blocking)"
995 );
996 self.pending_connects.push(super::PendingConnect {
997 link_id,
998 transport_id,
999 remote_addr,
1000 peer_identity,
1001 });
1002 }
1003 Err(e) => {
1004 self.links.remove(&link_id);
1006 self.addr_to_link.remove(&(transport_id, remote_addr));
1007 return Err(NodeError::from_transport_error(e));
1008 }
1009 }
1010 }
1011 Ok(())
1012 } else {
1013 self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
1015 .await
1016 }
1017 }
1018
1019 pub(super) async fn start_handshake(
1024 &mut self,
1025 link_id: LinkId,
1026 transport_id: TransportId,
1027 remote_addr: TransportAddr,
1028 peer_identity: PeerIdentity,
1029 ) -> Result<(), NodeError> {
1030 let peer_node_addr = *peer_identity.node_addr();
1031
1032 let current_time_ms = Self::now_ms();
1034 let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);
1035
1036 let our_index = match self.index_allocator.allocate() {
1038 Ok(idx) => idx,
1039 Err(e) => {
1040 self.links.remove(&link_id);
1042 self.addr_to_link.remove(&(transport_id, remote_addr));
1043 return Err(NodeError::IndexAllocationFailed(e.to_string()));
1044 }
1045 };
1046
1047 let our_keypair = self.identity.keypair();
1049 let noise_msg1 =
1050 match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
1051 Ok(msg) => msg,
1052 Err(e) => {
1053 let _ = self.index_allocator.free(our_index);
1055 self.links.remove(&link_id);
1056 self.addr_to_link.remove(&(transport_id, remote_addr));
1057 return Err(NodeError::HandshakeFailed(e.to_string()));
1058 }
1059 };
1060
1061 connection.set_our_index(our_index);
1063 connection.set_transport_id(transport_id);
1064 connection.set_source_addr(remote_addr.clone());
1065
1066 let wire_msg1 = build_msg1(our_index, &noise_msg1);
1068
1069 debug!(
1070 peer = %self.peer_display_name(&peer_node_addr),
1071 transport_id = %transport_id,
1072 remote_addr = %remote_addr,
1073 link_id = %link_id,
1074 our_index = %our_index,
1075 "Connection initiated"
1076 );
1077
1078 let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
1080 connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);
1081
1082 self.pending_outbound
1084 .insert((transport_id, our_index.as_u32()), link_id);
1085 self.connections.insert(link_id, connection);
1086
1087 let send_result = match self.transports.get(&transport_id) {
1092 Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
1093 None => None,
1094 };
1095 match send_result {
1096 Some(send_result) => {
1097 self.note_local_send_outcome(&send_result);
1098 match send_result {
1099 Ok(bytes) => {
1100 debug!(
1101 link_id = %link_id,
1102 our_index = %our_index,
1103 bytes,
1104 "Sent Noise handshake message 1 (wire format)"
1105 );
1106 }
1107 Err(e) => {
1108 warn!(
1109 link_id = %link_id,
1110 transport_id = %transport_id,
1111 remote_addr = %remote_addr,
1112 our_index = %our_index,
1113 error = %e,
1114 "Failed to send handshake message"
1115 );
1116 self.pending_outbound
1117 .remove(&(transport_id, our_index.as_u32()));
1118 self.connections.remove(&link_id);
1119 self.links.remove(&link_id);
1120 self.addr_to_link
1121 .remove(&(transport_id, remote_addr.clone()));
1122 let _ = self.index_allocator.free(our_index);
1123 return Err(NodeError::from_transport_error(e));
1124 }
1125 }
1126 }
1127 None => {
1128 self.pending_outbound
1129 .remove(&(transport_id, our_index.as_u32()));
1130 self.connections.remove(&link_id);
1131 self.links.remove(&link_id);
1132 self.addr_to_link
1133 .remove(&(transport_id, remote_addr.clone()));
1134 let _ = self.index_allocator.free(our_index);
1135 return Err(NodeError::TransportError(format!(
1136 "transport {transport_id} disappeared before first handshake send"
1137 )));
1138 }
1139 }
1140
1141 Ok(())
1142 }
1143
1144 pub(super) async fn poll_transport_discovery(&mut self) {
1150 let mut to_connect = Vec::new();
1152 let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1153 let mut connect_budget = self.discovery_connect_budget();
1154 let mut skipped_budget = 0usize;
1155
1156 for transport in self.transports.values() {
1157 if !transport.is_operational() {
1158 continue;
1159 }
1160 if !transport.auto_connect() {
1161 let _ = transport.discover();
1163 continue;
1164 }
1165 let discovered = match transport.discover() {
1166 Ok(peers) => peers,
1167 Err(_) => continue,
1168 };
1169 for peer in discovered {
1170 let discovered_transport_id = peer.transport_id;
1171 let pubkey = match peer.pubkey_hint {
1172 Some(pk) => pk,
1173 None => continue,
1174 };
1175 let identity = PeerIdentity::from_pubkey(pubkey);
1176 let node_addr = *identity.node_addr();
1177
1178 if node_addr == *self.identity.node_addr() {
1180 continue;
1181 }
1182
1183 let Some((candidate_transport_id, remote_addr, transport_name)) =
1184 self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1185 else {
1186 continue;
1187 };
1188
1189 if self.peers.contains_key(&node_addr) {
1190 let candidate = PeerAddress::new(
1191 transport_name,
1192 self.peer_address_string_for_transport_candidate(
1193 candidate_transport_id,
1194 transport_name,
1195 &remote_addr,
1196 ),
1197 );
1198 if self.active_peer_candidate_is_fresh_enough_to_skip(
1199 &node_addr,
1200 std::slice::from_ref(&candidate),
1201 ) {
1202 continue;
1203 }
1204 if self.is_connecting_to_peer_on_path(
1205 &node_addr,
1206 candidate_transport_id,
1207 &remote_addr,
1208 ) {
1209 continue;
1210 }
1211 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1212 if connect_budget == 0
1213 || self
1214 .path_candidate_attempt_budget(&node_addr)
1215 .saturating_sub(queued_for_peer)
1216 == 0
1217 {
1218 skipped_budget = skipped_budget.saturating_add(1);
1219 continue;
1220 }
1221 to_connect.push((candidate_transport_id, remote_addr, identity, true));
1222 *queued_per_peer.entry(node_addr).or_default() += 1;
1223 connect_budget = connect_budget.saturating_sub(1);
1224 continue;
1225 }
1226
1227 if self.is_connecting_to_peer_on_path(
1228 &node_addr,
1229 candidate_transport_id,
1230 &remote_addr,
1231 ) {
1232 continue;
1233 }
1234
1235 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1236 if connect_budget == 0
1237 || self
1238 .path_candidate_attempt_budget(&node_addr)
1239 .saturating_sub(queued_for_peer)
1240 == 0
1241 {
1242 skipped_budget = skipped_budget.saturating_add(1);
1243 continue;
1244 }
1245 to_connect.push((candidate_transport_id, remote_addr, identity, false));
1246 *queued_per_peer.entry(node_addr).or_default() += 1;
1247 connect_budget = connect_budget.saturating_sub(1);
1248 }
1249 }
1250
1251 if skipped_budget > 0 {
1252 debug!(
1253 skipped = skipped_budget,
1254 queued = to_connect.len(),
1255 "Transport discovery connect budget exhausted"
1256 );
1257 }
1258
1259 for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1260 info!(
1261 peer = %self.peer_display_name(identity.node_addr()),
1262 transport_id = %transport_id,
1263 remote_addr = %remote_addr,
1264 active_refresh,
1265 "Auto-connecting to discovered peer"
1266 );
1267 if let Err(e) = self
1268 .initiate_connection(transport_id, remote_addr, identity)
1269 .await
1270 {
1271 warn!(error = %e, "Failed to auto-connect to discovered peer");
1272 }
1273 }
1274 }
1275
1276 pub(super) async fn poll_nostr_discovery(&mut self) {
1277 let Some(bootstrap) = self.nostr_discovery.clone() else {
1278 return;
1279 };
1280
1281 bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
1282 bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
1283
1284 if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
1285 debug!(error = %err, "Failed to refresh local Nostr overlay advert");
1286 }
1287
1288 self.drain_nostr_mesh_signals(&bootstrap).await;
1289
1290 for event in bootstrap.drain_events().await {
1291 match event {
1292 BootstrapEvent::Established { traversal } => {
1293 let active_refresh = PeerIdentity::from_npub(&traversal.peer_npub)
1294 .ok()
1295 .is_some_and(|identity| self.peers.contains_key(identity.node_addr()));
1296 let admission_allowed = if active_refresh {
1297 self.outbound_direct_refresh_admission_check()
1298 } else {
1299 self.outbound_admission_check()
1300 };
1301 if !admission_allowed {
1302 debug!(
1303 peer_npub = %traversal.peer_npub,
1304 peers = self.peers.len(),
1305 max_peers = self.max_peers,
1306 active_refresh,
1307 "Dropping established NAT traversal: at capacity"
1308 );
1309 continue;
1310 }
1311 let peer_npub = traversal.peer_npub.clone();
1312 match self.adopt_established_traversal(traversal).await {
1313 Ok(_) => {
1314 info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
1315 }
1316 Err(err) => {
1317 warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
1318 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
1319 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
1320 }
1321 }
1322 }
1323 }
1324 BootstrapEvent::Failed {
1325 peer_config,
1326 reason,
1327 } => {
1328 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
1329 Ok(identity) => identity,
1330 Err(_) => continue,
1331 };
1332 let node_addr = *peer_identity.node_addr();
1333 let now_ms = Self::now_ms();
1334 if self.peers.contains_key(&node_addr) {
1335 if self.active_peer_should_keep_direct_retry(&node_addr, &peer_config) {
1336 let decision =
1337 bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
1338 if decision.should_warn {
1339 warn!(
1340 npub = %peer_config.npub,
1341 error = %reason,
1342 consecutive_failures = decision.consecutive_failures,
1343 cooldown_secs = decision
1344 .cooldown_until_ms
1345 .map(|t| t.saturating_sub(now_ms) / 1000),
1346 "Direct-path NAT traversal upgrade failed"
1347 );
1348 } else {
1349 debug!(
1350 npub = %peer_config.npub,
1351 error = %reason,
1352 consecutive_failures = decision.consecutive_failures,
1353 "Direct-path NAT traversal upgrade failed (suppressed by warn-rate-limit)"
1354 );
1355 }
1356 if decision.crossed_threshold {
1357 bootstrap
1358 .request_advert_stale_check(peer_config.npub.clone())
1359 .await;
1360 }
1361 self.schedule_retry(node_addr, now_ms);
1362 if self.nostr_cooldown_applies_to_peer_config(&peer_config)
1363 && let Some(cooldown_until_ms) = decision.cooldown_until_ms
1364 && let Some(state) = self.retry_pending.get_mut(&node_addr)
1365 {
1366 state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
1367 }
1368 } else {
1369 debug!(
1370 npub = %peer_config.npub,
1371 error = %reason,
1372 "Ignoring failed NAT traversal for already-connected peer on fresh direct path"
1373 );
1374 }
1375 continue;
1376 }
1377 if self.is_connecting_to_peer(&node_addr) {
1378 debug!(
1379 npub = %peer_config.npub,
1380 error = %reason,
1381 "Ignoring failed NAT traversal while peer handshake is already in progress"
1382 );
1383 continue;
1384 }
1385
1386 let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
1387 if decision.should_warn {
1388 warn!(
1389 npub = %peer_config.npub,
1390 error = %reason,
1391 consecutive_failures = decision.consecutive_failures,
1392 cooldown_secs = decision
1393 .cooldown_until_ms
1394 .map(|t| t.saturating_sub(now_ms) / 1000),
1395 "NAT traversal failed"
1396 );
1397 } else {
1398 debug!(
1399 npub = %peer_config.npub,
1400 error = %reason,
1401 consecutive_failures = decision.consecutive_failures,
1402 "NAT traversal failed (suppressed by warn-rate-limit)"
1403 );
1404 }
1405
1406 if decision.crossed_threshold {
1410 bootstrap
1411 .request_advert_stale_check(peer_config.npub.clone())
1412 .await;
1413 }
1414
1415 if self
1416 .try_peer_addresses(&peer_config, peer_identity, false)
1417 .await
1418 .is_ok()
1419 {
1420 continue;
1421 }
1422
1423 self.schedule_retry(node_addr, now_ms);
1424 if self.nostr_cooldown_applies_to_peer_config(&peer_config)
1425 && let Some(cooldown_until_ms) = decision.cooldown_until_ms
1426 && let Some(state) = self.retry_pending.get_mut(&node_addr)
1427 {
1428 state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
1432 }
1433 }
1434 }
1435 }
1436
1437 self.maybe_run_startup_open_discovery_sweep(&bootstrap)
1438 .await;
1439 self.queue_open_discovery_retries(&bootstrap).await;
1440 self.queue_active_fallback_direct_retries(&bootstrap);
1441 }
1442
1443 async fn drain_nostr_mesh_signals(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1444 let mut deferred = Vec::new();
1445
1446 for signal in bootstrap.drain_mesh_signals().await {
1447 let (peer_npub, msg_type, payload) = match &signal {
1448 MeshTraversalSignal::Offer { peer_npub, offer } => {
1449 let payload = match serde_json::to_vec(&offer) {
1450 Ok(payload) => payload,
1451 Err(error) => {
1452 debug!(
1453 peer = %peer_npub,
1454 error = %error,
1455 "Failed to encode mesh traversal offer"
1456 );
1457 continue;
1458 }
1459 };
1460 (
1461 peer_npub.clone(),
1462 SessionMessageType::TraversalOffer.to_byte(),
1463 payload,
1464 )
1465 }
1466 MeshTraversalSignal::Answer { peer_npub, answer } => {
1467 let payload = match serde_json::to_vec(&answer) {
1468 Ok(payload) => payload,
1469 Err(error) => {
1470 debug!(
1471 peer = %peer_npub,
1472 error = %error,
1473 "Failed to encode mesh traversal answer"
1474 );
1475 continue;
1476 }
1477 };
1478 (
1479 peer_npub.clone(),
1480 SessionMessageType::TraversalAnswer.to_byte(),
1481 payload,
1482 )
1483 }
1484 };
1485
1486 let peer_identity = match PeerIdentity::from_npub(&peer_npub) {
1487 Ok(identity) => identity,
1488 Err(error) => {
1489 debug!(
1490 peer = %peer_npub,
1491 error = %error,
1492 "Cannot send mesh traversal signal to invalid peer npub"
1493 );
1494 continue;
1495 }
1496 };
1497 let peer_addr = *peer_identity.node_addr();
1498 match self
1499 .mesh_signal_session_action(peer_addr, peer_identity.pubkey_full())
1500 .await
1501 {
1502 MeshSignalSessionAction::Send => {}
1503 MeshSignalSessionAction::Defer => {
1504 deferred.push(signal);
1505 continue;
1506 }
1507 MeshSignalSessionAction::Drop => continue,
1508 }
1509
1510 if let Err(error) = self.send_session_msg(&peer_addr, msg_type, &payload).await {
1511 debug!(
1512 peer = %self.peer_display_name(&peer_addr),
1513 error = %error,
1514 "Failed to send mesh traversal signal"
1515 );
1516 }
1517 }
1518
1519 for signal in deferred {
1520 bootstrap.requeue_mesh_signal(signal);
1521 }
1522 }
1523
1524 async fn mesh_signal_session_action(
1525 &mut self,
1526 peer_addr: NodeAddr,
1527 peer_pubkey: PublicKey,
1528 ) -> MeshSignalSessionAction {
1529 if let Some(entry) = self.sessions.get(&peer_addr) {
1530 if entry.is_established() {
1531 return MeshSignalSessionAction::Send;
1532 }
1533 if entry.is_initiating() || entry.is_awaiting_msg3() {
1534 debug!(
1535 peer = %self.peer_display_name(&peer_addr),
1536 "Deferring mesh traversal signal until end-to-end session is established"
1537 );
1538 return MeshSignalSessionAction::Defer;
1539 }
1540 }
1541
1542 if self.find_next_hop(&peer_addr).is_none() {
1543 debug!(
1544 peer = %self.peer_display_name(&peer_addr),
1545 "Cannot warm mesh traversal signal session without a FIPS route"
1546 );
1547 self.maybe_initiate_lookup(&peer_addr).await;
1548 return MeshSignalSessionAction::Drop;
1549 }
1550
1551 self.register_identity(peer_addr, peer_pubkey);
1552 match self.initiate_session(peer_addr, peer_pubkey).await {
1553 Ok(()) => {
1554 debug!(
1555 peer = %self.peer_display_name(&peer_addr),
1556 "Warming end-to-end session for mesh traversal signal"
1557 );
1558 MeshSignalSessionAction::Defer
1559 }
1560 Err(NodeError::SendFailed { node_addr, reason })
1561 if node_addr == peer_addr && reason == "no route to destination" =>
1562 {
1563 debug!(
1564 peer = %self.peer_display_name(&peer_addr),
1565 "Cannot warm mesh traversal signal session without a FIPS route"
1566 );
1567 self.maybe_initiate_lookup(&peer_addr).await;
1568 MeshSignalSessionAction::Drop
1569 }
1570 Err(error) => {
1571 debug!(
1572 peer = %self.peer_display_name(&peer_addr),
1573 error = %error,
1574 "Failed to warm end-to-end session for mesh traversal signal"
1575 );
1576 MeshSignalSessionAction::Drop
1577 }
1578 }
1579 }
1580
1581 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1587 if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1588 let scope = scope.trim();
1589 if !scope.is_empty() {
1590 return Some(scope.to_string());
1591 }
1592 }
1593
1594 let app = self.config.node.discovery.nostr.app.trim();
1595 if app.is_empty() {
1596 return None;
1597 }
1598 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1599 let scope = rest.trim();
1600 if scope.is_empty() {
1601 None
1602 } else {
1603 Some(scope.to_string())
1604 }
1605 } else {
1606 Some(app.to_string())
1607 }
1608 }
1609
1610 pub(super) fn start_local_instance_discovery(&mut self) {
1611 if !self.config.node.discovery.local.enabled {
1612 return;
1613 }
1614 let Some(scope) = self.lan_discovery_scope() else {
1615 debug!("local instance discovery not started: no discovery scope");
1616 return;
1617 };
1618 let now_ms = Self::now_ms();
1619 match crate::discovery::local::LocalInstanceRegistry::new(
1620 self.identity.npub(),
1621 scope,
1622 &self.config.node.discovery.local,
1623 now_ms,
1624 ) {
1625 Ok(registry) => {
1626 self.local_instance_registry = Some(registry);
1627 self.local_instance_started_at_ms = Some(now_ms);
1628 self.last_local_instance_publish_ms = None;
1629 self.last_local_instance_scan_ms = None;
1630 self.publish_local_instance_record(now_ms);
1631 info!("Same-host FIPS instance discovery enabled");
1632 }
1633 Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1634 debug!("same-host FIPS instance discovery disabled");
1635 }
1636 Err(err) => {
1637 debug!(error = %err, "same-host FIPS instance discovery not started");
1638 }
1639 }
1640 }
1641
1642 fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1643 let mut contacts = Vec::new();
1644 for handle in self.transports.values() {
1645 if !handle.is_operational() || !handle.accept_connections() {
1646 continue;
1647 }
1648 let transport = handle.transport_type().name;
1649 if transport != "udp" && transport != "tcp" {
1650 continue;
1651 }
1652 let Some(local_addr) = handle.local_addr() else {
1653 continue;
1654 };
1655 let Some(contact) =
1656 crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1657 else {
1658 continue;
1659 };
1660 if contacts
1661 .iter()
1662 .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1663 existing.transport == contact.transport && existing.addr == contact.addr
1664 })
1665 {
1666 continue;
1667 }
1668 contacts.push(contact);
1669 }
1670 contacts
1671 }
1672
1673 fn publish_local_instance_record(&mut self, now_ms: u64) {
1674 let Some(registry) = self.local_instance_registry.clone() else {
1675 return;
1676 };
1677 let contacts = self.local_instance_contacts();
1678 match registry.publish(contacts, now_ms) {
1679 Ok(()) => {
1680 self.last_local_instance_publish_ms = Some(now_ms);
1681 }
1682 Err(err) => {
1683 debug!(error = %err, "failed to publish same-host FIPS instance record");
1684 }
1685 }
1686 }
1687
1688 fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1689 if self.local_instance_registry.is_none() {
1690 return;
1691 }
1692 let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1693 let due = self
1694 .last_local_instance_publish_ms
1695 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1696 .unwrap_or(true);
1697 if due {
1698 self.publish_local_instance_record(now_ms);
1699 }
1700 }
1701
1702 fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1703 if self.local_instance_registry.is_none() {
1704 return false;
1705 }
1706 let cfg = &self.config.node.discovery.local;
1707 let interval_ms = if self
1708 .local_instance_started_at_ms
1709 .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1710 .unwrap_or(false)
1711 {
1712 cfg.startup_scan_interval_ms()
1713 } else {
1714 cfg.scan_interval_ms()
1715 };
1716 self.last_local_instance_scan_ms
1717 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1718 .unwrap_or(true)
1719 }
1720
1721 fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1722 if self.config.peers().iter().any(|peer| {
1723 PeerIdentity::from_npub(&peer.npub)
1724 .map(|configured| configured.node_addr() == identity.node_addr())
1725 .unwrap_or(false)
1726 }) {
1727 return true;
1728 }
1729 self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1730 }
1731
1732 fn local_instance_peer_addresses(
1733 &self,
1734 record: &crate::discovery::local::LocalInstanceRecord,
1735 ) -> Vec<PeerAddress> {
1736 let mut addresses = Vec::new();
1737 for contact in &record.contacts {
1738 if contact.transport != "udp" && contact.transport != "tcp" {
1739 continue;
1740 }
1741 let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1742 debug!(
1743 npub = %record.npub,
1744 transport = %contact.transport,
1745 addr = %contact.addr,
1746 "local instance discovery: skip non-socket contact"
1747 );
1748 continue;
1749 };
1750 if !socket_addr.ip().is_loopback() {
1751 debug!(
1752 npub = %record.npub,
1753 addr = %contact.addr,
1754 "local instance discovery: skip non-loopback contact"
1755 );
1756 continue;
1757 }
1758 let address =
1759 PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1760 .with_seen_at_ms(record.updated_at_ms);
1761 if addresses.iter().any(|existing: &PeerAddress| {
1762 existing.transport == address.transport && existing.addr == address.addr
1763 }) {
1764 continue;
1765 }
1766 addresses.push(address);
1767 }
1768 addresses
1769 }
1770
1771 pub(super) async fn poll_local_instance_discovery(&mut self) {
1775 let Some(registry) = self.local_instance_registry.clone() else {
1776 return;
1777 };
1778 let now_ms = Self::now_ms();
1779 self.maybe_publish_local_instance_record(now_ms);
1780 if !self.local_instance_scan_due(now_ms) {
1781 return;
1782 }
1783 self.last_local_instance_scan_ms = Some(now_ms);
1784
1785 let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
1786 {
1787 Ok(records) => records,
1788 Err(err) => {
1789 debug!(error = %err, "same-host FIPS instance scan failed");
1790 return;
1791 }
1792 };
1793 if records.is_empty() {
1794 return;
1795 }
1796
1797 let mut connect_budget = self.discovery_connect_budget();
1798 let mut skipped_budget = 0usize;
1799 for record in records {
1800 let identity = match PeerIdentity::from_npub(&record.npub) {
1801 Ok(identity) => identity,
1802 Err(err) => {
1803 debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
1804 continue;
1805 }
1806 };
1807 let peer_node_addr = *identity.node_addr();
1808 if peer_node_addr == *self.identity.node_addr() {
1809 continue;
1810 }
1811 if !self.local_instance_peer_allowed(&identity) {
1812 debug!(
1813 npub = %identity.short_npub(),
1814 "local instance discovery: skip unconfigured peer"
1815 );
1816 continue;
1817 }
1818
1819 let addresses = self.local_instance_peer_addresses(&record);
1820 if addresses.is_empty() {
1821 continue;
1822 }
1823
1824 if self.peers.contains_key(&peer_node_addr)
1825 && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
1826 {
1827 continue;
1828 }
1829
1830 for address in addresses {
1831 let Some((transport_id, remote_addr)) =
1832 self.resolve_peer_address_for_match(&address)
1833 else {
1834 continue;
1835 };
1836 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1837 continue;
1838 }
1839 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1840 skipped_budget = skipped_budget.saturating_add(1);
1841 continue;
1842 }
1843 info!(
1844 npub = %identity.short_npub(),
1845 transport = %address.transport,
1846 addr = %address.addr,
1847 "same-host FIPS instance discovery: initiating handshake"
1848 );
1849 if let Err(err) = self
1850 .initiate_connection(transport_id, remote_addr, identity)
1851 .await
1852 {
1853 debug!(
1854 npub = %record.npub,
1855 error = %err,
1856 "same-host FIPS instance discovery: failed to initiate connection"
1857 );
1858 }
1859 connect_budget = connect_budget.saturating_sub(1);
1860 }
1861 }
1862 if skipped_budget > 0 {
1863 debug!(
1864 skipped = skipped_budget,
1865 "same-host FIPS instance discovery connect budget exhausted"
1866 );
1867 }
1868 }
1869
1870 pub(super) async fn poll_lan_discovery(&mut self) {
1877 let Some(runtime) = self.lan_discovery.clone() else {
1878 return;
1879 };
1880 let events = runtime.drain_events().await;
1881 if events.is_empty() {
1882 return;
1883 }
1884 let mut connect_budget = self.discovery_connect_budget();
1885 let mut skipped_budget = 0usize;
1886 for event in events {
1887 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1888 let Some((transport_id, local_addr)) =
1889 self.find_udp_transport_for_remote_addr(peer.addr)
1890 else {
1891 debug!(
1892 addr = %peer.addr,
1893 "lan: skip discovered peer with no compatible UDP transport"
1894 );
1895 continue;
1896 };
1897 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1898 Ok(id) => id,
1899 Err(err) => {
1900 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1901 continue;
1902 }
1903 };
1904 let peer_node_addr = *identity.node_addr();
1905 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1906 if self.peers.contains_key(&peer_node_addr) {
1907 let candidate = PeerAddress::new("udp", peer.addr.to_string());
1908 if self.active_peer_candidate_is_fresh_enough_to_skip(
1909 &peer_node_addr,
1910 std::slice::from_ref(&candidate),
1911 ) {
1912 continue;
1913 }
1914 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1915 continue;
1916 }
1917 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1918 skipped_budget = skipped_budget.saturating_add(1);
1919 continue;
1920 }
1921 info!(
1922 npub = %identity.short_npub(),
1923 addr = %peer.addr,
1924 local_addr = %local_addr,
1925 "lan: initiating alternate-path handshake to active peer"
1926 );
1927 if let Err(err) = self
1928 .initiate_connection(transport_id, remote_addr, identity)
1929 .await
1930 {
1931 debug!(
1932 npub = %peer.npub,
1933 error = %err,
1934 "lan: failed to initiate active peer alternate-path handshake"
1935 );
1936 }
1937 connect_budget = connect_budget.saturating_sub(1);
1938 continue;
1939 }
1940 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1941 continue;
1942 }
1943 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1944 skipped_budget = skipped_budget.saturating_add(1);
1945 continue;
1946 }
1947 info!(
1948 npub = %identity.short_npub(),
1949 addr = %peer.addr,
1950 local_addr = %local_addr,
1951 "lan: initiating handshake to discovered peer"
1952 );
1953 if let Err(err) = self
1954 .initiate_connection(transport_id, remote_addr, identity)
1955 .await
1956 {
1957 debug!(
1958 npub = %peer.npub,
1959 error = %err,
1960 "lan: failed to initiate connection to discovered peer"
1961 );
1962 }
1963 connect_budget = connect_budget.saturating_sub(1);
1964 }
1965 if skipped_budget > 0 {
1966 debug!(
1967 skipped = skipped_budget,
1968 "lan: discovery connect budget exhausted"
1969 );
1970 }
1971 }
1972
1973 pub(super) async fn poll_pending_connects(&mut self) {
1980 if self.pending_connects.is_empty() {
1981 return;
1982 }
1983
1984 let mut completed = Vec::new();
1985
1986 for (i, pending) in self.pending_connects.iter().enumerate() {
1987 let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
1988 transport.connection_state(&pending.remote_addr)
1989 } else {
1990 crate::transport::ConnectionState::Failed("transport removed".into())
1991 };
1992
1993 match state {
1994 crate::transport::ConnectionState::Connected => {
1995 completed.push((i, true, None));
1996 }
1997 crate::transport::ConnectionState::Failed(reason) => {
1998 completed.push((i, false, Some(reason)));
1999 }
2000 crate::transport::ConnectionState::Connecting => {
2001 }
2003 crate::transport::ConnectionState::None => {
2004 completed.push((i, false, Some("no connection attempt found".into())));
2006 }
2007 }
2008 }
2009
2010 for (i, success, reason) in completed.into_iter().rev() {
2012 let pending = self.pending_connects.remove(i);
2013
2014 if success {
2015 if let Some(link) = self.links.get_mut(&pending.link_id) {
2017 link.set_connected();
2018 }
2019
2020 debug!(
2021 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2022 transport_id = %pending.transport_id,
2023 remote_addr = %pending.remote_addr,
2024 link_id = %pending.link_id,
2025 "Transport connected, starting handshake"
2026 );
2027
2028 if let Err(e) = self
2030 .start_handshake(
2031 pending.link_id,
2032 pending.transport_id,
2033 pending.remote_addr.clone(),
2034 pending.peer_identity,
2035 )
2036 .await
2037 {
2038 warn!(
2039 link_id = %pending.link_id,
2040 error = %e,
2041 "Failed to start handshake after transport connect"
2042 );
2043 self.remove_link(&pending.link_id);
2045 }
2046 } else {
2047 let reason = reason.unwrap_or_default();
2048 warn!(
2049 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2050 transport_id = %pending.transport_id,
2051 remote_addr = %pending.remote_addr,
2052 link_id = %pending.link_id,
2053 reason = %reason,
2054 "Transport connect failed"
2055 );
2056
2057 self.remove_link(&pending.link_id);
2059 self.links.remove(&pending.link_id);
2060 self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
2061 }
2062 }
2063 }
2064
2065 pub async fn start(&mut self) -> Result<(), NodeError> {
2072 node_start_debug_log("Node::start begin");
2073 if !self.state.can_start() {
2074 return Err(NodeError::AlreadyStarted);
2075 }
2076 self.state = NodeState::Starting;
2077 node_start_debug_log("Node::start state set to starting");
2078
2079 let packet_buffer_size = self.config.node.buffers.packet_channel;
2081 let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
2082 self.packet_tx = Some(packet_tx.clone());
2083 self.packet_rx = Some(packet_rx);
2084 node_start_debug_log("Node::start packet channel created");
2085
2086 node_start_debug_log("Node::start create transports begin");
2088 let transport_handles = self.create_transports(&packet_tx).await;
2089 node_start_debug_log(format!(
2090 "Node::start create transports complete count={}",
2091 transport_handles.len()
2092 ));
2093
2094 for mut handle in transport_handles {
2095 let transport_id = handle.transport_id();
2096 let transport_type = handle.transport_type().name;
2097 let name = handle.name().map(|s| s.to_string());
2098
2099 node_start_debug_log(format!(
2100 "Node::start transport start begin id={} type={} name={:?}",
2101 transport_id, transport_type, name
2102 ));
2103 match handle.start().await {
2104 Ok(()) => {
2105 node_start_debug_log(format!(
2106 "Node::start transport start ok id={} type={}",
2107 transport_id, transport_type
2108 ));
2109 self.transports.insert(transport_id, handle);
2110 }
2111 Err(e) => {
2112 node_start_debug_log(format!(
2113 "Node::start transport start error id={} type={} error={}",
2114 transport_id, transport_type, e
2115 ));
2116 if let Some(ref n) = name {
2117 warn!(transport_type, name = %n, error = %e, "Transport failed to start");
2118 } else {
2119 warn!(transport_type, error = %e, "Transport failed to start");
2120 }
2121 }
2122 }
2123 }
2124
2125 if !self.transports.is_empty() {
2126 info!(count = self.transports.len(), "Transports initialized");
2127 }
2128
2129 #[cfg(unix)]
2145 {
2146 if self.config.node.worker_pools_enabled {
2147 node_start_debug_log("Node::start worker pools begin");
2148 let cpu_default = std::thread::available_parallelism()
2149 .map(|n| n.get())
2150 .unwrap_or(1)
2151 .max(1);
2152 let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
2153 .ok()
2154 .and_then(|s| s.parse().ok())
2155 .unwrap_or(cpu_default)
2156 .max(1);
2157 self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
2158 encrypt_worker_count,
2159 ));
2160 info!(
2161 workers = encrypt_worker_count,
2162 "Spawned FMP-encrypt worker pool"
2163 );
2164
2165 let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
2174 .ok()
2175 .and_then(|s| s.parse().ok())
2176 .unwrap_or(cpu_default);
2177 if decrypt_worker_count == 0 {
2178 info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
2179 } else {
2180 self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
2181 decrypt_worker_count,
2182 ));
2183 info!(
2184 workers = decrypt_worker_count,
2185 "Spawned FMP+FSP-decrypt worker pool"
2186 );
2187 }
2188 node_start_debug_log("Node::start worker pools complete");
2189 } else {
2190 node_start_debug_log("Node::start worker pools disabled");
2191 info!("FIPS worker pools disabled; using in-line crypto/send path");
2192 }
2193 }
2194
2195 if self.config.node.discovery.nostr.enabled {
2196 node_start_debug_log("Node::start nostr discovery start begin");
2197 match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
2198 .await
2199 {
2200 Ok(runtime) => {
2201 node_start_debug_log("Node::start nostr discovery runtime created");
2202 if let Err(err) = self.refresh_overlay_advert(&runtime).await {
2203 warn!(error = %err, "Failed to publish initial Nostr overlay advert");
2204 }
2205 node_start_debug_log("Node::start nostr overlay advert refreshed");
2206 self.nostr_discovery = Some(runtime);
2207 self.nostr_discovery_started_at_ms = Some(Self::now_ms());
2208 info!("Nostr overlay discovery enabled");
2209 }
2210 Err(err) => {
2211 node_start_debug_log(format!(
2212 "Node::start nostr discovery start error error={}",
2213 err
2214 ));
2215 warn!(error = %err, "Failed to start Nostr overlay discovery");
2216 }
2217 }
2218 }
2219
2220 if self.config.node.discovery.lan.enabled {
2224 node_start_debug_log("Node::start lan discovery start begin");
2225 let advertised_udp_port = self
2226 .transports
2227 .values()
2228 .filter(|h| h.is_operational())
2229 .filter(|h| h.transport_type().name == "udp")
2230 .find_map(|h| h.local_addr().map(|addr| addr.port()))
2231 .unwrap_or(0);
2232 let scope = self.lan_discovery_scope();
2233 match crate::discovery::lan::LanDiscovery::start(
2234 &self.identity,
2235 scope,
2236 advertised_udp_port,
2237 self.config.node.discovery.lan.clone(),
2238 )
2239 .await
2240 {
2241 Ok(runtime) => {
2242 node_start_debug_log("Node::start lan discovery start ok");
2243 self.lan_discovery = Some(runtime);
2244 info!("LAN mDNS discovery enabled");
2245 }
2246 Err(err) => {
2247 node_start_debug_log(format!(
2248 "Node::start lan discovery start error error={}",
2249 err
2250 ));
2251 debug!(error = %err, "LAN mDNS discovery not started");
2252 }
2253 }
2254 }
2255
2256 self.start_local_instance_discovery();
2257 self.poll_local_instance_discovery().await;
2258
2259 node_start_debug_log("Node::start initiate peer connections begin");
2262 self.initiate_peer_connections().await;
2263 node_start_debug_log("Node::start initiate peer connections complete");
2264
2265 if self.config.tun.enabled {
2267 node_start_debug_log("Node::start tun init begin");
2268 let address = *self.identity.address();
2269 match TunDevice::create(&self.config.tun, address).await {
2270 Ok(device) => {
2271 let mtu = device.mtu();
2272 let name = device.name().to_string();
2273 let our_addr = *device.address();
2274
2275 info!("TUN device active:");
2276 info!(" name: {}", name);
2277 info!(" address: {}", device.address());
2278 info!(" mtu: {}", mtu);
2279
2280 let effective_mtu = self.effective_ipv6_mtu();
2282 let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); info!("effective MTU: {} bytes", effective_mtu);
2285 debug!(" max TCP MSS: {} bytes", max_mss);
2286
2287 #[cfg(target_os = "macos")]
2291 let (shutdown_read_fd, shutdown_write_fd) = {
2292 let mut fds = [0i32; 2];
2293 if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
2294 return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
2295 "failed to create shutdown pipe".into(),
2296 )));
2297 }
2298 (fds[0], fds[1])
2299 };
2300
2301 let (writer, tun_tx) =
2305 device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
2306
2307 let writer_handle = thread::spawn(move || {
2309 writer.run();
2310 });
2311
2312 let reader_tun_tx = tun_tx.clone();
2314
2315 let tun_channel_size = self.config.node.buffers.tun_channel;
2317 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
2318
2319 let transport_mtu = self.transport_mtu();
2321 let path_mtu_lookup = self.path_mtu_lookup.clone();
2322 #[cfg(target_os = "macos")]
2323 let reader_handle = thread::spawn(move || {
2324 run_tun_reader(
2325 device,
2326 mtu,
2327 our_addr,
2328 reader_tun_tx,
2329 outbound_tx,
2330 transport_mtu,
2331 path_mtu_lookup,
2332 shutdown_read_fd,
2333 );
2334 });
2335 #[cfg(not(target_os = "macos"))]
2336 let reader_handle = thread::spawn(move || {
2337 run_tun_reader(
2338 device,
2339 mtu,
2340 our_addr,
2341 reader_tun_tx,
2342 outbound_tx,
2343 transport_mtu,
2344 path_mtu_lookup,
2345 );
2346 });
2347
2348 self.tun_state = TunState::Active;
2349 self.tun_name = Some(name);
2350 self.tun_tx = Some(tun_tx);
2351 self.tun_outbound_rx = Some(outbound_rx);
2352 self.tun_reader_handle = Some(reader_handle);
2353 self.tun_writer_handle = Some(writer_handle);
2354 #[cfg(target_os = "macos")]
2355 {
2356 self.tun_shutdown_fd = Some(shutdown_write_fd);
2357 }
2358 }
2359 Err(e) => {
2360 self.tun_state = TunState::Failed;
2361 warn!(error = %e, "Failed to initialize TUN, continuing without it");
2362 }
2363 }
2364 node_start_debug_log("Node::start tun init complete");
2365 }
2366
2367 if self.config.dns.enabled {
2384 node_start_debug_log("Node::start dns init begin");
2385 let addr_str = self.config.dns.bind_addr();
2386 match addr_str.parse::<std::net::IpAddr>() {
2387 Ok(ip) => {
2388 let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
2389 match Self::bind_dns_socket(bind) {
2390 Ok(socket) => {
2391 let dns_channel_size = self.config.node.buffers.dns_channel;
2392 let (identity_tx, identity_rx) =
2393 tokio::sync::mpsc::channel(dns_channel_size);
2394 let dns_ttl = self.config.dns.ttl();
2395 let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
2396 self.config.peers(),
2397 );
2398 let reloader = if self.config.node.system_files_enabled {
2399 let hosts_path = std::path::PathBuf::from(
2400 crate::upper::hosts::DEFAULT_HOSTS_PATH,
2401 );
2402 crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
2403 } else {
2404 crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
2405 };
2406 let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
2414 info!(
2415 bind = %bind,
2416 hosts = reloader.hosts().len(),
2417 mesh_ifindex = ?mesh_ifindex,
2418 "DNS responder started for .fips domain (auto-reload enabled)"
2419 );
2420 let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
2421 socket,
2422 identity_tx,
2423 dns_ttl,
2424 reloader,
2425 mesh_ifindex,
2426 ));
2427 self.dns_identity_rx = Some(identity_rx);
2428 self.dns_task = Some(handle);
2429 }
2430 Err(e) => {
2431 warn!(bind = %bind, error = %e, "Failed to start DNS responder");
2432 }
2433 }
2434 }
2435 Err(e) => {
2436 warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
2437 }
2438 }
2439 node_start_debug_log("Node::start dns init complete");
2440 }
2441
2442 self.state = NodeState::Running;
2443 node_start_debug_log("Node::start running");
2444 info!("Node started:");
2445 info!(" state: {}", self.state);
2446 info!(" transports: {}", self.transports.len());
2447 info!(" connections: {}", self.connections.len());
2448 Ok(())
2449 }
2450
2451 fn bind_dns_socket(
2464 addr: std::net::SocketAddr,
2465 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2466 use socket2::{Domain, Protocol, Socket, Type};
2467 let domain = if addr.is_ipv4() {
2468 Domain::IPV4
2469 } else {
2470 Domain::IPV6
2471 };
2472 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2473 if addr.is_ipv6() {
2474 sock.set_only_v6(false)?;
2475 #[cfg(unix)]
2476 Self::set_recv_pktinfo_v6(&sock)?;
2477 }
2478 sock.set_nonblocking(true)?;
2479 sock.bind(&addr.into())?;
2480 tokio::net::UdpSocket::from_std(sock.into())
2481 }
2482
2483 #[cfg(unix)]
2489 fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2490 use std::os::fd::AsRawFd;
2491 let enable: libc::c_int = 1;
2492 let ret = unsafe {
2493 libc::setsockopt(
2494 sock.as_raw_fd(),
2495 libc::IPPROTO_IPV6,
2496 libc::IPV6_RECVPKTINFO,
2497 &enable as *const _ as *const libc::c_void,
2498 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2499 )
2500 };
2501 if ret < 0 {
2502 return Err(std::io::Error::last_os_error());
2503 }
2504 Ok(())
2505 }
2506
2507 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2514 #[cfg(unix)]
2515 {
2516 let c_name = std::ffi::CString::new(name).ok()?;
2517 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2518 if idx == 0 { None } else { Some(idx) }
2519 }
2520 #[cfg(not(unix))]
2521 {
2522 let _ = name;
2523 None
2524 }
2525 }
2526
    /// Stops the node: tears down the DNS responder, notifies peers, shuts
    /// down discovery, stops every transport and finally the TUN interface.
    ///
    /// The state moves to `Stopping` on entry and to `Stopped` on completion.
    /// Failures of individual teardown steps are logged and do not abort the
    /// remaining steps.
    ///
    /// # Errors
    ///
    /// Returns [`NodeError::NotStarted`] when the current state does not
    /// permit stopping (`self.state.can_stop()` is false).
    pub async fn stop(&mut self) -> Result<(), NodeError> {
        if !self.state.can_stop() {
            return Err(NodeError::NotStarted);
        }
        self.state = NodeState::Stopping;
        info!(state = %self.state, "Node stopping");

        // Abort the DNS responder task first; it is not awaited.
        if let Some(handle) = self.dns_task.take() {
            handle.abort();
            debug!("DNS responder stopped");
        }

        // Notify peers while the transports are still running.
        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
            .await;

        if let Some(bootstrap) = self.nostr_discovery.take()
            && let Err(e) = bootstrap.shutdown().await
        {
            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
        }

        if let Some(lan) = self.lan_discovery.take() {
            lan.shutdown().await;
        }

        if let Some(registry) = self.local_instance_registry.take()
            && let Err(err) = registry.remove()
        {
            debug!(error = %err, "failed to remove same-host FIPS instance record");
        }

        // Snapshot the ids so entries can be removed from the map while iterating.
        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
        for transport_id in transport_ids {
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let transport_type = handle.transport_type().name;
                match handle.stop().await {
                    Ok(()) => {
                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
                    }
                    Err(e) => {
                        warn!(
                            transport_id = %transport_id,
                            transport_type,
                            error = %e,
                            "Transport stop failed"
                        );
                    }
                }
            }
        }

        // Drop both ends of the packet channel held by the node.
        self.packet_tx.take();
        self.packet_rx.take();

        if let Some(name) = self.tun_name.take() {
            info!(name = %name, "Shutting down TUN interface");

            // Drop the node's sender to the TUN writer before joining it below.
            // NOTE(review): presumably the writer thread exits once all senders
            // are gone — confirm against the writer implementation.
            self.tun_tx.take();

            if let Err(e) = shutdown_tun_interface(&name).await {
                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
            }

            // On macOS the reader thread was given the read end of a pipe at
            // startup; write one byte to the write end, then close it. The
            // result of the write is intentionally ignored (best effort).
            #[cfg(target_os = "macos")]
            if let Some(fd) = self.tun_shutdown_fd.take() {
                unsafe {
                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                    libc::close(fd);
                }
            }

            // NOTE(review): `JoinHandle::join` blocks the calling thread inside
            // an async fn; panics from the worker threads are discarded.
            if let Some(handle) = self.tun_reader_handle.take() {
                let _ = handle.join();
            }
            if let Some(handle) = self.tun_writer_handle.take() {
                let _ = handle.join();
            }

            self.tun_state = TunState::Disabled;
        }

        self.state = NodeState::Stopped;
        info!(state = %self.state, "Node stopped");
        Ok(())
    }
2630
2631 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2636 let disconnect = Disconnect::new(reason);
2637 let plaintext = disconnect.encode();
2638
2639 let peer_addrs: Vec<NodeAddr> = self
2641 .peers
2642 .iter()
2643 .filter(|(_, peer)| peer.can_send() && peer.has_session())
2644 .map(|(addr, _)| *addr)
2645 .collect();
2646
2647 if peer_addrs.is_empty() {
2648 debug!(
2649 total_peers = self.peers.len(),
2650 "No sendable peers for disconnect notification"
2651 );
2652 return;
2653 }
2654
2655 let mut sent = 0usize;
2656 for node_addr in &peer_addrs {
2657 match self
2658 .send_encrypted_link_message(node_addr, &plaintext)
2659 .await
2660 {
2661 Ok(()) => sent += 1,
2662 Err(e) => {
2663 debug!(
2664 peer = %self.peer_display_name(node_addr),
2665 error = %e,
2666 "Failed to send disconnect (transport may be down)"
2667 );
2668 }
2669 }
2670 }
2671
2672 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2673 }
2674
2675 pub(in crate::node) fn static_peer_addresses(
2676 &self,
2677 peer_config: &PeerConfig,
2678 ) -> Vec<PeerAddress> {
2679 peer_config
2680 .addresses_by_priority()
2681 .into_iter()
2682 .cloned()
2683 .collect()
2684 }
2685
2686 async fn nostr_peer_fallback_addresses(
2687 &self,
2688 peer_config: &PeerConfig,
2689 existing: &[PeerAddress],
2690 ) -> Vec<PeerAddress> {
2691 if !self.config.node.discovery.nostr.enabled
2692 || self.config.node.discovery.nostr.policy
2693 == crate::config::NostrDiscoveryPolicy::Disabled
2694 {
2695 return Vec::new();
2696 }
2697
2698 let Some(bootstrap) = self.nostr_discovery.clone() else {
2699 return Vec::new();
2700 };
2701 if self.nostr_cooldown_applies_to_peer_config(peer_config)
2702 && bootstrap
2703 .cooldown_until(&peer_config.npub, Self::now_ms())
2704 .is_some()
2705 {
2706 debug!(
2707 npub = %peer_config.npub,
2708 "Skipping cached Nostr fallback endpoints while peer is in traversal cooldown"
2709 );
2710 return Vec::new();
2711 }
2712 let endpoints = match bootstrap
2713 .cached_advert_endpoints_for_peer(&peer_config.npub)
2714 .await
2715 {
2716 Some(endpoints) => endpoints,
2717 None => {
2718 debug!(
2719 npub = %peer_config.npub,
2720 "No cached Nostr advert endpoints for configured peer"
2721 );
2722 return Vec::new();
2723 }
2724 };
2725
2726 let mut fallback = Vec::new();
2727 let mut next_priority = existing
2728 .iter()
2729 .map(|addr| addr.priority)
2730 .max()
2731 .unwrap_or(100)
2732 .saturating_add(1);
2733 let seen_at_ms = Self::now_ms();
2739 for endpoint in endpoints {
2740 let Some(candidate) =
2741 Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2742 else {
2743 continue;
2744 };
2745 if existing
2746 .iter()
2747 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2748 || fallback.iter().any(|addr: &PeerAddress| {
2749 addr.transport == candidate.transport && addr.addr == candidate.addr
2750 })
2751 {
2752 continue;
2753 }
2754 fallback.push(candidate);
2755 next_priority = next_priority.saturating_add(1);
2756 }
2757 fallback
2758 }
2759
2760 pub(in crate::node) async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2761 if !self.config.node.discovery.nostr.enabled
2762 || self.config.node.discovery.nostr.policy
2763 == crate::config::NostrDiscoveryPolicy::Disabled
2764 {
2765 return false;
2766 }
2767 let Some(bootstrap) = self.nostr_discovery.clone() else {
2768 return false;
2769 };
2770 let now_ms = Self::now_ms();
2771 if self.nostr_cooldown_applies_to_peer_config(peer_config)
2772 && let Some(cooldown_until_ms) = bootstrap.cooldown_until(&peer_config.npub, now_ms)
2773 {
2774 debug!(
2775 npub = %peer_config.npub,
2776 cooldown_secs = cooldown_until_ms.saturating_sub(now_ms) / 1000,
2777 "Skipping Nostr traversal request while peer is in cooldown"
2778 );
2779 return false;
2780 }
2781 bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
2782 bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
2783 let mesh_signaling_allowed = self.mesh_signaling_allowed_for_peer(peer_config);
2784 bootstrap
2785 .request_connect_with_mesh_signaling(peer_config.clone(), mesh_signaling_allowed)
2786 .await;
2787 info!(
2788 npub = %peer_config.npub,
2789 mesh_signaling_allowed,
2790 "Started background UDP NAT traversal attempt"
2791 );
2792 true
2793 }
2794
2795 fn nostr_cooldown_applies_to_peer_config(&self, peer_config: &PeerConfig) -> bool {
2796 !self.mesh_signaling_allowed_for_peer(peer_config)
2797 }
2798
2799 pub(in crate::node) fn mesh_signaling_allowed_for_peer(
2800 &self,
2801 peer_config: &PeerConfig,
2802 ) -> bool {
2803 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
2804 return false;
2805 };
2806 let peer_addr = identity.node_addr();
2807 self.configured_peer(peer_addr).is_some()
2808 }
2809
2810 fn overlay_endpoint_to_peer_address(
2811 endpoint: &OverlayEndpointAdvert,
2812 priority: u8,
2813 seen_at_ms: u64,
2814 ) -> Option<PeerAddress> {
2815 let transport = match endpoint.transport {
2816 OverlayTransportKind::Udp => "udp",
2817 OverlayTransportKind::Tcp => "tcp",
2818 OverlayTransportKind::Tor => "tor",
2819 OverlayTransportKind::WebRtc => "webrtc",
2820 };
2821 Some(
2822 PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2823 .with_seen_at_ms(seen_at_ms),
2824 )
2825 }
2826
    /// Walks `addresses` in order and starts connection attempts towards the
    /// peer on every usable candidate path.
    ///
    /// * `udp` addresses equal to `nat` (case-insensitive) are not dialled;
    ///   they trigger a Nostr traversal request when `allow_bootstrap_nat` is
    ///   set and are skipped otherwise.
    /// * Concrete attempts are limited by `path_candidate_attempt_budget`;
    ///   the loop stops once that budget reaches zero.
    /// * A candidate that is already being connected on the same path counts
    ///   as attempted but is not dialled again.
    ///
    /// # Errors
    ///
    /// * `NodeError::AccessDenied` from `initiate_connection` is returned
    ///   immediately, without trying further addresses.
    /// * If nothing was attempted, returns `NodeError::LocalRouteUnavailable`
    ///   carrying the first such failure seen, otherwise
    ///   `NodeError::NoTransportForType`.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        let mut attempted = false;
        // First "local route unavailable" failure, kept for the final error.
        let mut local_route_error = None;
        let peer_node_addr = *peer_identity.node_addr();
        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

        for addr in addresses {
            // "udp:nat" is a placeholder for NAT traversal, not a dialable address.
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if !allow_bootstrap_nat {
                    continue;
                }
                if self.request_nostr_bootstrap(peer_config).await {
                    attempted = true;
                    continue;
                }
                // Reached whenever the request was declined (the callee also
                // declines for disabled discovery and for cooldown).
                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                continue;
            }

            // Resolve the candidate to a (transport id, remote address) pair.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                // UDP addresses that parse as a socket address pick a transport
                // compatible with that remote; everything else (including UDP
                // addresses that do not parse) is looked up by transport type.
                let tid = if addr.transport == "udp"
                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
                {
                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                        Some((id, _)) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No compatible operational UDP transport for address"
                            );
                            continue;
                        }
                    }
                } else {
                    match self.find_transport_for_type(&addr.transport) {
                        Some(id) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No operational transport for address type"
                            );
                            continue;
                        }
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            // An in-flight attempt on the same path counts as attempted.
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                attempted = true;
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    remote_addr = %remote_addr,
                    "Skipping duplicate in-flight candidate path"
                );
                continue;
            }

            // Stop here: any remaining addresses (including "udp:nat") are
            // not visited once the budget is spent.
            if concrete_budget == 0 {
                debug!(
                    npub = %peer_config.npub,
                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                    "Path candidate race budget exhausted"
                );
                break;
            }

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => {
                    attempted = true;
                    concrete_budget = concrete_budget.saturating_sub(1);
                }
                // Access denial is final for this peer; do not try other paths.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    if e.is_local_route_unavailable() && local_route_error.is_none() {
                        local_route_error = Some(e.to_string());
                    }
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        if attempted {
            return Ok(());
        }

        if let Some(error) = local_route_error {
            return Err(NodeError::LocalRouteUnavailable(error));
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
2973
2974 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
2975 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
2976 .await;
2977 }
2978
2979 pub(in crate::node) fn queue_active_fallback_direct_retries(
2980 &mut self,
2981 _bootstrap: &std::sync::Arc<NostrDiscovery>,
2982 ) {
2983 let now_ms = Self::now_ms();
2984 let peer_configs = self
2985 .config
2986 .auto_connect_peers()
2987 .cloned()
2988 .collect::<Vec<_>>();
2989
2990 for peer_config in peer_configs {
2991 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
2992 continue;
2993 };
2994 let node_addr = *peer_identity.node_addr();
2995
2996 if self.retry_pending.contains_key(&node_addr)
2997 || !self.peers.contains_key(&node_addr)
2998 || self.is_connecting_to_peer(&node_addr)
2999 || !self.active_peer_should_keep_direct_retry(&node_addr, &peer_config)
3000 {
3001 continue;
3002 }
3003
3004 let mut state = super::retry::RetryState::new(peer_config.clone());
3005 state.reconnect = true;
3006 state.retry_after_ms = now_ms;
3007 self.retry_pending.insert(node_addr, state);
3008
3009 debug!(
3010 peer = %self.peer_display_name(&node_addr),
3011 "Queued direct-path retry for active fallback peer"
3012 );
3013 }
3014 }
3015
    /// Scans cached overlay adverts and queues connection retries for peers
    /// that are not configured locally ("open discovery").
    ///
    /// Does nothing unless Nostr discovery is enabled with the `Open` policy
    /// and the enqueue budget is non-zero. For adverts belonging to
    /// *configured* peers nothing is queued; instead an already pending retry
    /// scheduled in the future is pulled forward to now (unless the peer is in
    /// cooldown).
    ///
    /// * `max_age_secs` — when set, adverts older than this are skipped.
    /// * `caller` — label used in log output; the summary line is emitted for
    ///   the `"startup"` caller or whenever at least one retry was queued.
    pub(in crate::node) async fn run_open_discovery_sweep(
        &mut self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
        max_age_secs: Option<u64>,
        caller: &'static str,
    ) {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
        {
            return;
        }

        let configured_npubs = self
            .config
            .peers()
            .iter()
            .map(|peer| peer.npub.clone())
            .collect::<HashSet<_>>();
        let now_ms = Self::now_ms();
        let now_secs = now_ms / 1000;
        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
        if enqueue_budget == 0 {
            debug!(
                caller = %caller,
                "open-discovery sweep: enqueue budget is 0, skipping"
            );
            return;
        }

        // NOTE(review): 64 is presumably the maximum number of cached
        // candidates to fetch — confirm against NostrDiscovery.
        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
        let cached_count = candidates.len();
        // Per-reason counters, only used for the summary log line.
        let mut enqueued = 0usize;
        let mut skipped_age = 0usize;
        let mut skipped_configured = 0usize;
        let mut skipped_self = 0usize;
        let mut skipped_connected = 0usize;
        let mut skipped_retry_pending = 0usize;
        let mut skipped_connecting = 0usize;
        let mut skipped_no_endpoints = 0usize;
        let mut skipped_invalid_npub = 0usize;
        let mut skipped_cooldown = 0usize;

        for (npub, endpoints, created_at_secs) in candidates {
            if enqueue_budget == 0 {
                break;
            }

            if let Some(max_age) = max_age_secs
                && now_secs.saturating_sub(created_at_secs) > max_age
            {
                skipped_age = skipped_age.saturating_add(1);
                continue;
            }

            // Configured peers are never queued here; at most their existing
            // retry is expedited.
            if configured_npubs.contains(&npub) {
                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
                    let configured_addr = *identity.node_addr();
                    if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                        // NOTE(review): this path bumps two counters for one
                        // candidate, so `skipped_total` can exceed `cached`.
                        skipped_cooldown = skipped_cooldown.saturating_add(1);
                        skipped_configured = skipped_configured.saturating_add(1);
                        continue;
                    }
                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
                        && state.retry_after_ms > now_ms
                    {
                        state.retry_after_ms = now_ms;
                        debug!(
                            caller = %caller,
                            peer = %self.peer_display_name(&configured_addr),
                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
                            "Expediting configured-peer retry after fresh overlay advert"
                        );
                    }
                }
                skipped_configured = skipped_configured.saturating_add(1);
                continue;
            }

            let peer_identity = match PeerIdentity::from_npub(&npub) {
                Ok(identity) => identity,
                Err(_) => {
                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                    continue;
                }
            };
            let node_addr = *peer_identity.node_addr();
            if node_addr == *self.identity.node_addr() {
                skipped_self = skipped_self.saturating_add(1);
                continue;
            }
            if self.peers.contains_key(&node_addr) {
                skipped_connected = skipped_connected.saturating_add(1);
                continue;
            }
            if self.retry_pending.contains_key(&node_addr) {
                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
                continue;
            }
            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                skipped_cooldown = skipped_cooldown.saturating_add(1);
                continue;
            }
            // Skip peers that already have a connection in progress whose
            // expected identity matches.
            let connecting = self.connections.values().any(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == &node_addr)
                    .unwrap_or(false)
            });
            if connecting {
                skipped_connecting = skipped_connecting.saturating_add(1);
                continue;
            }

            // Convert advertised endpoints to addresses, de-duplicated by
            // (transport, addr), with priorities counting up from 120.
            let mut addresses = Vec::new();
            let mut priority = 120u8;
            let seen_at_ms = Self::now_ms();
            for endpoint in endpoints {
                let Some(candidate) =
                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
                else {
                    continue;
                };
                if addresses.iter().any(|existing: &PeerAddress| {
                    existing.transport == candidate.transport && existing.addr == candidate.addr
                }) {
                    continue;
                }
                addresses.push(candidate);
                priority = priority.saturating_add(1);
            }
            if addresses.is_empty() {
                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
                continue;
            }

            // Keep an existing alias if one is already registered.
            self.peer_aliases
                .entry(node_addr)
                .or_insert_with(|| peer_identity.short_npub());
            self.register_identity(node_addr, peer_identity.pubkey_full());

            // Synthesize a peer config for the discovered peer and queue it
            // for an immediate, expiring, non-reconnect retry.
            let mut state = super::retry::RetryState::new(PeerConfig {
                npub: npub.clone(),
                alias: None,
                addresses,
                connect_policy: ConnectPolicy::AutoConnect,
                auto_reconnect: true,
                discovery_fallback_transit: false,
            });
            state.reconnect = false;
            state.retry_after_ms = now_ms;
            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
            self.retry_pending.insert(node_addr, state);
            info!(
                caller = %caller,
                peer = %peer_identity.short_npub(),
                advert_age_secs = now_secs.saturating_sub(created_at_secs),
                "open-discovery sweep: queued retry for cached advert"
            );
            enqueue_budget = enqueue_budget.saturating_sub(1);
            enqueued = enqueued.saturating_add(1);
        }

        let total_skipped = skipped_age
            + skipped_configured
            + skipped_self
            + skipped_connected
            + skipped_retry_pending
            + skipped_connecting
            + skipped_no_endpoints
            + skipped_invalid_npub
            + skipped_cooldown;
        // Per-tick sweeps that queued nothing stay silent.
        let should_summarize = caller == "startup" || enqueued > 0;
        if should_summarize {
            info!(
                caller = %caller,
                cached = cached_count,
                queued = enqueued,
                skipped_age = skipped_age,
                skipped_configured = skipped_configured,
                skipped_self = skipped_self,
                skipped_connected = skipped_connected,
                skipped_retry_pending = skipped_retry_pending,
                skipped_connecting = skipped_connecting,
                skipped_no_endpoints = skipped_no_endpoints,
                skipped_invalid_npub = skipped_invalid_npub,
                skipped_cooldown = skipped_cooldown,
                skipped_total = total_skipped,
                "open-discovery sweep complete"
            );
        }
    }
3238
3239 async fn maybe_run_startup_open_discovery_sweep(
3247 &mut self,
3248 bootstrap: &std::sync::Arc<NostrDiscovery>,
3249 ) {
3250 if self.startup_open_discovery_sweep_done {
3251 return;
3252 }
3253 if !self.config.node.discovery.nostr.enabled
3254 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3255 {
3256 self.startup_open_discovery_sweep_done = true;
3258 return;
3259 }
3260 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
3261 return;
3262 };
3263 let now_ms = Self::now_ms();
3264 let delay_ms = self
3265 .config
3266 .node
3267 .discovery
3268 .nostr
3269 .startup_sweep_delay_secs
3270 .saturating_mul(1000);
3271 if now_ms < started_at_ms.saturating_add(delay_ms) {
3272 return;
3273 }
3274
3275 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
3276 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
3277 .await;
3278 self.startup_open_discovery_sweep_done = true;
3279 }
3280
3281 fn available_outbound_slots(&self) -> usize {
3282 let connection_used = self
3283 .connections
3284 .len()
3285 .saturating_add(self.pending_connects.len());
3286 let connection_slots = if self.max_connections == 0 {
3287 usize::MAX
3288 } else {
3289 self.max_connections.saturating_sub(connection_used)
3290 };
3291
3292 let peer_slots = if self.max_peers == 0 {
3293 usize::MAX
3294 } else {
3295 self.max_peers.saturating_sub(self.peers.len())
3296 };
3297
3298 let link_slots = if self.max_links == 0 {
3299 usize::MAX
3300 } else {
3301 self.max_links.saturating_sub(self.links.len())
3302 };
3303
3304 connection_slots.min(peer_slots).min(link_slots)
3305 }
3306
3307 pub(in crate::node) fn open_discovery_enqueue_budget(
3308 &self,
3309 configured_npubs: &HashSet<String>,
3310 ) -> usize {
3311 let current_open_discovery_active = self
3312 .peers
3313 .values()
3314 .filter(|peer| !configured_npubs.contains(&peer.npub()))
3315 .count();
3316 let current_open_discovery_pending = self
3317 .retry_pending
3318 .values()
3319 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3320 .count();
3321
3322 let cap_remaining = self
3323 .config
3324 .node
3325 .discovery
3326 .nostr
3327 .open_discovery_max_pending
3328 .saturating_sub(current_open_discovery_active)
3329 .saturating_sub(current_open_discovery_pending);
3330
3331 cap_remaining.min(self.available_outbound_slots())
3332 }
3333
3334 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3335 now_ms.saturating_add(
3336 self.config
3337 .node
3338 .discovery
3339 .nostr
3340 .advert_ttl_secs
3341 .saturating_mul(1000)
3342 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3343 )
3344 }
3345
    /// Builds the overlay advert describing how this node can be reached,
    /// based on the operational transports that opt in to Nostr advertising.
    ///
    /// Returns `None` when Nostr discovery is disabled or when no endpoint
    /// could be collected. `signal_relays` and `stun_servers` are only filled
    /// in when the advert contains a `udp:nat` or a WebRTC endpoint.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // Track endpoint kinds that need signalling metadata in the advert.
        let mut has_udp_nat = false;
        let mut has_webrtc = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // Preference order for a public UDP transport:
                        // explicit external address, then a routable bind
                        // address, then an address learned via the discovery
                        // runtime.
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                // Bound to a wildcard or unroutable address.
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                             or private and STUN observation failed; \
                                             advertising no UDP endpoint. Either set \
                                             transports.udp.external_addr, bind to a \
                                             specific *public* IP, or ensure \
                                             node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                None => {}
                            }
                        }
                    } else {
                        // Non-public UDP transports advertise the "nat"
                        // placeholder instead of a concrete address.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "webrtc" => {
                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // The WebRTC endpoint address is the hex-encoded
                    // serialized public key of this node.
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::WebRtc,
                        addr: hex::encode(self.identity.pubkey_full().serialize()),
                    });
                    has_webrtc = true;
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // TCP has no learned-address fallback: either an explicit
                    // external address or a routable bind address is needed.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                     or private IP and no transports.tcp.external_addr set; \
                                     advertising no TCP endpoint. Either set external_addr \
                                     to the public IP (recommended for cloud 1:1-NAT setups) \
                                     or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Advertised only once an onion address is available.
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                // Other transport types are never advertised.
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
3524
3525 async fn refresh_overlay_advert(
3526 &self,
3527 bootstrap: &std::sync::Arc<NostrDiscovery>,
3528 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3529 let advert = self.build_overlay_advert(bootstrap).await;
3530 bootstrap.update_local_advert(advert).await
3531 }
3532
3533 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3534 match (&self.config.transports.udp, transport_name) {
3535 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3536 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3537 _ => None,
3538 }
3539 }
3540
3541 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3542 match (&self.config.transports.tcp, transport_name) {
3543 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3544 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3545 _ => None,
3546 }
3547 }
3548
3549 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3550 match (&self.config.transports.tor, transport_name) {
3551 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3552 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3553 _ => None,
3554 }
3555 }
3556
3557 fn lookup_webrtc_config(
3558 &self,
3559 transport_name: Option<&str>,
3560 ) -> Option<&crate::config::WebRtcConfig> {
3561 match (&self.config.transports.webrtc, transport_name) {
3562 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3563 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3564 _ => None,
3565 }
3566 }
3567
3568 pub(in crate::node) async fn try_peer_addresses(
3569 &mut self,
3570 peer_config: &PeerConfig,
3571 peer_identity: PeerIdentity,
3572 allow_bootstrap_nat: bool,
3573 ) -> Result<(), NodeError> {
3574 let peer_node_addr = *peer_identity.node_addr();
3575 if self.peers.contains_key(&peer_node_addr) {
3576 debug!(
3577 npub = %peer_config.npub,
3578 "Peer already exists, skipping address attempts"
3579 );
3580 return Ok(());
3581 }
3582
3583 let candidates = self.peer_address_candidates(peer_config).await;
3584
3585 if candidates.is_empty() {
3586 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3587 return Ok(());
3588 }
3589 return Err(NodeError::NoTransportForType(format!(
3590 "no addresses known for {}",
3591 peer_config.npub
3592 )));
3593 }
3594
3595 if self
3596 .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3597 .await
3598 .is_ok()
3599 {
3600 if allow_bootstrap_nat {
3601 self.request_nostr_bootstrap(peer_config).await;
3602 }
3603 return Ok(());
3604 }
3605
3606 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3607 return Ok(());
3608 }
3609
3610 Err(NodeError::NoTransportForType(format!(
3611 "no operational transport for any of {}'s addresses",
3612 peer_config.npub
3613 )))
3614 }
3615
3616 async fn try_active_peer_alternative_addresses(
3617 &mut self,
3618 peer_config: &PeerConfig,
3619 peer_identity: PeerIdentity,
3620 ) -> Result<bool, NodeError> {
3621 let peer_node_addr = *peer_identity.node_addr();
3622 let candidates = self.peer_address_candidates(peer_config).await;
3623 let should_try_nostr =
3624 self.active_peer_should_keep_direct_retry(&peer_node_addr, peer_config);
3625
3626 if candidates.is_empty() {
3627 if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3628 return Ok(true);
3629 }
3630 return Err(NodeError::NoTransportForType(format!(
3631 "no addresses known for {}",
3632 peer_config.npub
3633 )));
3634 }
3635
3636 let alternatives: Vec<_> = candidates
3637 .into_iter()
3638 .filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
3639 .collect();
3640
3641 if alternatives.is_empty() {
3642 if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3643 return Ok(true);
3644 }
3645 return Ok(false);
3646 }
3647
3648 let needs_separate_nostr_attempt = should_try_nostr
3649 && !alternatives
3650 .iter()
3651 .any(|addr| addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat"));
3652 let address_result = self
3653 .attempt_peer_address_list(peer_config, peer_identity, true, &alternatives)
3654 .await;
3655 let nostr_attempted =
3656 needs_separate_nostr_attempt && self.request_nostr_bootstrap(peer_config).await;
3657
3658 match address_result {
3659 Ok(()) => Ok(true),
3660 Err(err) if nostr_attempted => {
3661 debug!(
3662 npub = %peer_config.npub,
3663 error = %err,
3664 "Static active-peer direct-path alternatives failed; Nostr traversal still queued"
3665 );
3666 Ok(true)
3667 }
3668 Err(err) => Err(err),
3669 }
3670 }
3671
3672 async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3673 let static_addresses = self.static_peer_addresses(peer_config);
3680 let overlay_addresses = self
3681 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3682 .await;
3683
3684 let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3685 for addr in overlay_addresses.into_iter().chain(static_addresses) {
3686 if !candidates.iter().any(|existing: &PeerAddress| {
3687 existing.transport == addr.transport && existing.addr == addr.addr
3688 }) {
3689 candidates.push(addr);
3690 }
3691 }
3692
3693 candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
3699 (Some(_), None) => std::cmp::Ordering::Less,
3700 (None, Some(_)) => std::cmp::Ordering::Greater,
3701 _ if a.priority != b.priority => a.priority.cmp(&b.priority),
3702 (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3703 (None, None) => std::cmp::Ordering::Equal,
3704 });
3705
3706 candidates
3707 }
3708
3709 fn active_peer_matches_any_candidate(
3710 &self,
3711 peer_node_addr: &NodeAddr,
3712 candidates: &[PeerAddress],
3713 ) -> bool {
3714 candidates
3715 .iter()
3716 .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3717 }
3718
3719 pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3720 &self,
3721 peer_node_addr: &NodeAddr,
3722 candidates: &[PeerAddress],
3723 ) -> bool {
3724 if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3725 return false;
3726 }
3727 !self.active_peer_needs_same_path_refresh(peer_node_addr)
3728 }
3729
3730 pub(in crate::node) fn active_peer_should_keep_direct_retry(
3731 &self,
3732 peer_node_addr: &NodeAddr,
3733 peer_config: &PeerConfig,
3734 ) -> bool {
3735 let Some(peer) = self.peers.get(peer_node_addr) else {
3736 return false;
3737 };
3738
3739 let static_addresses = self.static_peer_addresses(peer_config);
3740 if !static_addresses.is_empty() {
3741 return !self
3742 .active_peer_candidate_is_fresh_enough_to_skip(peer_node_addr, &static_addresses);
3743 }
3744
3745 if peer_config.npub.is_empty() {
3746 return false;
3747 }
3748
3749 if !self.config.node.discovery.nostr.enabled
3750 || self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Disabled
3751 {
3752 return false;
3753 }
3754
3755 peer.transport_id()
3756 .and_then(|id| self.transports.get(&id))
3757 .map(|transport| transport.transport_type().name != "udp")
3758 .unwrap_or(true)
3759 }
3760
3761 pub(in crate::node) fn clear_retry_unless_direct_refresh_needed(
3762 &mut self,
3763 peer_node_addr: &NodeAddr,
3764 ) {
3765 let keep_retry = self
3766 .retry_pending
3767 .get(peer_node_addr)
3768 .map(|state| state.peer_config.clone())
3769 .is_some_and(|peer_config| {
3770 self.active_peer_should_keep_direct_retry(peer_node_addr, &peer_config)
3771 });
3772
3773 if !keep_retry {
3774 self.retry_pending.remove(peer_node_addr);
3775 }
3776 }
3777
3778 fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3779 let Some(peer) = self.peers.get(peer_node_addr) else {
3780 return false;
3781 };
3782 let stale_after_ms = self
3783 .config
3784 .node
3785 .heartbeat_interval_secs
3786 .saturating_mul(1000)
3787 .max(1000);
3788 peer.idle_time(Self::now_ms()) > stale_after_ms
3789 }
3790
3791 pub(in crate::node) fn active_peer_matches_candidate(
3792 &self,
3793 peer_node_addr: &NodeAddr,
3794 candidate: &PeerAddress,
3795 ) -> bool {
3796 let Some(peer) = self.peers.get(peer_node_addr) else {
3797 return false;
3798 };
3799 let Some(current_addr) = peer.current_addr() else {
3800 return false;
3801 };
3802 if let Some(peer_transport_id) = peer.transport_id()
3803 && let Some((candidate_transport_id, candidate_addr)) =
3804 self.resolve_peer_address_for_match(candidate)
3805 {
3806 return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3807 }
3808 if peer
3809 .transport_id()
3810 .map(|id| self.bootstrap_transports.contains(&id))
3811 .unwrap_or(false)
3812 {
3813 return false;
3814 }
3815 let current_addr = current_addr.to_string();
3816 let current_transport = peer
3817 .transport_id()
3818 .and_then(|id| self.transports.get(&id))
3819 .map(|transport| transport.transport_type().name);
3820
3821 candidate.addr == current_addr
3822 && current_transport
3823 .map(|transport| transport == candidate.transport)
3824 .unwrap_or(true)
3825 }
3826
3827 pub(in crate::node) fn active_peer_uses_recent_endpoint_path(
3828 &self,
3829 peer_node_addr: &NodeAddr,
3830 peer_config: &PeerConfig,
3831 ) -> bool {
3832 peer_config.addresses.iter().any(|addr| {
3833 addr.seen_at_ms.is_some() && self.active_peer_matches_candidate(peer_node_addr, addr)
3834 })
3835 }
3836
3837 pub(in crate::node) fn active_peer_uses_traversal_path(
3838 &self,
3839 peer_node_addr: &NodeAddr,
3840 peer_config: &PeerConfig,
3841 ) -> bool {
3842 let via_bootstrap_transport = self
3843 .peers
3844 .get(peer_node_addr)
3845 .and_then(|peer| peer.transport_id())
3846 .map(|id| self.bootstrap_transports.contains(&id))
3847 .unwrap_or(false);
3848
3849 via_bootstrap_transport
3850 || self.active_peer_uses_recent_endpoint_path(peer_node_addr, peer_config)
3851 }
3852
3853 pub(crate) async fn api_connect(
3861 &mut self,
3862 npub: &str,
3863 address: &str,
3864 transport: &str,
3865 ) -> Result<serde_json::Value, String> {
3866 let peer_config = PeerConfig {
3867 npub: npub.to_string(),
3868 alias: None,
3869 addresses: vec![PeerAddress::new(transport, address)],
3870 connect_policy: ConnectPolicy::Manual,
3871 auto_reconnect: false,
3872 discovery_fallback_transit: true,
3873 };
3874
3875 if let Ok(identity) = PeerIdentity::from_npub(npub) {
3877 self.peer_aliases
3878 .insert(*identity.node_addr(), identity.short_npub());
3879 self.register_identity(*identity.node_addr(), identity.pubkey_full());
3880 }
3881
3882 self.initiate_peer_connection(&peer_config)
3883 .await
3884 .map(|()| {
3885 info!(
3886 npub = %npub,
3887 address = %address,
3888 transport = %transport,
3889 "API connect initiated"
3890 );
3891 serde_json::json!({
3892 "npub": npub,
3893 "address": address,
3894 "transport": transport,
3895 })
3896 })
3897 .map_err(|e| e.to_string())
3898 }
3899
3900 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
3904 let peer_identity =
3905 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
3906 let node_addr = *peer_identity.node_addr();
3907
3908 if !self.peers.contains_key(&node_addr) {
3909 return Err(format!("peer not found: {npub}"));
3910 }
3911
3912 self.remove_active_peer(&node_addr);
3914
3915 self.retry_pending.remove(&node_addr);
3917
3918 info!(npub = %npub, "API disconnect completed");
3919
3920 Ok(serde_json::json!({
3921 "npub": npub,
3922 "disconnected": true,
3923 }))
3924 }
3925
3926 pub async fn adopt_established_traversal(
3933 &mut self,
3934 traversal: EstablishedTraversal,
3935 ) -> Result<BootstrapHandoffResult, NodeError> {
3936 debug!(
3937 peer_npub = %traversal.peer_npub,
3938 session_id = %traversal.session_id,
3939 remote_addr = %traversal.remote_addr,
3940 "adopting established traversal socket"
3941 );
3942
3943 if !self.state.is_operational() {
3944 return Err(NodeError::NotStarted);
3945 }
3946
3947 let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
3948 let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
3949 NodeError::InvalidPeerNpub {
3950 npub: traversal.peer_npub.clone(),
3951 reason: e.to_string(),
3952 }
3953 })?;
3954 let peer_node_addr = *peer_identity.node_addr();
3955 if self.peers.contains_key(&peer_node_addr) {
3956 debug!(
3957 peer_npub = %traversal.peer_npub,
3958 "Adopting NAT traversal handoff as alternate path for already-connected peer"
3959 );
3960 }
3961
3962 self.peer_aliases
3963 .insert(peer_node_addr, peer_identity.short_npub());
3964 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
3965
3966 let transport_id = self.allocate_transport_id();
3967 let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
3987 let mut cfg = self
3988 .lookup_udp_config(traversal.transport_name.as_deref())
3989 .or_else(|| self.lookup_udp_config(None))
3990 .cloned()
3991 .unwrap_or_default();
3992 cfg.bind_addr = None;
3993 cfg.external_addr = None;
3994 cfg
3995 });
3996 let mut transport = crate::transport::udp::UdpTransport::new(
3997 transport_id,
3998 traversal.transport_name.clone(),
3999 inherited_config,
4000 packet_tx,
4001 );
4002
4003 transport
4004 .adopt_socket_async(traversal.socket)
4005 .await
4006 .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
4007
4008 let local_addr = transport.local_addr().ok_or_else(|| {
4009 NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
4010 })?;
4011
4012 self.transports.insert(
4013 transport_id,
4014 crate::transport::TransportHandle::Udp(transport),
4015 );
4016 self.bootstrap_transports.insert(transport_id);
4017 self.bootstrap_transport_npubs
4018 .insert(transport_id, traversal.peer_npub.clone());
4019
4020 let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
4021 if let Err(err) = self
4022 .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
4023 .await
4024 {
4025 self.bootstrap_transports.remove(&transport_id);
4026 self.bootstrap_transport_npubs.remove(&transport_id);
4027 if let Some(mut handle) = self.transports.remove(&transport_id) {
4028 let _ = handle.stop().await;
4029 }
4030 return Err(err);
4031 }
4032
4033 info!(
4034 peer = %self.peer_display_name(&peer_node_addr),
4035 transport_id = %transport_id,
4036 local_addr = %local_addr,
4037 remote_addr = %traversal.remote_addr,
4038 session_id = %traversal.session_id,
4039 "adopted NAT traversal socket; handshake initiated"
4040 );
4041
4042 Ok(BootstrapHandoffResult {
4043 transport_id,
4044 local_addr,
4045 remote_addr: traversal.remote_addr,
4046 peer_node_addr,
4047 session_id: traversal.session_id,
4048 })
4049 }
4050}