1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, MeshTraversalSignal, NostrDiscovery,
7 OverlayAdvert, OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason, SessionMessageType};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use secp256k1::PublicKey;
18use std::collections::{HashMap, HashSet};
19use std::net::{IpAddr, SocketAddr};
20use std::thread;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
/// Disposition chosen for a mesh signal relative to a session.
///
/// NOTE(review): the code that selects between these variants is outside this
/// part of the file; the per-variant notes below are read from the names only
/// and should be confirmed at the use site.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MeshSignalSessionAction {
    /// Presumably: deliver the signal now.
    Send,
    /// Presumably: hold the signal for a later attempt.
    Defer,
    /// Presumably: discard the signal.
    Drop,
}
30
/// Debug-build helper that appends one timestamped line to
/// `nvpn-fips-endpoint-debug.log` inside the system temp directory.
///
/// The file is created if missing. Failures to open or write it are ignored
/// on purpose: this is best-effort diagnostics only.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let opened = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_path);
    let Ok(mut file) = opened else {
        return;
    };

    let timestamp = std::time::SystemTime::now();
    let _ = writeln!(file, "{:?} {}", timestamp, message.as_ref());
}

/// Release-build counterpart of the debug logger: accepts the message and
/// does nothing with it.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
51
/// Reports whether `ip` lies in a range that is not globally routable.
///
/// IPv4: private, loopback, link-local, unspecified, multicast, broadcast,
/// documentation, and the shared address space `100.64.0.0/10`.
///
/// IPv6: loopback, unspecified, unique-local, multicast and link-local
/// (`fe80::/10`). IPv4-mapped addresses (`::ffff:a.b.c.d`) are judged by the
/// IPv4 rules applied to the embedded address, so a mapped private address
/// such as `::ffff:192.168.1.1` is reported as unroutable as well.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    /// IPv4 classification shared by plain and IPv4-mapped addresses.
    fn is_unroutable_v4(v4: std::net::Ipv4Addr) -> bool {
        let [first, second, ..] = v4.octets();
        v4.is_private()
            || v4.is_loopback()
            || v4.is_link_local()
            || v4.is_unspecified()
            || v4.is_multicast()
            || v4.is_broadcast()
            || v4.is_documentation()
            // 100.64.0.0/10: first octet 100, top two bits of the second are `01`.
            || (first == 100 && (second & 0xc0) == 64)
    }

    match ip {
        IpAddr::V4(v4) => is_unroutable_v4(v4),
        IpAddr::V6(v6) => {
            // An IPv4-mapped address is only as routable as the IPv4 address
            // it wraps; none of the IPv6 predicates below would catch it.
            if let Some(mapped) = v6.to_ipv4_mapped() {
                return is_unroutable_v4(mapped);
            }
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // fe80::/10 link-local.
                || (v6.segments()[0] & 0xffc0) == 0xfe80
        }
    }
}
83
/// Returns `true` when both socket addresses belong to the same IP family
/// (both IPv4 or both IPv6).
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    // `SocketAddr` has exactly two variants, so equal "is IPv4" answers mean
    // equal families.
    local.is_ipv4() == remote.is_ipv4()
}
90
// NOTE(review): not referenced in this part of the file; presumably scales a
// retry lifetime for open-discovery peers — confirm at the use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
/// Upper bound on simultaneous in-flight attempts (handshakes plus pending
/// transport connects) toward one peer; applied in
/// `path_candidate_attempt_budget`.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
/// Cap applied to the value returned by `graph_session_warmup_budget`.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
/// Cap applied to the value returned by `discovery_connect_budget`.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
95
96impl Node {
97 pub(super) async fn update_peers(
109 &mut self,
110 new_peers: Vec<crate::config::PeerConfig>,
111 ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
112 use std::collections::{HashMap, HashSet};
113
114 let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
115 HashMap::with_capacity(new_peers.len());
116 let mut new_order = Vec::with_capacity(new_peers.len());
117 for peer in new_peers {
118 let identity = match PeerIdentity::from_npub(&peer.npub) {
119 Ok(id) => id,
120 Err(e) => {
121 return Err(crate::node::NodeError::InvalidPeerNpub {
122 npub: peer.npub.clone(),
123 reason: e.to_string(),
124 });
125 }
126 };
127 let node_addr = *identity.node_addr();
131 if !new_by_addr.contains_key(&node_addr) {
132 new_order.push(node_addr);
133 }
134 new_by_addr.insert(node_addr, peer);
135 }
136
137 let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
138 .config
139 .peers()
140 .iter()
141 .filter_map(|pc| {
142 PeerIdentity::from_npub(&pc.npub)
143 .ok()
144 .map(|id| (*id.node_addr(), pc.clone()))
145 })
146 .collect();
147
148 let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
149 let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
150
151 let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
152 let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
153 let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
154
155 let mut outcome = crate::node::UpdatePeersOutcome::default();
156
157 for node_addr in &removed {
158 if self.retry_pending.remove(node_addr).is_some() {
159 debug!(
160 peer = %self.peer_display_name(node_addr),
161 "Dropping retry entry for peer removed from runtime peer list"
162 );
163 }
164 self.peer_aliases.remove(node_addr);
165 self.set_discovery_fallback_transit_allowed(*node_addr, false);
166 outcome.removed += 1;
167 }
168
169 let mut auto_connect_refresh_configs = Vec::new();
170 for node_addr in &kept {
171 let new_pc = &new_by_addr[node_addr];
172 let current_pc = ¤t_by_addr[node_addr];
173 if new_pc.addresses != current_pc.addresses
174 || new_pc.alias != current_pc.alias
175 || new_pc.connect_policy != current_pc.connect_policy
176 || new_pc.auto_reconnect != current_pc.auto_reconnect
177 || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
178 {
179 outcome.updated += 1;
180 self.set_discovery_fallback_transit_allowed(
181 *node_addr,
182 new_pc.discovery_fallback_transit,
183 );
184 if let Some(state) = self.retry_pending.get_mut(node_addr) {
185 state.peer_config = new_pc.clone();
186 state.retry_after_ms = Self::now_ms();
187 }
188 if let Some(alias) = new_pc.alias.clone() {
189 self.peer_aliases.insert(*node_addr, alias);
190 }
191 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
192 auto_connect_refresh_configs.push(new_pc.clone());
193 }
194 } else {
195 outcome.unchanged += 1;
196 self.set_discovery_fallback_transit_allowed(
197 *node_addr,
198 new_pc.discovery_fallback_transit,
199 );
200 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
201 auto_connect_refresh_configs.push(new_pc.clone());
202 }
203 }
204 }
205
206 let added_configs: Vec<crate::config::PeerConfig> = new_order
207 .iter()
208 .filter(|addr| added.contains(addr))
209 .map(|addr| new_by_addr[addr].clone())
210 .collect();
211
212 self.config.peers = new_order
216 .iter()
217 .filter_map(|addr| new_by_addr.get(addr).cloned())
218 .collect();
219 self.configured_peer_send_weights = Self::configured_peer_send_weights(&self.config);
220
221 for peer_config in added_configs {
222 outcome.added += 1;
223 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
224 continue;
225 };
226 let name = peer_config
227 .alias
228 .clone()
229 .unwrap_or_else(|| identity.short_npub());
230 self.peer_aliases.insert(*identity.node_addr(), name);
231 self.set_discovery_fallback_transit_allowed(
232 *identity.node_addr(),
233 peer_config.discovery_fallback_transit,
234 );
235 self.register_identity(*identity.node_addr(), identity.pubkey_full());
236
237 if !peer_config.is_auto_connect() {
238 continue;
239 }
240
241 match self
242 .try_auto_connect_graph_session(&peer_config, identity)
243 .await
244 {
245 Ok(true) => continue,
246 Ok(false) => {}
247 Err(err) => {
248 debug!(
249 npub = %peer_config.npub,
250 error = %err,
251 "Existing FIPS graph did not warm newly added peer"
252 );
253 }
254 }
255
256 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
257 warn!(
258 npub = %peer_config.npub,
259 error = %e,
260 "Failed to initiate connection for newly added peer"
261 );
262 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
263 self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
264 }
265 if matches!(e, crate::node::NodeError::NoTransportForType(_))
266 && let Some(bootstrap) = self.nostr_discovery.clone()
267 {
268 bootstrap
269 .request_advert_stale_check(peer_config.npub.clone())
270 .await;
271 }
272 }
273 }
274
275 for peer_config in auto_connect_refresh_configs {
276 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
277 continue;
278 };
279 let node_addr = *peer_identity.node_addr();
280
281 if self.peers.contains_key(&node_addr) {
282 match self
283 .initiate_active_peer_alternative_connection(&peer_config)
284 .await
285 {
286 Ok(attempted) => {
287 if attempted {
288 debug!(
289 peer = %self.peer_display_name(&node_addr),
290 "Started non-disruptive alternate-path handshake for active peer"
291 );
292 }
293 }
294 Err(e) => {
295 debug!(
296 npub = %peer_config.npub,
297 error = %e,
298 "Active peer alternate-path refresh did not start"
299 );
300 }
301 }
302 continue;
303 }
304
305 match self
306 .try_auto_connect_graph_session(&peer_config, peer_identity)
307 .await
308 {
309 Ok(true) => continue,
310 Ok(false) => {}
311 Err(err) => {
312 debug!(
313 npub = %peer_config.npub,
314 error = %err,
315 "Existing FIPS graph did not warm refreshed peer"
316 );
317 }
318 }
319
320 match self.initiate_peer_connection(&peer_config).await {
321 Ok(()) => {
322 let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
323 if let Some(state) = self.retry_pending.get_mut(&node_addr) {
324 state.peer_config = peer_config;
325 state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
326 }
327 }
328 Err(e) => {
329 debug!(
330 npub = %peer_config.npub,
331 error = %e,
332 "Refreshed peer addresses did not initiate a direct connection"
333 );
334 self.schedule_retry_after_error(node_addr, Self::now_ms(), &e);
335 }
336 }
337 }
338
339 self.warm_auto_connect_graph_sessions().await;
340
341 Ok(outcome)
342 }
343
344 pub(super) async fn initiate_peer_connections(&mut self) {
345 let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
351 .config
352 .peers()
353 .iter()
354 .filter_map(|pc| {
355 PeerIdentity::from_npub(&pc.npub)
356 .ok()
357 .map(|id| (id, pc.alias.clone()))
358 })
359 .collect();
360
361 for (identity, alias) in peer_identities {
362 let name = alias.unwrap_or_else(|| identity.short_npub());
363 self.peer_aliases.insert(*identity.node_addr(), name);
364 self.register_identity(*identity.node_addr(), identity.pubkey_full());
368 }
369
370 let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
372
373 if peer_configs.is_empty() {
374 debug!("No static peers configured");
375 return;
376 }
377
378 debug!(
379 count = peer_configs.len(),
380 "Initiating static peer connections"
381 );
382
383 for peer_config in peer_configs {
384 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
385 Ok(identity) => identity,
386 Err(_) => continue,
387 };
388 match self
389 .try_auto_connect_graph_session(&peer_config, peer_identity)
390 .await
391 {
392 Ok(true) => continue,
393 Ok(false) => {}
394 Err(err) => {
395 debug!(
396 npub = %peer_config.npub,
397 error = %err,
398 "Existing FIPS graph did not warm auto-connect peer"
399 );
400 }
401 }
402 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
403 warn!(
404 npub = %peer_config.npub,
405 alias = ?peer_config.alias,
406 error = %e,
407 "Failed to initiate peer connection"
408 );
409 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
413 self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
414 }
415 if matches!(e, crate::node::NodeError::NoTransportForType(_))
421 && let Some(bootstrap) = self.nostr_discovery.clone()
422 {
423 bootstrap
424 .request_advert_stale_check(peer_config.npub.clone())
425 .await;
426 }
427 }
428 }
429
430 self.warm_auto_connect_graph_sessions().await;
431 }
432
433 pub(in crate::node) async fn try_auto_connect_graph_session(
434 &mut self,
435 peer_config: &PeerConfig,
436 peer_identity: PeerIdentity,
437 ) -> Result<bool, NodeError> {
438 if !peer_config.is_auto_connect() {
439 return Ok(false);
440 }
441
442 let peer_node_addr = *peer_identity.node_addr();
443 if self.peers.contains_key(&peer_node_addr) {
444 return Ok(false);
445 }
446 if self.auto_connect_should_race_direct_path(peer_config) {
447 return Ok(false);
448 }
449 if self
450 .sessions
451 .get(&peer_node_addr)
452 .is_some_and(|entry| entry.is_established() || entry.is_initiating())
453 {
454 return Ok(true);
455 }
456 if self.find_next_hop(&peer_node_addr).is_none() {
457 return Ok(false);
458 }
459
460 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
461 match self
462 .initiate_session(peer_node_addr, peer_identity.pubkey_full())
463 .await
464 {
465 Ok(()) => {
466 debug!(
467 peer = %self.peer_display_name(&peer_node_addr),
468 "Warmed auto-connect peer session over existing FIPS graph"
469 );
470 Ok(true)
471 }
472 Err(NodeError::SendFailed { node_addr, reason })
473 if node_addr == peer_node_addr && reason == "no route to destination" =>
474 {
475 self.maybe_initiate_lookup(&peer_node_addr).await;
476 Ok(false)
477 }
478 Err(err) => Err(err),
479 }
480 }
481
482 fn auto_connect_should_race_direct_path(&self, peer_config: &PeerConfig) -> bool {
483 !peer_config.addresses.is_empty() || self.config.node.discovery.nostr.enabled
484 }
485
486 pub(super) async fn initiate_peer_connection(
490 &mut self,
491 peer_config: &crate::config::PeerConfig,
492 ) -> Result<(), NodeError> {
493 self.initiate_peer_connection_inner(peer_config).await
494 }
495
496 pub(super) async fn initiate_peer_retry_connection(
502 &mut self,
503 peer_config: &crate::config::PeerConfig,
504 ) -> Result<(), NodeError> {
505 self.initiate_peer_connection_inner(peer_config).await
506 }
507
508 pub(in crate::node) async fn initiate_active_peer_alternative_connection(
509 &mut self,
510 peer_config: &crate::config::PeerConfig,
511 ) -> Result<bool, NodeError> {
512 let peer_identity =
513 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
514 npub: peer_config.npub.clone(),
515 reason: e.to_string(),
516 })?;
517 let peer_node_addr = *peer_identity.node_addr();
518
519 if !self.peers.contains_key(&peer_node_addr) {
520 self.initiate_peer_connection(peer_config).await?;
521 return Ok(true);
522 }
523
524 self.try_active_peer_alternative_addresses(peer_config, peer_identity)
530 .await
531 }
532
533 async fn initiate_peer_connection_inner(
534 &mut self,
535 peer_config: &crate::config::PeerConfig,
536 ) -> Result<(), NodeError> {
537 let peer_identity =
539 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
540 npub: peer_config.npub.clone(),
541 reason: e.to_string(),
542 })?;
543
544 let peer_node_addr = *peer_identity.node_addr();
545
546 if self.peers.contains_key(&peer_node_addr) {
548 debug!(
549 npub = %peer_config.npub,
550 "Peer already exists, skipping"
551 );
552 return Ok(());
553 }
554
555 self.try_peer_addresses(peer_config, peer_identity, true)
556 .await
557 }
558
559 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
560 self.connections.values().any(|conn| {
561 conn.expected_identity()
562 .map(|id| id.node_addr() == peer_node_addr)
563 .unwrap_or(false)
564 })
565 }
566
567 fn is_connecting_to_peer_on_path(
568 &self,
569 peer_node_addr: &NodeAddr,
570 transport_id: TransportId,
571 remote_addr: &TransportAddr,
572 ) -> bool {
573 self.connections.values().any(|conn| {
574 conn.expected_identity()
575 .map(|id| id.node_addr() == peer_node_addr)
576 .unwrap_or(false)
577 && conn.transport_id() == Some(transport_id)
578 && conn.source_addr() == Some(remote_addr)
579 }) || self.pending_connects.iter().any(|pending| {
580 pending.peer_identity.node_addr() == peer_node_addr
581 && pending.transport_id == transport_id
582 && &pending.remote_addr == remote_addr
583 })
584 }
585
586 pub(in crate::node) fn should_warm_auto_connect_session(
587 &self,
588 peer_node_addr: &NodeAddr,
589 ) -> bool {
590 if self.peers.contains_key(peer_node_addr)
591 || self
592 .sessions
593 .get(peer_node_addr)
594 .is_some_and(|entry| entry.is_established())
595 {
596 return false;
597 }
598
599 self.config.peers().iter().any(|peer| {
600 peer.is_auto_connect()
601 && PeerIdentity::from_npub(&peer.npub)
602 .map(|identity| identity.node_addr() == peer_node_addr)
603 .unwrap_or(false)
604 })
605 }
606
607 pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
608 if !self.peers.values().any(|peer| peer.can_send()) {
609 return 0;
610 }
611
612 let mut budget = self.graph_session_warmup_budget();
613 if budget == 0 {
614 return 0;
615 }
616
617 let peer_identities: Vec<_> = self
618 .config
619 .auto_connect_peers()
620 .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
621 .collect();
622
623 let mut warmed = 0;
624 for identity in peer_identities {
625 if budget == 0 {
626 break;
627 }
628
629 let peer_node_addr = *identity.node_addr();
630 if peer_node_addr == *self.identity.node_addr()
631 || !self.should_warm_auto_connect_session(&peer_node_addr)
632 || self
633 .sessions
634 .get(&peer_node_addr)
635 .is_some_and(|entry| entry.is_initiating())
636 {
637 continue;
638 }
639
640 self.register_identity(peer_node_addr, identity.pubkey_full());
641
642 if self.find_next_hop(&peer_node_addr).is_some() {
643 match self
644 .initiate_session(peer_node_addr, identity.pubkey_full())
645 .await
646 {
647 Ok(()) => {
648 warmed += 1;
649 budget = budget.saturating_sub(1);
650 debug!(
651 peer = %self.peer_display_name(&peer_node_addr),
652 "Warmed auto-connect peer session over existing FIPS graph"
653 );
654 }
655 Err(NodeError::SendFailed { node_addr, reason })
656 if node_addr == peer_node_addr && reason == "no route to destination" =>
657 {
658 self.maybe_initiate_lookup(&peer_node_addr).await;
659 warmed += 1;
660 budget = budget.saturating_sub(1);
661 }
662 Err(err) => {
663 debug!(
664 peer = %self.peer_display_name(&peer_node_addr),
665 error = %err,
666 "Failed to warm auto-connect peer session"
667 );
668 }
669 }
670 } else {
671 self.maybe_initiate_lookup(&peer_node_addr).await;
672 warmed += 1;
673 budget = budget.saturating_sub(1);
674 }
675 }
676
677 warmed
678 }
679
680 pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
681 let max_destinations = self.config.node.session.pending_max_destinations;
682 if max_destinations == 0 {
683 return 0;
684 }
685
686 let pending_sessions = self
687 .sessions
688 .values()
689 .filter(|entry| !entry.is_established())
690 .count();
691 let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
692 max_destinations
693 .saturating_sub(pending_total)
694 .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
695 }
696
697 fn outbound_handshake_slots(&self) -> usize {
698 let used = self
699 .connections
700 .len()
701 .saturating_add(self.pending_connects.len());
702 if self.max_connections == 0 {
703 usize::MAX
704 } else {
705 self.max_connections.saturating_sub(used)
706 }
707 }
708
709 fn outbound_link_slots(&self) -> usize {
710 if self.max_links == 0 {
711 usize::MAX
712 } else {
713 self.max_links.saturating_sub(self.links.len())
714 }
715 }
716
717 fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
718 if !self.peers.contains_key(peer_node_addr)
719 && self.max_peers > 0
720 && self.peers.len() >= self.max_peers
721 {
722 return 0;
723 }
724
725 let in_flight_for_peer = self
726 .connections
727 .values()
728 .filter(|conn| {
729 conn.expected_identity()
730 .map(|id| id.node_addr() == peer_node_addr)
731 .unwrap_or(false)
732 })
733 .count()
734 .saturating_add(
735 self.pending_connects
736 .iter()
737 .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
738 .count(),
739 );
740
741 self.outbound_handshake_slots()
742 .min(self.outbound_link_slots())
743 .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
744 }
745
746 fn discovery_connect_budget(&self) -> usize {
747 self.outbound_handshake_slots()
748 .min(self.outbound_link_slots())
749 .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
750 }
751
752 fn find_udp_transport_for_remote_addr(
759 &self,
760 remote_addr: SocketAddr,
761 ) -> Option<(TransportId, SocketAddr)> {
762 self.transports
763 .iter()
764 .filter(|(id, handle)| {
765 handle.transport_type().name == "udp"
766 && handle.is_operational()
767 && !self.bootstrap_transports.contains(id)
768 })
769 .filter_map(|(id, handle)| {
770 let local_addr = handle.local_addr()?;
771 socket_addr_families_compatible(local_addr, remote_addr)
772 .then_some((*id, local_addr))
773 })
774 .min_by_key(|(id, _)| id.as_u32())
775 }
776
777 pub(super) fn transport_discovery_candidate(
778 &self,
779 discovered_transport_id: TransportId,
780 discovered_addr: TransportAddr,
781 ) -> Option<(TransportId, TransportAddr, &'static str)> {
782 let transport = self.transports.get(&discovered_transport_id)?;
783 let transport_name = transport.transport_type().name;
784
785 if transport_name != "udp" {
786 return Some((discovered_transport_id, discovered_addr, transport_name));
787 }
788
789 let Some(remote_socket_addr) = discovered_addr
790 .as_str()
791 .and_then(|addr| addr.parse::<SocketAddr>().ok())
792 else {
793 if self.bootstrap_transports.contains(&discovered_transport_id) {
794 debug!(
795 transport_id = %discovered_transport_id,
796 remote_addr = %discovered_addr,
797 "transport discovery: skip non-numeric UDP address from bootstrap transport"
798 );
799 return None;
800 }
801 return Some((discovered_transport_id, discovered_addr, transport_name));
802 };
803
804 let Some((transport_id, local_addr)) =
805 self.find_udp_transport_for_remote_addr(remote_socket_addr)
806 else {
807 debug!(
808 transport_id = %discovered_transport_id,
809 remote_addr = %discovered_addr,
810 "transport discovery: skip UDP peer with no compatible local socket"
811 );
812 return None;
813 };
814
815 if transport_id != discovered_transport_id {
816 debug!(
817 discovered_transport_id = %discovered_transport_id,
818 selected_transport_id = %transport_id,
819 local_addr = %local_addr,
820 remote_addr = %remote_socket_addr,
821 "transport discovery: selected compatible UDP transport"
822 );
823 }
824
825 Some((
826 transport_id,
827 TransportAddr::from_socket_addr(remote_socket_addr),
828 transport_name,
829 ))
830 }
831
832 fn peer_address_string_for_transport_candidate(
833 &self,
834 transport_id: TransportId,
835 transport_name: &str,
836 remote_addr: &TransportAddr,
837 ) -> String {
838 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
839 let _ = (transport_id, transport_name);
840
841 #[cfg(any(target_os = "linux", target_os = "macos"))]
842 if transport_name == "ethernet"
843 && remote_addr.as_bytes().len() == 6
844 && let Some(interface) = self
845 .transports
846 .get(&transport_id)
847 .and_then(|transport| transport.interface_name())
848 {
849 let mut mac = [0u8; 6];
850 mac.copy_from_slice(remote_addr.as_bytes());
851 return format!(
852 "{interface}/{}",
853 crate::transport::ethernet::format_mac(&mac)
854 );
855 }
856
857 remote_addr.to_string()
858 }
859
860 fn resolve_peer_address_for_match(
861 &self,
862 candidate: &PeerAddress,
863 ) -> Option<(TransportId, TransportAddr)> {
864 if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
865 return None;
866 }
867
868 if candidate.transport == "ethernet" {
869 return self.resolve_ethernet_addr(&candidate.addr).ok();
870 }
871
872 if candidate.transport == "ble" {
873 #[cfg(bluer_available)]
874 {
875 return self.resolve_ble_addr(&candidate.addr).ok();
876 }
877 #[cfg(not(bluer_available))]
878 {
879 return None;
880 }
881 }
882
883 let transport_id = if candidate.transport == "udp"
884 && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
885 {
886 self.find_udp_transport_for_remote_addr(remote_socket_addr)
887 .map(|(id, _)| id)?
888 } else {
889 self.find_transport_for_type(&candidate.transport)?
890 };
891
892 Some((transport_id, TransportAddr::from_string(&candidate.addr)))
893 }
894
895 pub(super) async fn initiate_connection(
906 &mut self,
907 transport_id: TransportId,
908 remote_addr: TransportAddr,
909 peer_identity: PeerIdentity,
910 ) -> Result<(), NodeError> {
911 let peer_node_addr = *peer_identity.node_addr();
912
913 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
914 debug!(
915 peer = %self.peer_display_name(&peer_node_addr),
916 transport_id = %transport_id,
917 remote_addr = %remote_addr,
918 "Connection already in progress for candidate path"
919 );
920 return Ok(());
921 }
922
923 if self.outbound_handshake_slots() == 0 {
924 return Err(NodeError::MaxConnectionsExceeded {
925 max: self.max_connections,
926 });
927 }
928
929 if self.outbound_link_slots() == 0 {
930 return Err(NodeError::MaxLinksExceeded {
931 max: self.max_links,
932 });
933 }
934
935 if !self.peers.contains_key(&peer_node_addr)
936 && self.max_peers > 0
937 && self.peers.len() >= self.max_peers
938 {
939 return Err(NodeError::MaxPeersExceeded {
940 max: self.max_peers,
941 });
942 }
943
944 self.authorize_peer(
945 &peer_identity,
946 PeerAclContext::OutboundConnect,
947 transport_id,
948 &remote_addr,
949 )?;
950
951 let is_connection_oriented = self
952 .transports
953 .get(&transport_id)
954 .map(|t| t.transport_type().connection_oriented)
955 .unwrap_or(false);
956
957 let link_id = self.allocate_link_id();
959
960 let link = if is_connection_oriented {
961 Link::new(
962 link_id,
963 transport_id,
964 remote_addr.clone(),
965 LinkDirection::Outbound,
966 Duration::from_millis(self.config.node.base_rtt_ms),
967 )
968 } else {
969 Link::connectionless(
970 link_id,
971 transport_id,
972 remote_addr.clone(),
973 LinkDirection::Outbound,
974 Duration::from_millis(self.config.node.base_rtt_ms),
975 )
976 };
977
978 self.links.insert(link_id, link);
979
980 self.addr_to_link
982 .insert((transport_id, remote_addr.clone()), link_id);
983
984 if is_connection_oriented {
985 if let Some(transport) = self.transports.get(&transport_id) {
987 match transport.connect(&remote_addr).await {
988 Ok(()) => {
989 debug!(
990 peer = %self.peer_display_name(&peer_node_addr),
991 transport_id = %transport_id,
992 remote_addr = %remote_addr,
993 link_id = %link_id,
994 "Transport connect initiated (non-blocking)"
995 );
996 self.pending_connects.push(super::PendingConnect {
997 link_id,
998 transport_id,
999 remote_addr,
1000 peer_identity,
1001 });
1002 }
1003 Err(e) => {
1004 self.links.remove(&link_id);
1006 self.addr_to_link.remove(&(transport_id, remote_addr));
1007 return Err(NodeError::from_transport_error(e));
1008 }
1009 }
1010 }
1011 Ok(())
1012 } else {
1013 self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
1015 .await
1016 }
1017 }
1018
1019 pub(super) async fn start_handshake(
1024 &mut self,
1025 link_id: LinkId,
1026 transport_id: TransportId,
1027 remote_addr: TransportAddr,
1028 peer_identity: PeerIdentity,
1029 ) -> Result<(), NodeError> {
1030 let peer_node_addr = *peer_identity.node_addr();
1031
1032 let current_time_ms = Self::now_ms();
1034 let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);
1035
1036 let our_index = match self.index_allocator.allocate() {
1038 Ok(idx) => idx,
1039 Err(e) => {
1040 self.links.remove(&link_id);
1042 self.addr_to_link.remove(&(transport_id, remote_addr));
1043 return Err(NodeError::IndexAllocationFailed(e.to_string()));
1044 }
1045 };
1046
1047 let our_keypair = self.identity.keypair();
1049 let noise_msg1 =
1050 match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
1051 Ok(msg) => msg,
1052 Err(e) => {
1053 let _ = self.index_allocator.free(our_index);
1055 self.links.remove(&link_id);
1056 self.addr_to_link.remove(&(transport_id, remote_addr));
1057 return Err(NodeError::HandshakeFailed(e.to_string()));
1058 }
1059 };
1060
1061 connection.set_our_index(our_index);
1063 connection.set_transport_id(transport_id);
1064 connection.set_source_addr(remote_addr.clone());
1065
1066 let wire_msg1 = build_msg1(our_index, &noise_msg1);
1068
1069 debug!(
1070 peer = %self.peer_display_name(&peer_node_addr),
1071 transport_id = %transport_id,
1072 remote_addr = %remote_addr,
1073 link_id = %link_id,
1074 our_index = %our_index,
1075 "Connection initiated"
1076 );
1077
1078 let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
1080 connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);
1081
1082 self.pending_outbound
1084 .insert((transport_id, our_index.as_u32()), link_id);
1085 self.connections.insert(link_id, connection);
1086
1087 let send_result = match self.transports.get(&transport_id) {
1092 Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
1093 None => None,
1094 };
1095 match send_result {
1096 Some(send_result) => {
1097 self.note_local_send_outcome(&send_result);
1098 match send_result {
1099 Ok(bytes) => {
1100 debug!(
1101 link_id = %link_id,
1102 our_index = %our_index,
1103 bytes,
1104 "Sent Noise handshake message 1 (wire format)"
1105 );
1106 }
1107 Err(e) => {
1108 warn!(
1109 link_id = %link_id,
1110 transport_id = %transport_id,
1111 remote_addr = %remote_addr,
1112 our_index = %our_index,
1113 error = %e,
1114 "Failed to send handshake message"
1115 );
1116 self.pending_outbound
1117 .remove(&(transport_id, our_index.as_u32()));
1118 self.connections.remove(&link_id);
1119 self.links.remove(&link_id);
1120 self.addr_to_link
1121 .remove(&(transport_id, remote_addr.clone()));
1122 let _ = self.index_allocator.free(our_index);
1123 return Err(NodeError::from_transport_error(e));
1124 }
1125 }
1126 }
1127 None => {
1128 self.pending_outbound
1129 .remove(&(transport_id, our_index.as_u32()));
1130 self.connections.remove(&link_id);
1131 self.links.remove(&link_id);
1132 self.addr_to_link
1133 .remove(&(transport_id, remote_addr.clone()));
1134 let _ = self.index_allocator.free(our_index);
1135 return Err(NodeError::TransportError(format!(
1136 "transport {transport_id} disappeared before first handshake send"
1137 )));
1138 }
1139 }
1140
1141 Ok(())
1142 }
1143
1144 pub(super) async fn poll_transport_discovery(&mut self) {
1150 let mut to_connect = Vec::new();
1152 let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1153 let mut connect_budget = self.discovery_connect_budget();
1154 let mut skipped_budget = 0usize;
1155
1156 for transport in self.transports.values() {
1157 if !transport.is_operational() {
1158 continue;
1159 }
1160 if !transport.auto_connect() {
1161 let _ = transport.discover();
1163 continue;
1164 }
1165 let discovered = match transport.discover() {
1166 Ok(peers) => peers,
1167 Err(_) => continue,
1168 };
1169 for peer in discovered {
1170 let discovered_transport_id = peer.transport_id;
1171 let pubkey = match peer.pubkey_hint {
1172 Some(pk) => pk,
1173 None => continue,
1174 };
1175 let identity = PeerIdentity::from_pubkey(pubkey);
1176 let node_addr = *identity.node_addr();
1177
1178 if node_addr == *self.identity.node_addr() {
1180 continue;
1181 }
1182
1183 let Some((candidate_transport_id, remote_addr, transport_name)) =
1184 self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1185 else {
1186 continue;
1187 };
1188
1189 if self.peers.contains_key(&node_addr) {
1190 let candidate = PeerAddress::new(
1191 transport_name,
1192 self.peer_address_string_for_transport_candidate(
1193 candidate_transport_id,
1194 transport_name,
1195 &remote_addr,
1196 ),
1197 );
1198 if self.active_peer_candidate_is_fresh_enough_to_skip(
1199 &node_addr,
1200 std::slice::from_ref(&candidate),
1201 ) {
1202 continue;
1203 }
1204 if self.is_connecting_to_peer_on_path(
1205 &node_addr,
1206 candidate_transport_id,
1207 &remote_addr,
1208 ) {
1209 continue;
1210 }
1211 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1212 if connect_budget == 0
1213 || self
1214 .path_candidate_attempt_budget(&node_addr)
1215 .saturating_sub(queued_for_peer)
1216 == 0
1217 {
1218 skipped_budget = skipped_budget.saturating_add(1);
1219 continue;
1220 }
1221 to_connect.push((candidate_transport_id, remote_addr, identity, true));
1222 *queued_per_peer.entry(node_addr).or_default() += 1;
1223 connect_budget = connect_budget.saturating_sub(1);
1224 continue;
1225 }
1226
1227 if self.is_connecting_to_peer_on_path(
1228 &node_addr,
1229 candidate_transport_id,
1230 &remote_addr,
1231 ) {
1232 continue;
1233 }
1234
1235 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1236 if connect_budget == 0
1237 || self
1238 .path_candidate_attempt_budget(&node_addr)
1239 .saturating_sub(queued_for_peer)
1240 == 0
1241 {
1242 skipped_budget = skipped_budget.saturating_add(1);
1243 continue;
1244 }
1245 to_connect.push((candidate_transport_id, remote_addr, identity, false));
1246 *queued_per_peer.entry(node_addr).or_default() += 1;
1247 connect_budget = connect_budget.saturating_sub(1);
1248 }
1249 }
1250
1251 if skipped_budget > 0 {
1252 debug!(
1253 skipped = skipped_budget,
1254 queued = to_connect.len(),
1255 "Transport discovery connect budget exhausted"
1256 );
1257 }
1258
1259 for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1260 info!(
1261 peer = %self.peer_display_name(identity.node_addr()),
1262 transport_id = %transport_id,
1263 remote_addr = %remote_addr,
1264 active_refresh,
1265 "Auto-connecting to discovered peer"
1266 );
1267 if let Err(e) = self
1268 .initiate_connection(transport_id, remote_addr, identity)
1269 .await
1270 {
1271 warn!(error = %e, "Failed to auto-connect to discovered peer");
1272 }
1273 }
1274 }
1275
    /// Runs one maintenance pass over the Nostr discovery runtime.
    ///
    /// Does nothing when no runtime is attached. Otherwise the pass, in order:
    /// pushes the current admission decisions into the runtime, refreshes the
    /// local overlay advert (best-effort), forwards queued mesh traversal
    /// signals, handles every drained [`BootstrapEvent`], and finally runs the
    /// startup sweep and the open-discovery / active-fallback retry queues.
    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
        bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());

        // A failed advert refresh is only logged; the rest of the pass still runs.
        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        self.drain_nostr_mesh_signals(&bootstrap).await;

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    // A traversal towards a peer that is already in `self.peers` is a
                    // refresh of an existing peer and is gated by the direct-refresh
                    // admission check instead of the general outbound one. An npub
                    // that fails to parse is treated as "not an active peer".
                    let active_refresh = PeerIdentity::from_npub(&traversal.peer_npub)
                        .ok()
                        .is_some_and(|identity| self.peers.contains_key(identity.node_addr()));
                    let admission_allowed = if active_refresh {
                        self.outbound_direct_refresh_admission_check()
                    } else {
                        self.outbound_admission_check()
                    };
                    if !admission_allowed {
                        debug!(
                            peer_npub = %traversal.peer_npub,
                            peers = self.peers.len(),
                            max_peers = self.max_peers,
                            active_refresh,
                            "Dropping established NAT traversal: at capacity"
                        );
                        continue;
                    }
                    // Keep a copy of the npub for logging: `traversal` is moved below.
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            // Adoption failed: schedule a retry when the npub resolves
                            // to a valid identity.
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    // Events for an unparsable npub are silently skipped.
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    let now_ms = Self::now_ms();
                    // Case 1: the peer is already connected. The failure only counts
                    // (and only triggers a retry) when
                    // `active_peer_should_keep_direct_retry` says so; otherwise it is
                    // logged and ignored.
                    if self.peers.contains_key(&node_addr) {
                        if self.active_peer_should_keep_direct_retry(&node_addr, &peer_config) {
                            let decision =
                                bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                            if decision.should_warn {
                                warn!(
                                    npub = %peer_config.npub,
                                    error = %reason,
                                    consecutive_failures = decision.consecutive_failures,
                                    cooldown_secs = decision
                                        .cooldown_until_ms
                                        .map(|t| t.saturating_sub(now_ms) / 1000),
                                    "Direct-path NAT traversal upgrade failed"
                                );
                            } else {
                                debug!(
                                    npub = %peer_config.npub,
                                    error = %reason,
                                    consecutive_failures = decision.consecutive_failures,
                                    "Direct-path NAT traversal upgrade failed (suppressed by warn-rate-limit)"
                                );
                            }
                            if decision.crossed_threshold {
                                bootstrap
                                    .request_advert_stale_check(peer_config.npub.clone())
                                    .await;
                            }
                            self.schedule_retry(node_addr, now_ms);
                            // Push the retry out so it never fires before the
                            // traversal cooldown has elapsed.
                            if let Some(cooldown_until_ms) = decision.cooldown_until_ms
                                && let Some(state) = self.retry_pending.get_mut(&node_addr)
                            {
                                state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                            }
                        } else {
                            debug!(
                                npub = %peer_config.npub,
                                error = %reason,
                                "Ignoring failed NAT traversal for already-connected peer on fresh direct path"
                            );
                        }
                        continue;
                    }
                    // Case 2: a handshake with this peer is already in flight; the
                    // failure is not recorded and no retry is scheduled here.
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    // Case 3: the peer is neither connected nor connecting. Record the
                    // failure and log it at warn or debug level as the decision asks.
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    if decision.crossed_threshold {
                        bootstrap
                            .request_advert_stale_check(peer_config.npub.clone())
                            .await;
                    }

                    // If `try_peer_addresses` succeeds, no retry is scheduled for
                    // this event.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    // As above: never retry earlier than the traversal cooldown.
                    if let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
        self.queue_active_fallback_direct_retries(&bootstrap);
    }
1440
1441 async fn drain_nostr_mesh_signals(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1442 let mut deferred = Vec::new();
1443
1444 for signal in bootstrap.drain_mesh_signals().await {
1445 let (peer_npub, msg_type, payload) = match &signal {
1446 MeshTraversalSignal::Offer { peer_npub, offer } => {
1447 let payload = match serde_json::to_vec(&offer) {
1448 Ok(payload) => payload,
1449 Err(error) => {
1450 debug!(
1451 peer = %peer_npub,
1452 error = %error,
1453 "Failed to encode mesh traversal offer"
1454 );
1455 continue;
1456 }
1457 };
1458 (
1459 peer_npub.clone(),
1460 SessionMessageType::TraversalOffer.to_byte(),
1461 payload,
1462 )
1463 }
1464 MeshTraversalSignal::Answer { peer_npub, answer } => {
1465 let payload = match serde_json::to_vec(&answer) {
1466 Ok(payload) => payload,
1467 Err(error) => {
1468 debug!(
1469 peer = %peer_npub,
1470 error = %error,
1471 "Failed to encode mesh traversal answer"
1472 );
1473 continue;
1474 }
1475 };
1476 (
1477 peer_npub.clone(),
1478 SessionMessageType::TraversalAnswer.to_byte(),
1479 payload,
1480 )
1481 }
1482 };
1483
1484 let peer_identity = match PeerIdentity::from_npub(&peer_npub) {
1485 Ok(identity) => identity,
1486 Err(error) => {
1487 debug!(
1488 peer = %peer_npub,
1489 error = %error,
1490 "Cannot send mesh traversal signal to invalid peer npub"
1491 );
1492 continue;
1493 }
1494 };
1495 let peer_addr = *peer_identity.node_addr();
1496 match self
1497 .mesh_signal_session_action(peer_addr, peer_identity.pubkey_full())
1498 .await
1499 {
1500 MeshSignalSessionAction::Send => {}
1501 MeshSignalSessionAction::Defer => {
1502 deferred.push(signal);
1503 continue;
1504 }
1505 MeshSignalSessionAction::Drop => continue,
1506 }
1507
1508 if let Err(error) = self.send_session_msg(&peer_addr, msg_type, &payload).await {
1509 debug!(
1510 peer = %self.peer_display_name(&peer_addr),
1511 error = %error,
1512 "Failed to send mesh traversal signal"
1513 );
1514 }
1515 }
1516
1517 for signal in deferred {
1518 bootstrap.requeue_mesh_signal(signal);
1519 }
1520 }
1521
1522 async fn mesh_signal_session_action(
1523 &mut self,
1524 peer_addr: NodeAddr,
1525 peer_pubkey: PublicKey,
1526 ) -> MeshSignalSessionAction {
1527 if let Some(entry) = self.sessions.get(&peer_addr) {
1528 if entry.is_established() {
1529 return MeshSignalSessionAction::Send;
1530 }
1531 if entry.is_initiating() || entry.is_awaiting_msg3() {
1532 debug!(
1533 peer = %self.peer_display_name(&peer_addr),
1534 "Deferring mesh traversal signal until end-to-end session is established"
1535 );
1536 return MeshSignalSessionAction::Defer;
1537 }
1538 }
1539
1540 if self.find_next_hop(&peer_addr).is_none() {
1541 debug!(
1542 peer = %self.peer_display_name(&peer_addr),
1543 "Cannot warm mesh traversal signal session without a FIPS route"
1544 );
1545 self.maybe_initiate_lookup(&peer_addr).await;
1546 return MeshSignalSessionAction::Drop;
1547 }
1548
1549 self.register_identity(peer_addr, peer_pubkey);
1550 match self.initiate_session(peer_addr, peer_pubkey).await {
1551 Ok(()) => {
1552 debug!(
1553 peer = %self.peer_display_name(&peer_addr),
1554 "Warming end-to-end session for mesh traversal signal"
1555 );
1556 MeshSignalSessionAction::Defer
1557 }
1558 Err(NodeError::SendFailed { node_addr, reason })
1559 if node_addr == peer_addr && reason == "no route to destination" =>
1560 {
1561 debug!(
1562 peer = %self.peer_display_name(&peer_addr),
1563 "Cannot warm mesh traversal signal session without a FIPS route"
1564 );
1565 self.maybe_initiate_lookup(&peer_addr).await;
1566 MeshSignalSessionAction::Drop
1567 }
1568 Err(error) => {
1569 debug!(
1570 peer = %self.peer_display_name(&peer_addr),
1571 error = %error,
1572 "Failed to warm end-to-end session for mesh traversal signal"
1573 );
1574 MeshSignalSessionAction::Drop
1575 }
1576 }
1577 }
1578
1579 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1585 if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1586 let scope = scope.trim();
1587 if !scope.is_empty() {
1588 return Some(scope.to_string());
1589 }
1590 }
1591
1592 let app = self.config.node.discovery.nostr.app.trim();
1593 if app.is_empty() {
1594 return None;
1595 }
1596 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1597 let scope = rest.trim();
1598 if scope.is_empty() {
1599 None
1600 } else {
1601 Some(scope.to_string())
1602 }
1603 } else {
1604 Some(app.to_string())
1605 }
1606 }
1607
1608 pub(super) fn start_local_instance_discovery(&mut self) {
1609 if !self.config.node.discovery.local.enabled {
1610 return;
1611 }
1612 let Some(scope) = self.lan_discovery_scope() else {
1613 debug!("local instance discovery not started: no discovery scope");
1614 return;
1615 };
1616 let now_ms = Self::now_ms();
1617 match crate::discovery::local::LocalInstanceRegistry::new(
1618 self.identity.npub(),
1619 scope,
1620 &self.config.node.discovery.local,
1621 now_ms,
1622 ) {
1623 Ok(registry) => {
1624 self.local_instance_registry = Some(registry);
1625 self.local_instance_started_at_ms = Some(now_ms);
1626 self.last_local_instance_publish_ms = None;
1627 self.last_local_instance_scan_ms = None;
1628 self.publish_local_instance_record(now_ms);
1629 info!("Same-host FIPS instance discovery enabled");
1630 }
1631 Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1632 debug!("same-host FIPS instance discovery disabled");
1633 }
1634 Err(err) => {
1635 debug!(error = %err, "same-host FIPS instance discovery not started");
1636 }
1637 }
1638 }
1639
1640 fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1641 let mut contacts = Vec::new();
1642 for handle in self.transports.values() {
1643 if !handle.is_operational() || !handle.accept_connections() {
1644 continue;
1645 }
1646 let transport = handle.transport_type().name;
1647 if transport != "udp" && transport != "tcp" {
1648 continue;
1649 }
1650 let Some(local_addr) = handle.local_addr() else {
1651 continue;
1652 };
1653 let Some(contact) =
1654 crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1655 else {
1656 continue;
1657 };
1658 if contacts
1659 .iter()
1660 .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1661 existing.transport == contact.transport && existing.addr == contact.addr
1662 })
1663 {
1664 continue;
1665 }
1666 contacts.push(contact);
1667 }
1668 contacts
1669 }
1670
1671 fn publish_local_instance_record(&mut self, now_ms: u64) {
1672 let Some(registry) = self.local_instance_registry.clone() else {
1673 return;
1674 };
1675 let contacts = self.local_instance_contacts();
1676 match registry.publish(contacts, now_ms) {
1677 Ok(()) => {
1678 self.last_local_instance_publish_ms = Some(now_ms);
1679 }
1680 Err(err) => {
1681 debug!(error = %err, "failed to publish same-host FIPS instance record");
1682 }
1683 }
1684 }
1685
1686 fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1687 if self.local_instance_registry.is_none() {
1688 return;
1689 }
1690 let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1691 let due = self
1692 .last_local_instance_publish_ms
1693 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1694 .unwrap_or(true);
1695 if due {
1696 self.publish_local_instance_record(now_ms);
1697 }
1698 }
1699
1700 fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1701 if self.local_instance_registry.is_none() {
1702 return false;
1703 }
1704 let cfg = &self.config.node.discovery.local;
1705 let interval_ms = if self
1706 .local_instance_started_at_ms
1707 .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1708 .unwrap_or(false)
1709 {
1710 cfg.startup_scan_interval_ms()
1711 } else {
1712 cfg.scan_interval_ms()
1713 };
1714 self.last_local_instance_scan_ms
1715 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1716 .unwrap_or(true)
1717 }
1718
1719 fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1720 if self.config.peers().iter().any(|peer| {
1721 PeerIdentity::from_npub(&peer.npub)
1722 .map(|configured| configured.node_addr() == identity.node_addr())
1723 .unwrap_or(false)
1724 }) {
1725 return true;
1726 }
1727 self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1728 }
1729
1730 fn local_instance_peer_addresses(
1731 &self,
1732 record: &crate::discovery::local::LocalInstanceRecord,
1733 ) -> Vec<PeerAddress> {
1734 let mut addresses = Vec::new();
1735 for contact in &record.contacts {
1736 if contact.transport != "udp" && contact.transport != "tcp" {
1737 continue;
1738 }
1739 let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1740 debug!(
1741 npub = %record.npub,
1742 transport = %contact.transport,
1743 addr = %contact.addr,
1744 "local instance discovery: skip non-socket contact"
1745 );
1746 continue;
1747 };
1748 if !socket_addr.ip().is_loopback() {
1749 debug!(
1750 npub = %record.npub,
1751 addr = %contact.addr,
1752 "local instance discovery: skip non-loopback contact"
1753 );
1754 continue;
1755 }
1756 let address =
1757 PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1758 .with_seen_at_ms(record.updated_at_ms);
1759 if addresses.iter().any(|existing: &PeerAddress| {
1760 existing.transport == address.transport && existing.addr == address.addr
1761 }) {
1762 continue;
1763 }
1764 addresses.push(address);
1765 }
1766 addresses
1767 }
1768
1769 pub(super) async fn poll_local_instance_discovery(&mut self) {
1773 let Some(registry) = self.local_instance_registry.clone() else {
1774 return;
1775 };
1776 let now_ms = Self::now_ms();
1777 self.maybe_publish_local_instance_record(now_ms);
1778 if !self.local_instance_scan_due(now_ms) {
1779 return;
1780 }
1781 self.last_local_instance_scan_ms = Some(now_ms);
1782
1783 let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
1784 {
1785 Ok(records) => records,
1786 Err(err) => {
1787 debug!(error = %err, "same-host FIPS instance scan failed");
1788 return;
1789 }
1790 };
1791 if records.is_empty() {
1792 return;
1793 }
1794
1795 let mut connect_budget = self.discovery_connect_budget();
1796 let mut skipped_budget = 0usize;
1797 for record in records {
1798 let identity = match PeerIdentity::from_npub(&record.npub) {
1799 Ok(identity) => identity,
1800 Err(err) => {
1801 debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
1802 continue;
1803 }
1804 };
1805 let peer_node_addr = *identity.node_addr();
1806 if peer_node_addr == *self.identity.node_addr() {
1807 continue;
1808 }
1809 if !self.local_instance_peer_allowed(&identity) {
1810 debug!(
1811 npub = %identity.short_npub(),
1812 "local instance discovery: skip unconfigured peer"
1813 );
1814 continue;
1815 }
1816
1817 let addresses = self.local_instance_peer_addresses(&record);
1818 if addresses.is_empty() {
1819 continue;
1820 }
1821
1822 if self.peers.contains_key(&peer_node_addr)
1823 && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
1824 {
1825 continue;
1826 }
1827
1828 for address in addresses {
1829 let Some((transport_id, remote_addr)) =
1830 self.resolve_peer_address_for_match(&address)
1831 else {
1832 continue;
1833 };
1834 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1835 continue;
1836 }
1837 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1838 skipped_budget = skipped_budget.saturating_add(1);
1839 continue;
1840 }
1841 info!(
1842 npub = %identity.short_npub(),
1843 transport = %address.transport,
1844 addr = %address.addr,
1845 "same-host FIPS instance discovery: initiating handshake"
1846 );
1847 if let Err(err) = self
1848 .initiate_connection(transport_id, remote_addr, identity)
1849 .await
1850 {
1851 debug!(
1852 npub = %record.npub,
1853 error = %err,
1854 "same-host FIPS instance discovery: failed to initiate connection"
1855 );
1856 }
1857 connect_budget = connect_budget.saturating_sub(1);
1858 }
1859 }
1860 if skipped_budget > 0 {
1861 debug!(
1862 skipped = skipped_budget,
1863 "same-host FIPS instance discovery connect budget exhausted"
1864 );
1865 }
1866 }
1867
1868 pub(super) async fn poll_lan_discovery(&mut self) {
1875 let Some(runtime) = self.lan_discovery.clone() else {
1876 return;
1877 };
1878 let events = runtime.drain_events().await;
1879 if events.is_empty() {
1880 return;
1881 }
1882 let mut connect_budget = self.discovery_connect_budget();
1883 let mut skipped_budget = 0usize;
1884 for event in events {
1885 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1886 let Some((transport_id, local_addr)) =
1887 self.find_udp_transport_for_remote_addr(peer.addr)
1888 else {
1889 debug!(
1890 addr = %peer.addr,
1891 "lan: skip discovered peer with no compatible UDP transport"
1892 );
1893 continue;
1894 };
1895 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1896 Ok(id) => id,
1897 Err(err) => {
1898 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1899 continue;
1900 }
1901 };
1902 let peer_node_addr = *identity.node_addr();
1903 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1904 if self.peers.contains_key(&peer_node_addr) {
1905 let candidate = PeerAddress::new("udp", peer.addr.to_string());
1906 if self.active_peer_candidate_is_fresh_enough_to_skip(
1907 &peer_node_addr,
1908 std::slice::from_ref(&candidate),
1909 ) {
1910 continue;
1911 }
1912 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1913 continue;
1914 }
1915 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1916 skipped_budget = skipped_budget.saturating_add(1);
1917 continue;
1918 }
1919 info!(
1920 npub = %identity.short_npub(),
1921 addr = %peer.addr,
1922 local_addr = %local_addr,
1923 "lan: initiating alternate-path handshake to active peer"
1924 );
1925 if let Err(err) = self
1926 .initiate_connection(transport_id, remote_addr, identity)
1927 .await
1928 {
1929 debug!(
1930 npub = %peer.npub,
1931 error = %err,
1932 "lan: failed to initiate active peer alternate-path handshake"
1933 );
1934 }
1935 connect_budget = connect_budget.saturating_sub(1);
1936 continue;
1937 }
1938 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1939 continue;
1940 }
1941 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1942 skipped_budget = skipped_budget.saturating_add(1);
1943 continue;
1944 }
1945 info!(
1946 npub = %identity.short_npub(),
1947 addr = %peer.addr,
1948 local_addr = %local_addr,
1949 "lan: initiating handshake to discovered peer"
1950 );
1951 if let Err(err) = self
1952 .initiate_connection(transport_id, remote_addr, identity)
1953 .await
1954 {
1955 debug!(
1956 npub = %peer.npub,
1957 error = %err,
1958 "lan: failed to initiate connection to discovered peer"
1959 );
1960 }
1961 connect_budget = connect_budget.saturating_sub(1);
1962 }
1963 if skipped_budget > 0 {
1964 debug!(
1965 skipped = skipped_budget,
1966 "lan: discovery connect budget exhausted"
1967 );
1968 }
1969 }
1970
1971 pub(super) async fn poll_pending_connects(&mut self) {
1978 if self.pending_connects.is_empty() {
1979 return;
1980 }
1981
1982 let mut completed = Vec::new();
1983
1984 for (i, pending) in self.pending_connects.iter().enumerate() {
1985 let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
1986 transport.connection_state(&pending.remote_addr)
1987 } else {
1988 crate::transport::ConnectionState::Failed("transport removed".into())
1989 };
1990
1991 match state {
1992 crate::transport::ConnectionState::Connected => {
1993 completed.push((i, true, None));
1994 }
1995 crate::transport::ConnectionState::Failed(reason) => {
1996 completed.push((i, false, Some(reason)));
1997 }
1998 crate::transport::ConnectionState::Connecting => {
1999 }
2001 crate::transport::ConnectionState::None => {
2002 completed.push((i, false, Some("no connection attempt found".into())));
2004 }
2005 }
2006 }
2007
2008 for (i, success, reason) in completed.into_iter().rev() {
2010 let pending = self.pending_connects.remove(i);
2011
2012 if success {
2013 if let Some(link) = self.links.get_mut(&pending.link_id) {
2015 link.set_connected();
2016 }
2017
2018 debug!(
2019 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2020 transport_id = %pending.transport_id,
2021 remote_addr = %pending.remote_addr,
2022 link_id = %pending.link_id,
2023 "Transport connected, starting handshake"
2024 );
2025
2026 if let Err(e) = self
2028 .start_handshake(
2029 pending.link_id,
2030 pending.transport_id,
2031 pending.remote_addr.clone(),
2032 pending.peer_identity,
2033 )
2034 .await
2035 {
2036 warn!(
2037 link_id = %pending.link_id,
2038 error = %e,
2039 "Failed to start handshake after transport connect"
2040 );
2041 self.remove_link(&pending.link_id);
2043 }
2044 } else {
2045 let reason = reason.unwrap_or_default();
2046 warn!(
2047 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2048 transport_id = %pending.transport_id,
2049 remote_addr = %pending.remote_addr,
2050 link_id = %pending.link_id,
2051 reason = %reason,
2052 "Transport connect failed"
2053 );
2054
2055 self.remove_link(&pending.link_id);
2057 self.links.remove(&pending.link_id);
2058 self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
2059 }
2060 }
2061 }
2062
    /// Brings the node from a startable state to `Running`.
    ///
    /// Steps, in order: create the packet channel, create and start the
    /// transports, spawn the crypto worker pools (unix only, when enabled),
    /// start Nostr, LAN and same-host discovery, initiate configured peer
    /// connections, set up the TUN device and its reader/writer threads, and
    /// start the DNS responder. Most individual failures (a transport, a
    /// discovery runtime, TUN creation, the DNS socket) are logged and startup
    /// continues without that component.
    ///
    /// # Errors
    ///
    /// * [`NodeError::AlreadyStarted`] when `self.state.can_start()` is false.
    /// * Any error propagated by `device.create_writer(..)`.
    /// * On macOS, a `NodeError::Tun` error when the shutdown pipe cannot be created.
    ///
    /// NOTE(review): the last two error paths return after `self.state` was set
    /// to `Starting` and after transports were started, and nothing in this
    /// function resets them — confirm callers handle that.
    pub async fn start(&mut self) -> Result<(), NodeError> {
        node_start_debug_log("Node::start begin");
        if !self.state.can_start() {
            return Err(NodeError::AlreadyStarted);
        }
        self.state = NodeState::Starting;
        node_start_debug_log("Node::start state set to starting");

        // Packet channel: the sender is handed to `create_transports` (a clone is
        // kept on the node); the receiver stays on the node.
        let packet_buffer_size = self.config.node.buffers.packet_channel;
        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
        self.packet_tx = Some(packet_tx.clone());
        self.packet_rx = Some(packet_rx);
        node_start_debug_log("Node::start packet channel created");

        node_start_debug_log("Node::start create transports begin");
        let transport_handles = self.create_transports(&packet_tx).await;
        node_start_debug_log(format!(
            "Node::start create transports complete count={}",
            transport_handles.len()
        ));

        // Start every transport; only those that start successfully are registered.
        for mut handle in transport_handles {
            let transport_id = handle.transport_id();
            let transport_type = handle.transport_type().name;
            let name = handle.name().map(|s| s.to_string());

            node_start_debug_log(format!(
                "Node::start transport start begin id={} type={} name={:?}",
                transport_id, transport_type, name
            ));
            match handle.start().await {
                Ok(()) => {
                    node_start_debug_log(format!(
                        "Node::start transport start ok id={} type={}",
                        transport_id, transport_type
                    ));
                    self.transports.insert(transport_id, handle);
                }
                Err(e) => {
                    node_start_debug_log(format!(
                        "Node::start transport start error id={} type={} error={}",
                        transport_id, transport_type, e
                    ));
                    if let Some(ref n) = name {
                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
                    } else {
                        warn!(transport_type, error = %e, "Transport failed to start");
                    }
                }
            }
        }

        if !self.transports.is_empty() {
            info!(count = self.transports.len(), "Transports initialized");
        }

        // Crypto worker pools exist only on unix targets.
        #[cfg(unix)]
        {
            if self.config.node.worker_pools_enabled {
                node_start_debug_log("Node::start worker pools begin");
                // Default worker count: available parallelism, at least 1.
                let cpu_default = std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(1)
                    .max(1);
                // FIPS_ENCRYPT_WORKERS overrides the default; unparsable values fall
                // back to it and the result is clamped to at least 1.
                let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
                    .ok()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(cpu_default)
                    .max(1);
                self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
                    encrypt_worker_count,
                ));
                info!(
                    workers = encrypt_worker_count,
                    "Spawned FMP-encrypt worker pool"
                );

                // FIPS_DECRYPT_WORKERS is not clamped: an explicit 0 means no
                // decrypt pool is spawned.
                let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
                    .ok()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(cpu_default);
                if decrypt_worker_count == 0 {
                    info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
                } else {
                    self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
                        decrypt_worker_count,
                    ));
                    info!(
                        workers = decrypt_worker_count,
                        "Spawned FMP+FSP-decrypt worker pool"
                    );
                }
                node_start_debug_log("Node::start worker pools complete");
            } else {
                node_start_debug_log("Node::start worker pools disabled");
                info!("FIPS worker pools disabled; using in-line crypto/send path");
            }
        }

        // Nostr overlay discovery: a startup failure is logged and the node continues.
        if self.config.node.discovery.nostr.enabled {
            node_start_debug_log("Node::start nostr discovery start begin");
            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
                .await
            {
                Ok(runtime) => {
                    node_start_debug_log("Node::start nostr discovery runtime created");
                    // A failed initial advert does not prevent the runtime from
                    // being installed.
                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
                    }
                    node_start_debug_log("Node::start nostr overlay advert refreshed");
                    self.nostr_discovery = Some(runtime);
                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
                    info!("Nostr overlay discovery enabled");
                }
                Err(err) => {
                    node_start_debug_log(format!(
                        "Node::start nostr discovery start error error={}",
                        err
                    ));
                    warn!(error = %err, "Failed to start Nostr overlay discovery");
                }
            }
        }

        if self.config.node.discovery.lan.enabled {
            node_start_debug_log("Node::start lan discovery start begin");
            // Port of the first operational UDP transport that reports a local
            // address; 0 when there is none.
            let advertised_udp_port = self
                .transports
                .values()
                .filter(|h| h.is_operational())
                .filter(|h| h.transport_type().name == "udp")
                .find_map(|h| h.local_addr().map(|addr| addr.port()))
                .unwrap_or(0);
            let scope = self.lan_discovery_scope();
            match crate::discovery::lan::LanDiscovery::start(
                &self.identity,
                scope,
                advertised_udp_port,
                self.config.node.discovery.lan.clone(),
            )
            .await
            {
                Ok(runtime) => {
                    node_start_debug_log("Node::start lan discovery start ok");
                    self.lan_discovery = Some(runtime);
                    info!("LAN mDNS discovery enabled");
                }
                Err(err) => {
                    node_start_debug_log(format!(
                        "Node::start lan discovery start error error={}",
                        err
                    ));
                    debug!(error = %err, "LAN mDNS discovery not started");
                }
            }
        }

        // Same-host discovery: set up the registry, then run one poll immediately.
        self.start_local_instance_discovery();
        self.poll_local_instance_discovery().await;

        node_start_debug_log("Node::start initiate peer connections begin");
        self.initiate_peer_connections().await;
        node_start_debug_log("Node::start initiate peer connections complete");

        if self.config.tun.enabled {
            node_start_debug_log("Node::start tun init begin");
            let address = *self.identity.address();
            match TunDevice::create(&self.config.tun, address).await {
                Ok(device) => {
                    // Read these before `device` is moved into the reader thread.
                    let mtu = device.mtu();
                    let name = device.name().to_string();
                    let our_addr = *device.address();

                    info!("TUN device active:");
                    info!(" name: {}", name);
                    info!(" address: {}", device.address());
                    info!(" mtu: {}", mtu);

                    let effective_mtu = self.effective_ipv6_mtu();
                    // Presumably 40 = IPv6 header and 20 = TCP header — TODO confirm.
                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20);
                    info!("effective MTU: {} bytes", effective_mtu);
                    debug!(" max TCP MSS: {} bytes", max_mss);

                    // macOS only: the pipe's read end is handed to the TUN reader
                    // thread and its write end is kept in `self.tun_shutdown_fd`.
                    #[cfg(target_os = "macos")]
                    let (shutdown_read_fd, shutdown_write_fd) = {
                        let mut fds = [0i32; 2];
                        // SAFETY: `fds` is a live, writable array of two `i32`s,
                        // which is what `pipe` writes its two descriptors into.
                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
                                "failed to create shutdown pipe".into(),
                            )));
                        }
                        (fds[0], fds[1])
                    };

                    // NOTE(review): on macOS an error here returns without closing
                    // the pipe descriptors created above — confirm whether that
                    // leak matters.
                    let (writer, tun_tx) =
                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;

                    // The writer runs on its own OS thread.
                    let writer_handle = thread::spawn(move || {
                        writer.run();
                    });

                    let reader_tun_tx = tun_tx.clone();

                    let tun_channel_size = self.config.node.buffers.tun_channel;
                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);

                    let transport_mtu = self.transport_mtu();
                    let path_mtu_lookup = self.path_mtu_lookup.clone();
                    // The reader also runs on its own OS thread; the macOS variant
                    // additionally receives the shutdown pipe's read end.
                    #[cfg(target_os = "macos")]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                            shutdown_read_fd,
                        );
                    });
                    #[cfg(not(target_os = "macos"))]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                        );
                    });

                    self.tun_state = TunState::Active;
                    self.tun_name = Some(name);
                    self.tun_tx = Some(tun_tx);
                    self.tun_outbound_rx = Some(outbound_rx);
                    self.tun_reader_handle = Some(reader_handle);
                    self.tun_writer_handle = Some(writer_handle);
                    #[cfg(target_os = "macos")]
                    {
                        self.tun_shutdown_fd = Some(shutdown_write_fd);
                    }
                }
                Err(e) => {
                    // TUN creation failure is not fatal to startup.
                    self.tun_state = TunState::Failed;
                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
                }
            }
            node_start_debug_log("Node::start tun init complete");
        }

        if self.config.dns.enabled {
            node_start_debug_log("Node::start dns init begin");
            let addr_str = self.config.dns.bind_addr();
            match addr_str.parse::<std::net::IpAddr>() {
                Ok(ip) => {
                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
                    match Self::bind_dns_socket(bind) {
                        Ok(socket) => {
                            let dns_channel_size = self.config.node.buffers.dns_channel;
                            let (identity_tx, identity_rx) =
                                tokio::sync::mpsc::channel(dns_channel_size);
                            let dns_ttl = self.config.dns.ttl();
                            // The host map starts from the configured peers.
                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
                                self.config.peers(),
                            );
                            // With system files enabled the reloader is given the
                            // default hosts path; otherwise it is memory-only.
                            let reloader = if self.config.node.system_files_enabled {
                                let hosts_path = std::path::PathBuf::from(
                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
                                );
                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
                            } else {
                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
                            };
                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
                            info!(
                                bind = %bind,
                                hosts = reloader.hosts().len(),
                                mesh_ifindex = ?mesh_ifindex,
                                "DNS responder started for .fips domain (auto-reload enabled)"
                            );
                            // The responder runs as a tokio task; the node keeps the
                            // task handle and the identity receiver.
                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
                                socket,
                                identity_tx,
                                dns_ttl,
                                reloader,
                                mesh_ifindex,
                            ));
                            self.dns_identity_rx = Some(identity_rx);
                            self.dns_task = Some(handle);
                        }
                        Err(e) => {
                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
                        }
                    }
                }
                Err(e) => {
                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
                }
            }
            node_start_debug_log("Node::start dns init complete");
        }

        self.state = NodeState::Running;
        node_start_debug_log("Node::start running");
        info!("Node started:");
        info!(" state: {}", self.state);
        info!(" transports: {}", self.transports.len());
        info!(" connections: {}", self.connections.len());
        Ok(())
    }
2448
2449 fn bind_dns_socket(
2462 addr: std::net::SocketAddr,
2463 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2464 use socket2::{Domain, Protocol, Socket, Type};
2465 let domain = if addr.is_ipv4() {
2466 Domain::IPV4
2467 } else {
2468 Domain::IPV6
2469 };
2470 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2471 if addr.is_ipv6() {
2472 sock.set_only_v6(false)?;
2473 #[cfg(unix)]
2474 Self::set_recv_pktinfo_v6(&sock)?;
2475 }
2476 sock.set_nonblocking(true)?;
2477 sock.bind(&addr.into())?;
2478 tokio::net::UdpSocket::from_std(sock.into())
2479 }
2480
2481 #[cfg(unix)]
2487 fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2488 use std::os::fd::AsRawFd;
2489 let enable: libc::c_int = 1;
2490 let ret = unsafe {
2491 libc::setsockopt(
2492 sock.as_raw_fd(),
2493 libc::IPPROTO_IPV6,
2494 libc::IPV6_RECVPKTINFO,
2495 &enable as *const _ as *const libc::c_void,
2496 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2497 )
2498 };
2499 if ret < 0 {
2500 return Err(std::io::Error::last_os_error());
2501 }
2502 Ok(())
2503 }
2504
2505 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2512 #[cfg(unix)]
2513 {
2514 let c_name = std::ffi::CString::new(name).ok()?;
2515 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2516 if idx == 0 { None } else { Some(idx) }
2517 }
2518 #[cfg(not(unix))]
2519 {
2520 let _ = name;
2521 None
2522 }
2523 }
2524
    /// Stops the node, releasing the resources held by `self` in a fixed order.
    ///
    /// Sequence: abort the DNS responder task, send a `Shutdown` disconnect to
    /// peers, shut down Nostr and LAN discovery, remove the local instance
    /// registry record, stop every transport, drop the packet channel ends,
    /// tear down the TUN interface and join its threads, then mark the node
    /// `Stopped`.
    ///
    /// Failures in individual teardown steps are logged and do not abort the
    /// remaining steps.
    ///
    /// # Errors
    ///
    /// Returns `NodeError::NotStarted` when `self.state.can_stop()` is false.
    pub async fn stop(&mut self) -> Result<(), NodeError> {
        if !self.state.can_stop() {
            return Err(NodeError::NotStarted);
        }
        self.state = NodeState::Stopping;
        info!(state = %self.state, "Node stopping");

        // Abort the DNS responder task, if one was spawned.
        if let Some(handle) = self.dns_task.take() {
            handle.abort();
            debug!("DNS responder stopped");
        }

        // Notify peers while the transports are still running (they are
        // stopped further down).
        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
            .await;

        // Discovery shutdown is best-effort: a failure is only logged.
        if let Some(bootstrap) = self.nostr_discovery.take()
            && let Err(e) = bootstrap.shutdown().await
        {
            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
        }

        if let Some(lan) = self.lan_discovery.take() {
            lan.shutdown().await;
        }

        if let Some(registry) = self.local_instance_registry.take()
            && let Err(err) = registry.remove()
        {
            debug!(error = %err, "failed to remove same-host FIPS instance record");
        }

        // Snapshot the ids first so entries can be removed from the map while
        // iterating.
        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
        for transport_id in transport_ids {
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let transport_type = handle.transport_type().name;
                match handle.stop().await {
                    Ok(()) => {
                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
                    }
                    Err(e) => {
                        warn!(
                            transport_id = %transport_id,
                            transport_type,
                            error = %e,
                            "Transport stop failed"
                        );
                    }
                }
            }
        }

        // Drop both ends of the packet channel held by the node.
        self.packet_tx.take();
        self.packet_rx.take();

        // `tun_name` is only set when TUN initialisation succeeded in `start`.
        if let Some(name) = self.tun_name.take() {
            info!(name = %name, "Shutting down TUN interface");

            // Drop the node's sender for the TUN writer channel. The reader
            // thread was given its own clone in `start`.
            self.tun_tx.take();

            if let Err(e) = shutdown_tun_interface(&name).await {
                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
            }

            // macOS only: write one byte to the shutdown pipe and close the
            // write end. The read end was handed to the TUN reader thread in
            // `start`; presumably this wakes the reader so it can exit —
            // confirm against `run_tun_reader`. The results of `write` and
            // `close` are not checked.
            #[cfg(target_os = "macos")]
            if let Some(fd) = self.tun_shutdown_fd.take() {
                unsafe {
                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                    libc::close(fd);
                }
            }

            // NOTE(review): these are `std::thread` handles, so `join` blocks
            // the calling thread until the TUN threads have exited. A panic in
            // either thread is ignored.
            if let Some(handle) = self.tun_reader_handle.take() {
                let _ = handle.join();
            }
            if let Some(handle) = self.tun_writer_handle.take() {
                let _ = handle.join();
            }

            self.tun_state = TunState::Disabled;
        }

        self.state = NodeState::Stopped;
        info!(state = %self.state, "Node stopped");
        Ok(())
    }
2628
2629 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2634 let disconnect = Disconnect::new(reason);
2635 let plaintext = disconnect.encode();
2636
2637 let peer_addrs: Vec<NodeAddr> = self
2639 .peers
2640 .iter()
2641 .filter(|(_, peer)| peer.can_send() && peer.has_session())
2642 .map(|(addr, _)| *addr)
2643 .collect();
2644
2645 if peer_addrs.is_empty() {
2646 debug!(
2647 total_peers = self.peers.len(),
2648 "No sendable peers for disconnect notification"
2649 );
2650 return;
2651 }
2652
2653 let mut sent = 0usize;
2654 for node_addr in &peer_addrs {
2655 match self
2656 .send_encrypted_link_message(node_addr, &plaintext)
2657 .await
2658 {
2659 Ok(()) => sent += 1,
2660 Err(e) => {
2661 debug!(
2662 peer = %self.peer_display_name(node_addr),
2663 error = %e,
2664 "Failed to send disconnect (transport may be down)"
2665 );
2666 }
2667 }
2668 }
2669
2670 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2671 }
2672
2673 pub(in crate::node) fn static_peer_addresses(
2674 &self,
2675 peer_config: &PeerConfig,
2676 ) -> Vec<PeerAddress> {
2677 peer_config
2678 .addresses_by_priority()
2679 .into_iter()
2680 .cloned()
2681 .collect()
2682 }
2683
2684 async fn nostr_peer_fallback_addresses(
2685 &self,
2686 peer_config: &PeerConfig,
2687 existing: &[PeerAddress],
2688 ) -> Vec<PeerAddress> {
2689 if !self.config.node.discovery.nostr.enabled
2690 || self.config.node.discovery.nostr.policy
2691 == crate::config::NostrDiscoveryPolicy::Disabled
2692 {
2693 return Vec::new();
2694 }
2695
2696 let Some(bootstrap) = self.nostr_discovery.clone() else {
2697 return Vec::new();
2698 };
2699 if bootstrap
2700 .cooldown_until(&peer_config.npub, Self::now_ms())
2701 .is_some()
2702 {
2703 debug!(
2704 npub = %peer_config.npub,
2705 "Skipping cached Nostr fallback endpoints while peer is in traversal cooldown"
2706 );
2707 return Vec::new();
2708 }
2709 let endpoints = match bootstrap
2710 .cached_advert_endpoints_for_peer(&peer_config.npub)
2711 .await
2712 {
2713 Some(endpoints) => endpoints,
2714 None => {
2715 debug!(
2716 npub = %peer_config.npub,
2717 "No cached Nostr advert endpoints for configured peer"
2718 );
2719 return Vec::new();
2720 }
2721 };
2722
2723 let mut fallback = Vec::new();
2724 let mut next_priority = existing
2725 .iter()
2726 .map(|addr| addr.priority)
2727 .max()
2728 .unwrap_or(100)
2729 .saturating_add(1);
2730 let seen_at_ms = Self::now_ms();
2736 for endpoint in endpoints {
2737 let Some(candidate) =
2738 Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2739 else {
2740 continue;
2741 };
2742 if existing
2743 .iter()
2744 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2745 || fallback.iter().any(|addr: &PeerAddress| {
2746 addr.transport == candidate.transport && addr.addr == candidate.addr
2747 })
2748 {
2749 continue;
2750 }
2751 fallback.push(candidate);
2752 next_priority = next_priority.saturating_add(1);
2753 }
2754 fallback
2755 }
2756
2757 async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2758 if !self.config.node.discovery.nostr.enabled
2759 || self.config.node.discovery.nostr.policy
2760 == crate::config::NostrDiscoveryPolicy::Disabled
2761 {
2762 return false;
2763 }
2764 let Some(bootstrap) = self.nostr_discovery.clone() else {
2765 return false;
2766 };
2767 let now_ms = Self::now_ms();
2768 if let Some(cooldown_until_ms) = bootstrap.cooldown_until(&peer_config.npub, now_ms) {
2769 debug!(
2770 npub = %peer_config.npub,
2771 cooldown_secs = cooldown_until_ms.saturating_sub(now_ms) / 1000,
2772 "Skipping Nostr traversal request while peer is in cooldown"
2773 );
2774 return false;
2775 }
2776 bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
2777 bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
2778 let mesh_signaling_allowed = self.mesh_signaling_allowed_for_peer(peer_config);
2779 bootstrap
2780 .request_connect_with_mesh_signaling(peer_config.clone(), mesh_signaling_allowed)
2781 .await;
2782 info!(
2783 npub = %peer_config.npub,
2784 mesh_signaling_allowed,
2785 "Started background UDP NAT traversal attempt"
2786 );
2787 true
2788 }
2789
2790 pub(in crate::node) fn mesh_signaling_allowed_for_peer(
2791 &self,
2792 peer_config: &PeerConfig,
2793 ) -> bool {
2794 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
2795 return false;
2796 };
2797 let peer_addr = identity.node_addr();
2798 self.configured_peer(peer_addr).is_some()
2799 }
2800
2801 fn overlay_endpoint_to_peer_address(
2802 endpoint: &OverlayEndpointAdvert,
2803 priority: u8,
2804 seen_at_ms: u64,
2805 ) -> Option<PeerAddress> {
2806 let transport = match endpoint.transport {
2807 OverlayTransportKind::Udp => "udp",
2808 OverlayTransportKind::Tcp => "tcp",
2809 OverlayTransportKind::Tor => "tor",
2810 OverlayTransportKind::WebRtc => "webrtc",
2811 };
2812 Some(
2813 PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2814 .with_seen_at_ms(seen_at_ms),
2815 )
2816 }
2817
    /// Walks `addresses` in order and initiates connection attempts towards the
    /// peer on every usable path, up to the per-peer candidate budget.
    ///
    /// * `allow_bootstrap_nat` - when `false`, `udp`/`nat` placeholder entries
    ///   are skipped; when `true`, they trigger a Nostr bootstrap request.
    ///
    /// # Returns
    ///
    /// `Ok(())` when at least one attempt is under way: a connection was
    /// initiated, a Nostr bootstrap request was accepted, or an identical path
    /// was already in flight.
    ///
    /// # Errors
    ///
    /// * `NodeError::AccessDenied` - returned immediately from the first
    ///   connection attempt that reports it; later addresses are not tried.
    /// * `NodeError::LocalRouteUnavailable` - nothing was attempted and at least
    ///   one failure satisfied `is_local_route_unavailable()`; carries the
    ///   message of the first such failure.
    /// * `NodeError::NoTransportForType` - nothing was attempted for any other
    ///   reason.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        let mut attempted = false;
        let mut local_route_error = None;
        let peer_node_addr = *peer_identity.node_addr();
        // Number of concrete connection attempts this call may still start.
        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

        for addr in addresses {
            // "udp" + "nat" is a placeholder rather than a dialable address: it
            // is served by a Nostr bootstrap request and does not consume the
            // concrete budget.
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if !allow_bootstrap_nat {
                    continue;
                }
                if self.request_nostr_bootstrap(peer_config).await {
                    attempted = true;
                    continue;
                }
                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                continue;
            }

            // Resolve the address to a (transport id, transport address) pair.
            // Ethernet and BLE have dedicated resolvers; everything else picks
            // an operational transport by type.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                // UDP addresses that parse as a socket address are matched
                // against a UDP transport chosen for that remote address; any
                // other address falls back to a lookup by transport type.
                let tid = if addr.transport == "udp"
                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
                {
                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                        Some((id, _)) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No compatible operational UDP transport for address"
                            );
                            continue;
                        }
                    }
                } else {
                    match self.find_transport_for_type(&addr.transport) {
                        Some(id) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No operational transport for address type"
                            );
                            continue;
                        }
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            // An identical path already being connected counts as an attempt
            // and does not consume the budget.
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                attempted = true;
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    remote_addr = %remote_addr,
                    "Skipping duplicate in-flight candidate path"
                );
                continue;
            }

            // Budget exhausted: stop scanning altogether, including any
            // remaining placeholder entries.
            if concrete_budget == 0 {
                debug!(
                    npub = %peer_config.npub,
                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                    "Path candidate race budget exhausted"
                );
                break;
            }

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => {
                    attempted = true;
                    concrete_budget = concrete_budget.saturating_sub(1);
                }
                // An access denial aborts the whole scan rather than moving on
                // to the next address.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    // Remember only the first local-route failure for the final
                    // error.
                    if e.is_local_route_unavailable() && local_route_error.is_none() {
                        local_route_error = Some(e.to_string());
                    }
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        if attempted {
            return Ok(());
        }

        if let Some(error) = local_route_error {
            return Err(NodeError::LocalRouteUnavailable(error));
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
2964
2965 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
2966 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
2967 .await;
2968 }
2969
2970 pub(in crate::node) fn queue_active_fallback_direct_retries(
2971 &mut self,
2972 bootstrap: &std::sync::Arc<NostrDiscovery>,
2973 ) {
2974 let now_ms = Self::now_ms();
2975 let peer_configs = self
2976 .config
2977 .auto_connect_peers()
2978 .cloned()
2979 .collect::<Vec<_>>();
2980
2981 for peer_config in peer_configs {
2982 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
2983 continue;
2984 };
2985 let node_addr = *peer_identity.node_addr();
2986
2987 if self.retry_pending.contains_key(&node_addr)
2988 || !self.peers.contains_key(&node_addr)
2989 || self.is_connecting_to_peer(&node_addr)
2990 || !self.active_peer_should_keep_direct_retry(&node_addr, &peer_config)
2991 {
2992 continue;
2993 }
2994
2995 let cooldown_until_ms = bootstrap.cooldown_until(&peer_config.npub, now_ms);
2996 let mut state = super::retry::RetryState::new(peer_config.clone());
2997 state.reconnect = true;
2998 state.retry_after_ms = cooldown_until_ms.unwrap_or(now_ms);
2999 self.retry_pending.insert(node_addr, state);
3000
3001 debug!(
3002 peer = %self.peer_display_name(&node_addr),
3003 cooldown_secs = cooldown_until_ms.map(|t| t.saturating_sub(now_ms) / 1000),
3004 "Queued direct-path retry for active fallback peer"
3005 );
3006 }
3007 }
3008
    /// Scans cached open-discovery adverts and queues a retry entry for every
    /// eligible, not-yet-known peer, up to the current enqueue budget.
    ///
    /// Does nothing unless Nostr discovery is enabled and its policy is `Open`.
    ///
    /// * `max_age_secs` - when set, adverts older than this many seconds are
    ///   skipped.
    /// * `caller` - label attached to log records; a summary is logged when the
    ///   label is `"startup"` or when at least one peer was queued.
    ///
    /// Configured peers are never queued here. If one has a pending retry
    /// scheduled in the future and is not in traversal cooldown, that retry is
    /// pulled forward to now.
    pub(in crate::node) async fn run_open_discovery_sweep(
        &mut self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
        max_age_secs: Option<u64>,
        caller: &'static str,
    ) {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
        {
            return;
        }

        let configured_npubs = self
            .config
            .peers()
            .iter()
            .map(|peer| peer.npub.clone())
            .collect::<HashSet<_>>();
        let now_ms = Self::now_ms();
        let now_secs = now_ms / 1000;
        // Number of new retry entries this sweep may still create.
        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
        if enqueue_budget == 0 {
            debug!(
                caller = %caller,
                "open-discovery sweep: enqueue budget is 0, skipping"
            );
            return;
        }

        // NOTE(review): 64 is presumably the maximum number of cached
        // candidates to return — confirm against `NostrDiscovery`.
        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
        let cached_count = candidates.len();
        // Per-reason counters, used only for the summary log at the end.
        let mut enqueued = 0usize;
        let mut skipped_age = 0usize;
        let mut skipped_configured = 0usize;
        let mut skipped_self = 0usize;
        let mut skipped_connected = 0usize;
        let mut skipped_retry_pending = 0usize;
        let mut skipped_connecting = 0usize;
        let mut skipped_no_endpoints = 0usize;
        let mut skipped_invalid_npub = 0usize;
        let mut skipped_cooldown = 0usize;

        for (npub, endpoints, created_at_secs) in candidates {
            if enqueue_budget == 0 {
                break;
            }

            if let Some(max_age) = max_age_secs
                && now_secs.saturating_sub(created_at_secs) > max_age
            {
                skipped_age = skipped_age.saturating_add(1);
                continue;
            }

            // Configured peers are handled by the regular retry machinery; the
            // most this sweep does is expedite an already-pending retry.
            if configured_npubs.contains(&npub) {
                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
                    let configured_addr = *identity.node_addr();
                    if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                        // NOTE(review): this candidate increments both
                        // `skipped_cooldown` and `skipped_configured`, so it is
                        // counted twice in `skipped_total` below.
                        skipped_cooldown = skipped_cooldown.saturating_add(1);
                        skipped_configured = skipped_configured.saturating_add(1);
                        continue;
                    }
                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
                        && state.retry_after_ms > now_ms
                    {
                        state.retry_after_ms = now_ms;
                        debug!(
                            caller = %caller,
                            peer = %self.peer_display_name(&configured_addr),
                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
                            "Expediting configured-peer retry after fresh overlay advert"
                        );
                    }
                }
                skipped_configured = skipped_configured.saturating_add(1);
                continue;
            }

            // Eligibility checks for non-configured peers, in order: valid
            // npub, not this node, not already a peer, no pending retry, not in
            // cooldown, no connection in progress.
            let peer_identity = match PeerIdentity::from_npub(&npub) {
                Ok(identity) => identity,
                Err(_) => {
                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                    continue;
                }
            };
            let node_addr = *peer_identity.node_addr();
            if node_addr == *self.identity.node_addr() {
                skipped_self = skipped_self.saturating_add(1);
                continue;
            }
            if self.peers.contains_key(&node_addr) {
                skipped_connected = skipped_connected.saturating_add(1);
                continue;
            }
            if self.retry_pending.contains_key(&node_addr) {
                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
                continue;
            }
            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                skipped_cooldown = skipped_cooldown.saturating_add(1);
                continue;
            }
            let connecting = self.connections.values().any(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == &node_addr)
                    .unwrap_or(false)
            });
            if connecting {
                skipped_connecting = skipped_connecting.saturating_add(1);
                continue;
            }

            // Convert the advertised endpoints into de-duplicated addresses.
            // Priorities start at 120 and grow by one per accepted endpoint.
            let mut addresses = Vec::new();
            let mut priority = 120u8;
            let seen_at_ms = Self::now_ms();
            for endpoint in endpoints {
                let Some(candidate) =
                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
                else {
                    continue;
                };
                if addresses.iter().any(|existing: &PeerAddress| {
                    existing.transport == candidate.transport && existing.addr == candidate.addr
                }) {
                    continue;
                }
                addresses.push(candidate);
                priority = priority.saturating_add(1);
            }
            if addresses.is_empty() {
                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
                continue;
            }

            // Record a display alias (only if none exists yet) and the identity
            // before queueing the retry.
            self.peer_aliases
                .entry(node_addr)
                .or_insert_with(|| peer_identity.short_npub());
            self.register_identity(node_addr, peer_identity.pubkey_full());

            // Synthesize a peer config for the discovered peer. The retry is due
            // immediately and carries an expiry derived from the advert TTL.
            let mut state = super::retry::RetryState::new(PeerConfig {
                npub: npub.clone(),
                alias: None,
                addresses,
                connect_policy: ConnectPolicy::AutoConnect,
                auto_reconnect: true,
                discovery_fallback_transit: false,
            });
            state.reconnect = false;
            state.retry_after_ms = now_ms;
            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
            self.retry_pending.insert(node_addr, state);
            info!(
                caller = %caller,
                peer = %peer_identity.short_npub(),
                advert_age_secs = now_secs.saturating_sub(created_at_secs),
                "open-discovery sweep: queued retry for cached advert"
            );
            enqueue_budget = enqueue_budget.saturating_sub(1);
            enqueued = enqueued.saturating_add(1);
        }

        let total_skipped = skipped_age
            + skipped_configured
            + skipped_self
            + skipped_connected
            + skipped_retry_pending
            + skipped_connecting
            + skipped_no_endpoints
            + skipped_invalid_npub
            + skipped_cooldown;
        // Per-tick sweeps that queued nothing stay silent.
        let should_summarize = caller == "startup" || enqueued > 0;
        if should_summarize {
            info!(
                caller = %caller,
                cached = cached_count,
                queued = enqueued,
                skipped_age = skipped_age,
                skipped_configured = skipped_configured,
                skipped_self = skipped_self,
                skipped_connected = skipped_connected,
                skipped_retry_pending = skipped_retry_pending,
                skipped_connecting = skipped_connecting,
                skipped_no_endpoints = skipped_no_endpoints,
                skipped_invalid_npub = skipped_invalid_npub,
                skipped_cooldown = skipped_cooldown,
                skipped_total = total_skipped,
                "open-discovery sweep complete"
            );
        }
    }
3231
3232 async fn maybe_run_startup_open_discovery_sweep(
3240 &mut self,
3241 bootstrap: &std::sync::Arc<NostrDiscovery>,
3242 ) {
3243 if self.startup_open_discovery_sweep_done {
3244 return;
3245 }
3246 if !self.config.node.discovery.nostr.enabled
3247 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3248 {
3249 self.startup_open_discovery_sweep_done = true;
3251 return;
3252 }
3253 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
3254 return;
3255 };
3256 let now_ms = Self::now_ms();
3257 let delay_ms = self
3258 .config
3259 .node
3260 .discovery
3261 .nostr
3262 .startup_sweep_delay_secs
3263 .saturating_mul(1000);
3264 if now_ms < started_at_ms.saturating_add(delay_ms) {
3265 return;
3266 }
3267
3268 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
3269 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
3270 .await;
3271 self.startup_open_discovery_sweep_done = true;
3272 }
3273
3274 fn available_outbound_slots(&self) -> usize {
3275 let connection_used = self
3276 .connections
3277 .len()
3278 .saturating_add(self.pending_connects.len());
3279 let connection_slots = if self.max_connections == 0 {
3280 usize::MAX
3281 } else {
3282 self.max_connections.saturating_sub(connection_used)
3283 };
3284
3285 let peer_slots = if self.max_peers == 0 {
3286 usize::MAX
3287 } else {
3288 self.max_peers.saturating_sub(self.peers.len())
3289 };
3290
3291 let link_slots = if self.max_links == 0 {
3292 usize::MAX
3293 } else {
3294 self.max_links.saturating_sub(self.links.len())
3295 };
3296
3297 connection_slots.min(peer_slots).min(link_slots)
3298 }
3299
3300 pub(in crate::node) fn open_discovery_enqueue_budget(
3301 &self,
3302 configured_npubs: &HashSet<String>,
3303 ) -> usize {
3304 let current_open_discovery_active = self
3305 .peers
3306 .values()
3307 .filter(|peer| !configured_npubs.contains(&peer.npub()))
3308 .count();
3309 let current_open_discovery_pending = self
3310 .retry_pending
3311 .values()
3312 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3313 .count();
3314
3315 let cap_remaining = self
3316 .config
3317 .node
3318 .discovery
3319 .nostr
3320 .open_discovery_max_pending
3321 .saturating_sub(current_open_discovery_active)
3322 .saturating_sub(current_open_discovery_pending);
3323
3324 cap_remaining.min(self.available_outbound_slots())
3325 }
3326
3327 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3328 now_ms.saturating_add(
3329 self.config
3330 .node
3331 .discovery
3332 .nostr
3333 .advert_ttl_secs
3334 .saturating_mul(1000)
3335 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3336 )
3337 }
3338
    /// Builds the overlay advert describing how this node can be reached.
    ///
    /// One endpoint is collected per operational transport whose configuration
    /// is found and has `advertise_on_nostr()` set:
    ///
    /// * `udp` - public transports advertise, in order of preference, the
    ///   explicit external address, the bound address if it is routable, or an
    ///   address learned through `learn_public_udp_addr`; non-public transports
    ///   advertise the `"nat"` placeholder.
    /// * `webrtc` - the hex-encoded serialized public key of this node.
    /// * `tcp` - the explicit external address, or the bound address if it is
    ///   routable.
    /// * `tor` - the onion address combined with the advertised port.
    ///
    /// Returns `None` when Nostr discovery is disabled or no endpoint could be
    /// collected. Signal relays and STUN servers are included only when a
    /// `"nat"` UDP endpoint or a WebRTC endpoint is advertised.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // Track whether any advertised endpoint needs signaling/STUN metadata.
        let mut has_udp_nat = false;
        let mut has_webrtc = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // An explicitly configured external address always wins.
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                // Bound to a specific routable address: advertise
                                // it as-is.
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                // Wildcard or unroutable bind: ask the discovery
                                // runtime for the publicly observed address of
                                // this port.
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                             or private and STUN observation failed; \
                                             advertising no UDP endpoint. Either set \
                                             transports.udp.external_addr, bind to a \
                                             specific *public* IP, or ensure \
                                             node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                None => {}
                            }
                        }
                    } else {
                        // Non-public UDP: advertise the "nat" placeholder
                        // instead of a concrete address.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "webrtc" => {
                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // The WebRTC endpoint is identified by this node's public
                    // key rather than by a network address.
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::WebRtc,
                        addr: hex::encode(self.identity.pubkey_full().serialize()),
                    });
                    has_webrtc = true;
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            // Unlike UDP there is no learned-address fallback
                            // for TCP: warn and advertise nothing.
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                     or private IP and no transports.tcp.external_addr set; \
                                     advertising no TCP endpoint. Either set external_addr \
                                     to the public IP (recommended for cloud 1:1-NAT setups) \
                                     or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Nothing is advertised while the transport has no onion
                    // address.
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                // Other transport types are not advertised.
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
3517
3518 async fn refresh_overlay_advert(
3519 &self,
3520 bootstrap: &std::sync::Arc<NostrDiscovery>,
3521 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3522 let advert = self.build_overlay_advert(bootstrap).await;
3523 bootstrap.update_local_advert(advert).await
3524 }
3525
3526 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3527 match (&self.config.transports.udp, transport_name) {
3528 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3529 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3530 _ => None,
3531 }
3532 }
3533
3534 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3535 match (&self.config.transports.tcp, transport_name) {
3536 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3537 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3538 _ => None,
3539 }
3540 }
3541
3542 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3543 match (&self.config.transports.tor, transport_name) {
3544 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3545 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3546 _ => None,
3547 }
3548 }
3549
3550 fn lookup_webrtc_config(
3551 &self,
3552 transport_name: Option<&str>,
3553 ) -> Option<&crate::config::WebRtcConfig> {
3554 match (&self.config.transports.webrtc, transport_name) {
3555 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3556 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3557 _ => None,
3558 }
3559 }
3560
3561 pub(in crate::node) async fn try_peer_addresses(
3562 &mut self,
3563 peer_config: &PeerConfig,
3564 peer_identity: PeerIdentity,
3565 allow_bootstrap_nat: bool,
3566 ) -> Result<(), NodeError> {
3567 let peer_node_addr = *peer_identity.node_addr();
3568 if self.peers.contains_key(&peer_node_addr) {
3569 debug!(
3570 npub = %peer_config.npub,
3571 "Peer already exists, skipping address attempts"
3572 );
3573 return Ok(());
3574 }
3575
3576 let candidates = self.peer_address_candidates(peer_config).await;
3577
3578 if candidates.is_empty() {
3579 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3580 return Ok(());
3581 }
3582 return Err(NodeError::NoTransportForType(format!(
3583 "no addresses known for {}",
3584 peer_config.npub
3585 )));
3586 }
3587
3588 if self
3589 .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3590 .await
3591 .is_ok()
3592 {
3593 if allow_bootstrap_nat {
3594 self.request_nostr_bootstrap(peer_config).await;
3595 }
3596 return Ok(());
3597 }
3598
3599 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3600 return Ok(());
3601 }
3602
3603 Err(NodeError::NoTransportForType(format!(
3604 "no operational transport for any of {}'s addresses",
3605 peer_config.npub
3606 )))
3607 }
3608
3609 async fn try_active_peer_alternative_addresses(
3610 &mut self,
3611 peer_config: &PeerConfig,
3612 peer_identity: PeerIdentity,
3613 ) -> Result<bool, NodeError> {
3614 let peer_node_addr = *peer_identity.node_addr();
3615 let candidates = self.peer_address_candidates(peer_config).await;
3616 let should_try_nostr =
3617 self.active_peer_should_keep_direct_retry(&peer_node_addr, peer_config);
3618
3619 if candidates.is_empty() {
3620 if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3621 return Ok(true);
3622 }
3623 return Err(NodeError::NoTransportForType(format!(
3624 "no addresses known for {}",
3625 peer_config.npub
3626 )));
3627 }
3628
3629 let alternatives: Vec<_> = candidates
3630 .into_iter()
3631 .filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
3632 .collect();
3633
3634 if alternatives.is_empty() {
3635 if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3636 return Ok(true);
3637 }
3638 return Ok(false);
3639 }
3640
3641 let needs_separate_nostr_attempt = should_try_nostr
3642 && !alternatives
3643 .iter()
3644 .any(|addr| addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat"));
3645 let address_result = self
3646 .attempt_peer_address_list(peer_config, peer_identity, true, &alternatives)
3647 .await;
3648 let nostr_attempted =
3649 needs_separate_nostr_attempt && self.request_nostr_bootstrap(peer_config).await;
3650
3651 match address_result {
3652 Ok(()) => Ok(true),
3653 Err(err) if nostr_attempted => {
3654 debug!(
3655 npub = %peer_config.npub,
3656 error = %err,
3657 "Static active-peer direct-path alternatives failed; Nostr traversal still queued"
3658 );
3659 Ok(true)
3660 }
3661 Err(err) => Err(err),
3662 }
3663 }
3664
3665 async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3666 let static_addresses = self.static_peer_addresses(peer_config);
3673 let overlay_addresses = self
3674 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3675 .await;
3676
3677 let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3678 for addr in overlay_addresses.into_iter().chain(static_addresses) {
3679 if !candidates.iter().any(|existing: &PeerAddress| {
3680 existing.transport == addr.transport && existing.addr == addr.addr
3681 }) {
3682 candidates.push(addr);
3683 }
3684 }
3685
3686 candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
3691 _ if a.priority != b.priority => a.priority.cmp(&b.priority),
3692 (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3693 (Some(_), None) => std::cmp::Ordering::Less,
3694 (None, Some(_)) => std::cmp::Ordering::Greater,
3695 (None, None) => std::cmp::Ordering::Equal,
3696 });
3697
3698 candidates
3699 }
3700
3701 fn active_peer_matches_any_candidate(
3702 &self,
3703 peer_node_addr: &NodeAddr,
3704 candidates: &[PeerAddress],
3705 ) -> bool {
3706 candidates
3707 .iter()
3708 .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3709 }
3710
3711 pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3712 &self,
3713 peer_node_addr: &NodeAddr,
3714 candidates: &[PeerAddress],
3715 ) -> bool {
3716 if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3717 return false;
3718 }
3719 !self.active_peer_needs_same_path_refresh(peer_node_addr)
3720 }
3721
3722 pub(in crate::node) fn active_peer_should_keep_direct_retry(
3723 &self,
3724 peer_node_addr: &NodeAddr,
3725 peer_config: &PeerConfig,
3726 ) -> bool {
3727 let Some(peer) = self.peers.get(peer_node_addr) else {
3728 return false;
3729 };
3730
3731 let static_addresses = self.static_peer_addresses(peer_config);
3732 if !static_addresses.is_empty() {
3733 return !self
3734 .active_peer_candidate_is_fresh_enough_to_skip(peer_node_addr, &static_addresses);
3735 }
3736
3737 if peer_config.npub.is_empty() {
3738 return false;
3739 }
3740
3741 if !self.config.node.discovery.nostr.enabled
3742 || self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Disabled
3743 {
3744 return false;
3745 }
3746
3747 peer.transport_id()
3748 .and_then(|id| self.transports.get(&id))
3749 .map(|transport| transport.transport_type().name != "udp")
3750 .unwrap_or(true)
3751 }
3752
3753 pub(in crate::node) fn clear_retry_unless_direct_refresh_needed(
3754 &mut self,
3755 peer_node_addr: &NodeAddr,
3756 ) {
3757 let keep_retry = self
3758 .retry_pending
3759 .get(peer_node_addr)
3760 .map(|state| state.peer_config.clone())
3761 .is_some_and(|peer_config| {
3762 self.active_peer_should_keep_direct_retry(peer_node_addr, &peer_config)
3763 });
3764
3765 if !keep_retry {
3766 self.retry_pending.remove(peer_node_addr);
3767 }
3768 }
3769
3770 fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3771 let Some(peer) = self.peers.get(peer_node_addr) else {
3772 return false;
3773 };
3774 let stale_after_ms = self
3775 .config
3776 .node
3777 .heartbeat_interval_secs
3778 .saturating_mul(1000)
3779 .max(1000);
3780 peer.idle_time(Self::now_ms()) > stale_after_ms
3781 }
3782
3783 pub(in crate::node) fn active_peer_matches_candidate(
3784 &self,
3785 peer_node_addr: &NodeAddr,
3786 candidate: &PeerAddress,
3787 ) -> bool {
3788 let Some(peer) = self.peers.get(peer_node_addr) else {
3789 return false;
3790 };
3791 let Some(current_addr) = peer.current_addr() else {
3792 return false;
3793 };
3794 if let Some(peer_transport_id) = peer.transport_id()
3795 && let Some((candidate_transport_id, candidate_addr)) =
3796 self.resolve_peer_address_for_match(candidate)
3797 {
3798 return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3799 }
3800 if peer
3801 .transport_id()
3802 .map(|id| self.bootstrap_transports.contains(&id))
3803 .unwrap_or(false)
3804 {
3805 return false;
3806 }
3807 let current_addr = current_addr.to_string();
3808 let current_transport = peer
3809 .transport_id()
3810 .and_then(|id| self.transports.get(&id))
3811 .map(|transport| transport.transport_type().name);
3812
3813 candidate.addr == current_addr
3814 && current_transport
3815 .map(|transport| transport == candidate.transport)
3816 .unwrap_or(true)
3817 }
3818
3819 pub(in crate::node) fn active_peer_uses_recent_endpoint_path(
3820 &self,
3821 peer_node_addr: &NodeAddr,
3822 peer_config: &PeerConfig,
3823 ) -> bool {
3824 peer_config.addresses.iter().any(|addr| {
3825 addr.seen_at_ms.is_some() && self.active_peer_matches_candidate(peer_node_addr, addr)
3826 })
3827 }
3828
3829 pub(in crate::node) fn active_peer_uses_traversal_path(
3830 &self,
3831 peer_node_addr: &NodeAddr,
3832 peer_config: &PeerConfig,
3833 ) -> bool {
3834 let via_bootstrap_transport = self
3835 .peers
3836 .get(peer_node_addr)
3837 .and_then(|peer| peer.transport_id())
3838 .map(|id| self.bootstrap_transports.contains(&id))
3839 .unwrap_or(false);
3840
3841 via_bootstrap_transport
3842 || self.active_peer_uses_recent_endpoint_path(peer_node_addr, peer_config)
3843 }
3844
3845 pub(crate) async fn api_connect(
3853 &mut self,
3854 npub: &str,
3855 address: &str,
3856 transport: &str,
3857 ) -> Result<serde_json::Value, String> {
3858 let peer_config = PeerConfig {
3859 npub: npub.to_string(),
3860 alias: None,
3861 addresses: vec![PeerAddress::new(transport, address)],
3862 connect_policy: ConnectPolicy::Manual,
3863 auto_reconnect: false,
3864 discovery_fallback_transit: true,
3865 };
3866
3867 if let Ok(identity) = PeerIdentity::from_npub(npub) {
3869 self.peer_aliases
3870 .insert(*identity.node_addr(), identity.short_npub());
3871 self.register_identity(*identity.node_addr(), identity.pubkey_full());
3872 }
3873
3874 self.initiate_peer_connection(&peer_config)
3875 .await
3876 .map(|()| {
3877 info!(
3878 npub = %npub,
3879 address = %address,
3880 transport = %transport,
3881 "API connect initiated"
3882 );
3883 serde_json::json!({
3884 "npub": npub,
3885 "address": address,
3886 "transport": transport,
3887 })
3888 })
3889 .map_err(|e| e.to_string())
3890 }
3891
3892 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
3896 let peer_identity =
3897 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
3898 let node_addr = *peer_identity.node_addr();
3899
3900 if !self.peers.contains_key(&node_addr) {
3901 return Err(format!("peer not found: {npub}"));
3902 }
3903
3904 self.remove_active_peer(&node_addr);
3906
3907 self.retry_pending.remove(&node_addr);
3909
3910 info!(npub = %npub, "API disconnect completed");
3911
3912 Ok(serde_json::json!({
3913 "npub": npub,
3914 "disconnected": true,
3915 }))
3916 }
3917
3918 pub async fn adopt_established_traversal(
3925 &mut self,
3926 traversal: EstablishedTraversal,
3927 ) -> Result<BootstrapHandoffResult, NodeError> {
3928 debug!(
3929 peer_npub = %traversal.peer_npub,
3930 session_id = %traversal.session_id,
3931 remote_addr = %traversal.remote_addr,
3932 "adopting established traversal socket"
3933 );
3934
3935 if !self.state.is_operational() {
3936 return Err(NodeError::NotStarted);
3937 }
3938
3939 let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
3940 let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
3941 NodeError::InvalidPeerNpub {
3942 npub: traversal.peer_npub.clone(),
3943 reason: e.to_string(),
3944 }
3945 })?;
3946 let peer_node_addr = *peer_identity.node_addr();
3947 if self.peers.contains_key(&peer_node_addr) {
3948 debug!(
3949 peer_npub = %traversal.peer_npub,
3950 "Adopting NAT traversal handoff as alternate path for already-connected peer"
3951 );
3952 }
3953
3954 self.peer_aliases
3955 .insert(peer_node_addr, peer_identity.short_npub());
3956 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
3957
3958 let transport_id = self.allocate_transport_id();
3959 let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
3979 let mut cfg = self
3980 .lookup_udp_config(traversal.transport_name.as_deref())
3981 .or_else(|| self.lookup_udp_config(None))
3982 .cloned()
3983 .unwrap_or_default();
3984 cfg.bind_addr = None;
3985 cfg.external_addr = None;
3986 cfg
3987 });
3988 let mut transport = crate::transport::udp::UdpTransport::new(
3989 transport_id,
3990 traversal.transport_name.clone(),
3991 inherited_config,
3992 packet_tx,
3993 );
3994
3995 transport
3996 .adopt_socket_async(traversal.socket)
3997 .await
3998 .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
3999
4000 let local_addr = transport.local_addr().ok_or_else(|| {
4001 NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
4002 })?;
4003
4004 self.transports.insert(
4005 transport_id,
4006 crate::transport::TransportHandle::Udp(transport),
4007 );
4008 self.bootstrap_transports.insert(transport_id);
4009 self.bootstrap_transport_npubs
4010 .insert(transport_id, traversal.peer_npub.clone());
4011
4012 let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
4013 if let Err(err) = self
4014 .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
4015 .await
4016 {
4017 self.bootstrap_transports.remove(&transport_id);
4018 self.bootstrap_transport_npubs.remove(&transport_id);
4019 if let Some(mut handle) = self.transports.remove(&transport_id) {
4020 let _ = handle.stop().await;
4021 }
4022 return Err(err);
4023 }
4024
4025 info!(
4026 peer = %self.peer_display_name(&peer_node_addr),
4027 transport_id = %transport_id,
4028 local_addr = %local_addr,
4029 remote_addr = %traversal.remote_addr,
4030 session_id = %traversal.session_id,
4031 "adopted NAT traversal socket; handshake initiated"
4032 );
4033
4034 Ok(BootstrapHandoffResult {
4035 transport_id,
4036 local_addr,
4037 remote_addr: traversal.remote_addr,
4038 peer_node_addr,
4039 session_id: traversal.session_id,
4040 })
4041 }
4042}