1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, MeshTraversalSignal, NostrDiscovery,
7 OverlayAdvert, OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason, SessionMessageType};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use secp256k1::PublicKey;
18use std::collections::{HashMap, HashSet};
19use std::net::{IpAddr, SocketAddr};
20use std::thread;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
/// Decision taken for a mesh signal with respect to its end-to-end session.
///
/// NOTE(review): the variants are consumed outside this chunk; the
/// descriptions below follow the variant names and should be confirmed
/// against the call sites.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MeshSignalSessionAction {
    /// Send the signal now.
    Send,
    /// Hold the signal back for later.
    Defer,
    /// Discard the signal.
    Drop,
}
30
/// Append a timestamped line to `nvpn-fips-endpoint-debug.log` in the system
/// temp directory (debug builds only).
///
/// Best-effort: failures to open or write the file are silently ignored.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let Ok(mut file) = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(log_path)
    else {
        return;
    };

    let timestamp = std::time::SystemTime::now();
    let _ = writeln!(file, "{:?} {}", timestamp, message.as_ref());
}

/// Release-build stand-in for the debug logger; the message is discarded.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {
    // Release builds never write the debug log file.
}
51
/// Return `true` when `ip` is not a publicly routable address and so should
/// not be advertised or trusted as a reachable endpoint.
///
/// IPv4: private (RFC 1918), loopback, link-local, unspecified, multicast,
/// broadcast, documentation ranges, and shared/CGNAT space `100.64.0.0/10`.
///
/// IPv6: loopback, unspecified, unique-local (`fc00::/7`), multicast,
/// link-local (`fe80::/10`), and documentation space (`2001:db8::/32`).
/// IPv4-mapped addresses (`::ffff:a.b.c.d`) are judged by their embedded
/// IPv4 address. Without this, `::ffff:192.168.1.1` would pass as routable.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let octets = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 100.64.0.0/10: shared address space used by carrier-grade NAT.
                || (octets[0] == 100 && (octets[1] & 0xc0) == 64)
        }
        IpAddr::V6(v6) => {
            // Classify an IPv4-mapped address by the IPv4 address it carries.
            if let Some(mapped) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(mapped));
            }
            let segments = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // fe80::/10: unicast link-local.
                || (segments[0] & 0xffc0) == 0xfe80
                // 2001:db8::/32: documentation prefix (RFC 3849).
                || (segments[0] == 0x2001 && segments[1] == 0x0db8)
        }
    }
}
83
/// Whether `local` and `remote` share an address family (both IPv4 or both
/// IPv6), i.e. a socket bound at `local` can address `remote` directly.
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    local.is_ipv4() == remote.is_ipv4()
}
90
/// Cap on outbound handshakes/transport connects in flight to a single peer;
/// enforced by `path_candidate_attempt_budget`.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
/// Upper bound on the value returned by `graph_session_warmup_budget`.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
/// Upper bound on the value returned by `discovery_connect_budget`.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
/// NOTE(review): not referenced in this chunk; presumably scales an open
/// discovery retry lifetime — confirm at its use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
95
96impl Node {
97 pub(super) async fn update_peers(
109 &mut self,
110 new_peers: Vec<crate::config::PeerConfig>,
111 ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
112 use std::collections::{HashMap, HashSet};
113
114 let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
115 HashMap::with_capacity(new_peers.len());
116 let mut new_order = Vec::with_capacity(new_peers.len());
117 for peer in new_peers {
118 let identity = match PeerIdentity::from_npub(&peer.npub) {
119 Ok(id) => id,
120 Err(e) => {
121 return Err(crate::node::NodeError::InvalidPeerNpub {
122 npub: peer.npub.clone(),
123 reason: e.to_string(),
124 });
125 }
126 };
127 let node_addr = *identity.node_addr();
131 if !new_by_addr.contains_key(&node_addr) {
132 new_order.push(node_addr);
133 }
134 new_by_addr.insert(node_addr, peer);
135 }
136
137 let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
138 .config
139 .peers()
140 .iter()
141 .filter_map(|pc| {
142 PeerIdentity::from_npub(&pc.npub)
143 .ok()
144 .map(|id| (*id.node_addr(), pc.clone()))
145 })
146 .collect();
147
148 let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
149 let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
150
151 let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
152 let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
153 let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
154
155 let mut outcome = crate::node::UpdatePeersOutcome::default();
156
157 for node_addr in &removed {
158 if self.retry_pending.remove(node_addr).is_some() {
159 debug!(
160 peer = %self.peer_display_name(node_addr),
161 "Dropping retry entry for peer removed from runtime peer list"
162 );
163 }
164 self.peer_aliases.remove(node_addr);
165 self.set_discovery_fallback_transit_allowed(*node_addr, false);
166 outcome.removed += 1;
167 }
168
169 let mut auto_connect_refresh_configs = Vec::new();
170 for node_addr in &kept {
171 let new_pc = &new_by_addr[node_addr];
172 let current_pc = ¤t_by_addr[node_addr];
173 if new_pc.addresses != current_pc.addresses
174 || new_pc.alias != current_pc.alias
175 || new_pc.connect_policy != current_pc.connect_policy
176 || new_pc.auto_reconnect != current_pc.auto_reconnect
177 || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
178 {
179 outcome.updated += 1;
180 self.set_discovery_fallback_transit_allowed(
181 *node_addr,
182 new_pc.discovery_fallback_transit,
183 );
184 if let Some(state) = self.retry_pending.get_mut(node_addr) {
185 state.peer_config = new_pc.clone();
186 state.retry_after_ms = Self::now_ms();
187 }
188 if let Some(alias) = new_pc.alias.clone() {
189 self.peer_aliases.insert(*node_addr, alias);
190 }
191 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
192 auto_connect_refresh_configs.push(new_pc.clone());
193 }
194 } else {
195 outcome.unchanged += 1;
196 self.set_discovery_fallback_transit_allowed(
197 *node_addr,
198 new_pc.discovery_fallback_transit,
199 );
200 if let Some(state) = self.retry_pending.get_mut(node_addr) {
201 state.peer_config = new_pc.clone();
202 }
203 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
204 auto_connect_refresh_configs.push(new_pc.clone());
205 }
206 }
207 }
208
209 let added_configs: Vec<crate::config::PeerConfig> = new_order
210 .iter()
211 .filter(|addr| added.contains(addr))
212 .map(|addr| new_by_addr[addr].clone())
213 .collect();
214
215 self.config.peers = new_order
219 .iter()
220 .filter_map(|addr| new_by_addr.get(addr).cloned())
221 .collect();
222 self.configured_peer_send_weights = Self::configured_peer_send_weights(&self.config);
223
224 for peer_config in added_configs {
225 outcome.added += 1;
226 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
227 continue;
228 };
229 let name = peer_config
230 .alias
231 .clone()
232 .unwrap_or_else(|| identity.short_npub());
233 self.peer_aliases.insert(*identity.node_addr(), name);
234 self.set_discovery_fallback_transit_allowed(
235 *identity.node_addr(),
236 peer_config.discovery_fallback_transit,
237 );
238 self.register_identity(*identity.node_addr(), identity.pubkey_full());
239
240 if !peer_config.is_auto_connect() {
241 continue;
242 }
243
244 match self
245 .try_auto_connect_graph_session(&peer_config, identity)
246 .await
247 {
248 Ok(true) => continue,
249 Ok(false) => {}
250 Err(err) => {
251 debug!(
252 npub = %peer_config.npub,
253 error = %err,
254 "Existing FIPS graph did not warm newly added peer"
255 );
256 }
257 }
258
259 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
260 warn!(
261 npub = %peer_config.npub,
262 error = %e,
263 "Failed to initiate connection for newly added peer"
264 );
265 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
266 self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
267 }
268 if matches!(e, crate::node::NodeError::NoTransportForType(_))
269 && let Some(bootstrap) = self.nostr_discovery.clone()
270 {
271 bootstrap
272 .request_advert_stale_check(peer_config.npub.clone())
273 .await;
274 }
275 }
276 }
277
278 for peer_config in auto_connect_refresh_configs {
279 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
280 continue;
281 };
282 let node_addr = *peer_identity.node_addr();
283
284 if self.peers.contains_key(&node_addr) {
285 match self
286 .initiate_active_peer_alternative_connection(&peer_config)
287 .await
288 {
289 Ok(attempted) => {
290 if attempted {
291 debug!(
292 peer = %self.peer_display_name(&node_addr),
293 "Started non-disruptive alternate-path handshake for active peer"
294 );
295 }
296 }
297 Err(e) => {
298 debug!(
299 npub = %peer_config.npub,
300 error = %e,
301 "Active peer alternate-path refresh did not start"
302 );
303 }
304 }
305 continue;
306 }
307
308 match self
309 .try_auto_connect_graph_session(&peer_config, peer_identity)
310 .await
311 {
312 Ok(true) => continue,
313 Ok(false) => {}
314 Err(err) => {
315 debug!(
316 npub = %peer_config.npub,
317 error = %err,
318 "Existing FIPS graph did not warm refreshed peer"
319 );
320 }
321 }
322
323 match self.initiate_peer_connection(&peer_config).await {
324 Ok(()) => {
325 let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
326 if let Some(state) = self.retry_pending.get_mut(&node_addr) {
327 state.peer_config = peer_config;
328 state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
329 }
330 }
331 Err(e) => {
332 debug!(
333 npub = %peer_config.npub,
334 error = %e,
335 "Refreshed peer addresses did not initiate a direct connection"
336 );
337 self.schedule_retry_after_error(node_addr, Self::now_ms(), &e);
338 }
339 }
340 }
341
342 self.warm_auto_connect_graph_sessions().await;
343
344 Ok(outcome)
345 }
346
347 pub(super) async fn initiate_peer_connections(&mut self) {
348 let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
354 .config
355 .peers()
356 .iter()
357 .filter_map(|pc| {
358 PeerIdentity::from_npub(&pc.npub)
359 .ok()
360 .map(|id| (id, pc.alias.clone()))
361 })
362 .collect();
363
364 for (identity, alias) in peer_identities {
365 let name = alias.unwrap_or_else(|| identity.short_npub());
366 self.peer_aliases.insert(*identity.node_addr(), name);
367 self.register_identity(*identity.node_addr(), identity.pubkey_full());
371 }
372
373 let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
375
376 if peer_configs.is_empty() {
377 debug!("No static peers configured");
378 return;
379 }
380
381 debug!(
382 count = peer_configs.len(),
383 "Initiating static peer connections"
384 );
385
386 for peer_config in peer_configs {
387 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
388 Ok(identity) => identity,
389 Err(_) => continue,
390 };
391 match self
392 .try_auto_connect_graph_session(&peer_config, peer_identity)
393 .await
394 {
395 Ok(true) => continue,
396 Ok(false) => {}
397 Err(err) => {
398 debug!(
399 npub = %peer_config.npub,
400 error = %err,
401 "Existing FIPS graph did not warm auto-connect peer"
402 );
403 }
404 }
405 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
406 warn!(
407 npub = %peer_config.npub,
408 alias = ?peer_config.alias,
409 error = %e,
410 "Failed to initiate peer connection"
411 );
412 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
416 self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
417 }
418 if matches!(e, crate::node::NodeError::NoTransportForType(_))
424 && let Some(bootstrap) = self.nostr_discovery.clone()
425 {
426 bootstrap
427 .request_advert_stale_check(peer_config.npub.clone())
428 .await;
429 }
430 }
431 }
432
433 self.warm_auto_connect_graph_sessions().await;
434 }
435
436 pub(in crate::node) async fn try_auto_connect_graph_session(
437 &mut self,
438 peer_config: &PeerConfig,
439 peer_identity: PeerIdentity,
440 ) -> Result<bool, NodeError> {
441 if !peer_config.is_auto_connect() {
442 return Ok(false);
443 }
444
445 let peer_node_addr = *peer_identity.node_addr();
446 if self
447 .peers
448 .get(&peer_node_addr)
449 .is_some_and(|peer| peer.can_send())
450 {
451 return Ok(false);
452 }
453 if self.auto_connect_should_race_direct_path(peer_config) {
454 return Ok(false);
455 }
456 if self
457 .sessions
458 .get(&peer_node_addr)
459 .is_some_and(|entry| entry.is_established() || entry.is_initiating())
460 {
461 return Ok(true);
462 }
463 if self.find_next_hop(&peer_node_addr).is_none() {
464 return Ok(false);
465 }
466
467 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
468 match self
469 .initiate_session(peer_node_addr, peer_identity.pubkey_full())
470 .await
471 {
472 Ok(()) => {
473 debug!(
474 peer = %self.peer_display_name(&peer_node_addr),
475 "Warmed auto-connect peer session over existing FIPS graph"
476 );
477 Ok(true)
478 }
479 Err(NodeError::SendFailed { node_addr, reason })
480 if node_addr == peer_node_addr && reason == "no route to destination" =>
481 {
482 self.maybe_initiate_lookup(&peer_node_addr).await;
483 Ok(false)
484 }
485 Err(err) => Err(err),
486 }
487 }
488
489 fn auto_connect_should_race_direct_path(&self, peer_config: &PeerConfig) -> bool {
490 !peer_config.addresses.is_empty() || self.config.node.discovery.nostr.enabled
491 }
492
493 pub(super) async fn initiate_peer_connection(
497 &mut self,
498 peer_config: &crate::config::PeerConfig,
499 ) -> Result<(), NodeError> {
500 self.initiate_peer_connection_inner(peer_config).await
501 }
502
503 pub(super) async fn initiate_peer_retry_connection(
509 &mut self,
510 peer_config: &crate::config::PeerConfig,
511 ) -> Result<(), NodeError> {
512 self.initiate_peer_connection_inner(peer_config).await
513 }
514
515 pub(in crate::node) async fn initiate_active_peer_alternative_connection(
516 &mut self,
517 peer_config: &crate::config::PeerConfig,
518 ) -> Result<bool, NodeError> {
519 self.initiate_active_peer_alternative_connection_inner(peer_config, false)
520 .await
521 }
522
523 pub(in crate::node) async fn initiate_active_peer_direct_refresh_connection(
524 &mut self,
525 peer_config: &crate::config::PeerConfig,
526 ) -> Result<bool, NodeError> {
527 self.initiate_active_peer_alternative_connection_inner(peer_config, true)
528 .await
529 }
530
531 async fn initiate_active_peer_alternative_connection_inner(
532 &mut self,
533 peer_config: &crate::config::PeerConfig,
534 allow_same_path_refresh: bool,
535 ) -> Result<bool, NodeError> {
536 let peer_identity =
537 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
538 npub: peer_config.npub.clone(),
539 reason: e.to_string(),
540 })?;
541 let peer_node_addr = *peer_identity.node_addr();
542
543 if !self.peers.contains_key(&peer_node_addr) {
544 self.initiate_peer_connection(peer_config).await?;
545 return Ok(true);
546 }
547
548 self.try_active_peer_alternative_addresses(
554 peer_config,
555 peer_identity,
556 allow_same_path_refresh,
557 )
558 .await
559 }
560
561 async fn initiate_peer_connection_inner(
562 &mut self,
563 peer_config: &crate::config::PeerConfig,
564 ) -> Result<(), NodeError> {
565 let peer_identity =
567 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
568 npub: peer_config.npub.clone(),
569 reason: e.to_string(),
570 })?;
571
572 let peer_node_addr = *peer_identity.node_addr();
573
574 if self.peers.contains_key(&peer_node_addr) {
576 debug!(
577 npub = %peer_config.npub,
578 "Peer already exists, skipping"
579 );
580 return Ok(());
581 }
582
583 self.try_peer_addresses(peer_config, peer_identity, true)
584 .await
585 }
586
587 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
588 self.connections.values().any(|conn| {
589 conn.expected_identity()
590 .map(|id| id.node_addr() == peer_node_addr)
591 .unwrap_or(false)
592 })
593 }
594
595 fn is_connecting_to_peer_on_path(
596 &self,
597 peer_node_addr: &NodeAddr,
598 transport_id: TransportId,
599 remote_addr: &TransportAddr,
600 ) -> bool {
601 self.connections.values().any(|conn| {
602 conn.expected_identity()
603 .map(|id| id.node_addr() == peer_node_addr)
604 .unwrap_or(false)
605 && conn.transport_id() == Some(transport_id)
606 && conn.source_addr() == Some(remote_addr)
607 }) || self.pending_connects.iter().any(|pending| {
608 pending.peer_identity.node_addr() == peer_node_addr
609 && pending.transport_id == transport_id
610 && &pending.remote_addr == remote_addr
611 })
612 }
613
614 pub(in crate::node) fn should_warm_auto_connect_session(
615 &self,
616 peer_node_addr: &NodeAddr,
617 ) -> bool {
618 if self
619 .peers
620 .get(peer_node_addr)
621 .is_some_and(|peer| peer.can_send())
622 || self
623 .sessions
624 .get(peer_node_addr)
625 .is_some_and(|entry| entry.is_established())
626 {
627 return false;
628 }
629
630 self.config.peers().iter().any(|peer| {
631 peer.is_auto_connect()
632 && PeerIdentity::from_npub(&peer.npub)
633 .map(|identity| identity.node_addr() == peer_node_addr)
634 .unwrap_or(false)
635 })
636 }
637
638 pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
639 if !self.peers.values().any(|peer| peer.can_send()) {
640 return 0;
641 }
642
643 let mut budget = self.graph_session_warmup_budget();
644 if budget == 0 {
645 return 0;
646 }
647
648 let peer_identities: Vec<_> = self
649 .config
650 .auto_connect_peers()
651 .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
652 .collect();
653
654 let mut warmed = 0;
655 for identity in peer_identities {
656 if budget == 0 {
657 break;
658 }
659
660 let peer_node_addr = *identity.node_addr();
661 if peer_node_addr == *self.identity.node_addr()
662 || !self.should_warm_auto_connect_session(&peer_node_addr)
663 || self
664 .sessions
665 .get(&peer_node_addr)
666 .is_some_and(|entry| entry.is_initiating())
667 {
668 continue;
669 }
670
671 self.register_identity(peer_node_addr, identity.pubkey_full());
672
673 if self.find_next_hop(&peer_node_addr).is_some() {
674 match self
675 .initiate_session(peer_node_addr, identity.pubkey_full())
676 .await
677 {
678 Ok(()) => {
679 warmed += 1;
680 budget = budget.saturating_sub(1);
681 debug!(
682 peer = %self.peer_display_name(&peer_node_addr),
683 "Warmed auto-connect peer session over existing FIPS graph"
684 );
685 }
686 Err(NodeError::SendFailed { node_addr, reason })
687 if node_addr == peer_node_addr && reason == "no route to destination" =>
688 {
689 self.maybe_initiate_lookup(&peer_node_addr).await;
690 warmed += 1;
691 budget = budget.saturating_sub(1);
692 }
693 Err(err) => {
694 debug!(
695 peer = %self.peer_display_name(&peer_node_addr),
696 error = %err,
697 "Failed to warm auto-connect peer session"
698 );
699 }
700 }
701 } else {
702 self.maybe_initiate_lookup(&peer_node_addr).await;
703 warmed += 1;
704 budget = budget.saturating_sub(1);
705 }
706 }
707
708 warmed
709 }
710
711 pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
712 let max_destinations = self.config.node.session.pending_max_destinations;
713 if max_destinations == 0 {
714 return 0;
715 }
716
717 let pending_sessions = self
718 .sessions
719 .values()
720 .filter(|entry| !entry.is_established())
721 .count();
722 let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
723 max_destinations
724 .saturating_sub(pending_total)
725 .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
726 }
727
728 fn outbound_handshake_slots(&self) -> usize {
729 let used = self
730 .connections
731 .len()
732 .saturating_add(self.pending_connects.len());
733 if self.max_connections == 0 {
734 usize::MAX
735 } else {
736 self.max_connections.saturating_sub(used)
737 }
738 }
739
740 fn outbound_link_slots(&self) -> usize {
741 if self.max_links == 0 {
742 usize::MAX
743 } else {
744 self.max_links.saturating_sub(self.links.len())
745 }
746 }
747
748 fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
749 if !self.peers.contains_key(peer_node_addr)
750 && self.max_peers > 0
751 && self.peers.len() >= self.max_peers
752 {
753 return 0;
754 }
755
756 let in_flight_for_peer = self
757 .connections
758 .values()
759 .filter(|conn| {
760 conn.expected_identity()
761 .map(|id| id.node_addr() == peer_node_addr)
762 .unwrap_or(false)
763 })
764 .count()
765 .saturating_add(
766 self.pending_connects
767 .iter()
768 .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
769 .count(),
770 );
771
772 self.outbound_handshake_slots()
773 .min(self.outbound_link_slots())
774 .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
775 }
776
777 fn discovery_connect_budget(&self) -> usize {
778 self.outbound_handshake_slots()
779 .min(self.outbound_link_slots())
780 .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
781 }
782
783 fn find_udp_transport_for_remote_addr(
790 &self,
791 remote_addr: SocketAddr,
792 ) -> Option<(TransportId, SocketAddr)> {
793 self.transports
794 .iter()
795 .filter(|(id, handle)| {
796 handle.transport_type().name == "udp"
797 && handle.is_operational()
798 && !self.bootstrap_transports.contains(id)
799 })
800 .filter_map(|(id, handle)| {
801 let local_addr = handle.local_addr()?;
802 socket_addr_families_compatible(local_addr, remote_addr)
803 .then_some((*id, local_addr))
804 })
805 .min_by_key(|(id, _)| id.as_u32())
806 }
807
808 pub(super) fn transport_discovery_candidate(
809 &self,
810 discovered_transport_id: TransportId,
811 discovered_addr: TransportAddr,
812 ) -> Option<(TransportId, TransportAddr, &'static str)> {
813 let transport = self.transports.get(&discovered_transport_id)?;
814 let transport_name = transport.transport_type().name;
815
816 if transport_name != "udp" {
817 return Some((discovered_transport_id, discovered_addr, transport_name));
818 }
819
820 let Some(remote_socket_addr) = discovered_addr
821 .as_str()
822 .and_then(|addr| addr.parse::<SocketAddr>().ok())
823 else {
824 if self.bootstrap_transports.contains(&discovered_transport_id) {
825 debug!(
826 transport_id = %discovered_transport_id,
827 remote_addr = %discovered_addr,
828 "transport discovery: skip non-numeric UDP address from bootstrap transport"
829 );
830 return None;
831 }
832 return Some((discovered_transport_id, discovered_addr, transport_name));
833 };
834
835 let Some((transport_id, local_addr)) =
836 self.find_udp_transport_for_remote_addr(remote_socket_addr)
837 else {
838 debug!(
839 transport_id = %discovered_transport_id,
840 remote_addr = %discovered_addr,
841 "transport discovery: skip UDP peer with no compatible local socket"
842 );
843 return None;
844 };
845
846 if transport_id != discovered_transport_id {
847 debug!(
848 discovered_transport_id = %discovered_transport_id,
849 selected_transport_id = %transport_id,
850 local_addr = %local_addr,
851 remote_addr = %remote_socket_addr,
852 "transport discovery: selected compatible UDP transport"
853 );
854 }
855
856 Some((
857 transport_id,
858 TransportAddr::from_socket_addr(remote_socket_addr),
859 transport_name,
860 ))
861 }
862
/// Render a discovered candidate address as the address string used to build
/// a `PeerAddress` for `transport_name`.
///
/// On Linux and macOS, a 6-byte ethernet address whose transport reports an
/// interface name is rendered as `"<interface>/<mac>"` using
/// `ethernet::format_mac`. In every other case the result is
/// `remote_addr.to_string()`.
fn peer_address_string_for_transport_candidate(
    &self,
    transport_id: TransportId,
    transport_name: &str,
    remote_addr: &TransportAddr,
) -> String {
    // These parameters are only read by the Linux/macOS ethernet branch;
    // reference them elsewhere to avoid unused-variable warnings.
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    let _ = (transport_id, transport_name);

    #[cfg(any(target_os = "linux", target_os = "macos"))]
    if transport_name == "ethernet"
        && remote_addr.as_bytes().len() == 6
        && let Some(interface) = self
            .transports
            .get(&transport_id)
            .and_then(|transport| transport.interface_name())
    {
        // Exactly 6 bytes were checked above, so this copy cannot panic.
        let mut mac = [0u8; 6];
        mac.copy_from_slice(remote_addr.as_bytes());
        return format!(
            "{interface}/{}",
            crate::transport::ethernet::format_mac(&mac)
        );
    }

    remote_addr.to_string()
}
890
/// Resolve a configured `PeerAddress` to a local `(TransportId,
/// TransportAddr)` pair without connecting.
///
/// Returns `None` when:
/// * the entry is the UDP `nat` placeholder (case-insensitive);
/// * an ethernet or BLE address fails to resolve;
/// * BLE support is not compiled in (`bluer_available` unset);
/// * no suitable transport exists.
fn resolve_peer_address_for_match(
    &self,
    candidate: &PeerAddress,
) -> Option<(TransportId, TransportAddr)> {
    // A UDP address of "nat" carries no concrete address to resolve.
    if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
        return None;
    }

    if candidate.transport == "ethernet" {
        return self.resolve_ethernet_addr(&candidate.addr).ok();
    }

    if candidate.transport == "ble" {
        #[cfg(bluer_available)]
        {
            return self.resolve_ble_addr(&candidate.addr).ok();
        }
        #[cfg(not(bluer_available))]
        {
            return None;
        }
    }

    // A numeric UDP address must use a family-compatible, non-bootstrap UDP
    // transport; if none exists the `?` returns `None` (there is no fallback
    // to an arbitrary UDP transport). Any other address uses the first
    // transport of the named type.
    let transport_id = if candidate.transport == "udp"
        && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
    {
        self.find_udp_transport_for_remote_addr(remote_socket_addr)
            .map(|(id, _)| id)?
    } else {
        self.find_transport_for_type(&candidate.transport)?
    };

    Some((transport_id, TransportAddr::from_string(&candidate.addr)))
}
925
/// Open an outbound link to `peer_identity` at `remote_addr` over
/// `transport_id`, and begin connecting.
///
/// Returns `Ok(())` without doing anything if an attempt on this exact
/// `(peer, transport, address)` path is already in progress. Otherwise it
/// checks the connection, link, and peer limits and the outbound ACL, then
/// registers a new link in `links` and `addr_to_link`.
///
/// * Connection-oriented transports: a non-blocking transport connect is
///   started and the attempt is queued in `pending_connects`.
/// * Connectionless transports (including an unknown transport id): the
///   Noise handshake is started immediately via `start_handshake`.
///
/// # Errors
///
/// * `MaxConnectionsExceeded`, `MaxLinksExceeded`, or `MaxPeersExceeded`
///   when a limit is reached.
/// * Any error from `authorize_peer`.
/// * A transport error from `connect`, after the link registration is
///   rolled back.
/// * Any error from `start_handshake`.
pub(super) async fn initiate_connection(
    &mut self,
    transport_id: TransportId,
    remote_addr: TransportAddr,
    peer_identity: PeerIdentity,
) -> Result<(), NodeError> {
    let peer_node_addr = *peer_identity.node_addr();

    // Avoid duplicate attempts on the same path.
    if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            "Connection already in progress for candidate path"
        );
        return Ok(());
    }

    // Enforce node-wide limits before allocating any link state.
    if self.outbound_handshake_slots() == 0 {
        return Err(NodeError::MaxConnectionsExceeded {
            max: self.max_connections,
        });
    }

    if self.outbound_link_slots() == 0 {
        return Err(NodeError::MaxLinksExceeded {
            max: self.max_links,
        });
    }

    // The peer cap only blocks peers that are not already active.
    if !self.peers.contains_key(&peer_node_addr)
        && self.max_peers > 0
        && self.peers.len() >= self.max_peers
    {
        return Err(NodeError::MaxPeersExceeded {
            max: self.max_peers,
        });
    }

    self.authorize_peer(
        &peer_identity,
        PeerAclContext::OutboundConnect,
        transport_id,
        &remote_addr,
    )?;

    // An unknown transport id is treated as connectionless here.
    let is_connection_oriented = self
        .transports
        .get(&transport_id)
        .map(|t| t.transport_type().connection_oriented)
        .unwrap_or(false);

    let link_id = self.allocate_link_id();

    let link = if is_connection_oriented {
        Link::new(
            link_id,
            transport_id,
            remote_addr.clone(),
            LinkDirection::Outbound,
            Duration::from_millis(self.config.node.base_rtt_ms),
        )
    } else {
        Link::connectionless(
            link_id,
            transport_id,
            remote_addr.clone(),
            LinkDirection::Outbound,
            Duration::from_millis(self.config.node.base_rtt_ms),
        )
    };

    self.links.insert(link_id, link);

    // NOTE(review): if `addr_to_link` is a map, this replaces any existing
    // mapping for the same (transport, address), and the failure paths remove
    // the entry rather than restoring the previous one. Confirm this is
    // intended when re-handshaking on a path an active peer already uses.
    self.addr_to_link
        .insert((transport_id, remote_addr.clone()), link_id);

    if is_connection_oriented {
        if let Some(transport) = self.transports.get(&transport_id) {
            match transport.connect(&remote_addr).await {
                Ok(()) => {
                    debug!(
                        peer = %self.peer_display_name(&peer_node_addr),
                        transport_id = %transport_id,
                        remote_addr = %remote_addr,
                        link_id = %link_id,
                        "Transport connect initiated (non-blocking)"
                    );
                    // Presumably the handshake is started once this connect
                    // completes; that code is not part of this block.
                    self.pending_connects.push(super::PendingConnect {
                        link_id,
                        transport_id,
                        remote_addr,
                        peer_identity,
                    });
                }
                Err(e) => {
                    // Roll back the link registration made above.
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::from_transport_error(e));
                }
            }
        }
        Ok(())
    } else {
        // Connectionless transports can send handshake message 1 right away.
        self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
            .await
    }
}
1049
/// Create an outbound `PeerConnection` on `link_id` and send Noise handshake
/// message 1 to `remote_addr` over `transport_id`.
///
/// The steps are:
/// 1. allocate a local session index;
/// 2. build msg1 with the node keypair and `startup_epoch`;
/// 3. store the wire-format msg1 on the connection for resend after
///    `handshake_resend_interval_ms`;
/// 4. register the attempt in `pending_outbound` (keyed by transport and
///    index) and in `connections`;
/// 5. send msg1.
///
/// Any failure rolls back what was set up, including removing `link_id` from
/// `links` and the path from `addr_to_link`.
///
/// # Errors
///
/// * `IndexAllocationFailed` when no index is available.
/// * `HandshakeFailed` when msg1 cannot be built.
/// * A transport error when the send fails.
/// * `TransportError` when the transport disappeared before the send.
pub(super) async fn start_handshake(
    &mut self,
    link_id: LinkId,
    transport_id: TransportId,
    remote_addr: TransportAddr,
    peer_identity: PeerIdentity,
) -> Result<(), NodeError> {
    let peer_node_addr = *peer_identity.node_addr();

    let current_time_ms = Self::now_ms();
    let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

    let our_index = match self.index_allocator.allocate() {
        Ok(idx) => idx,
        Err(e) => {
            // Undo the caller's link registration.
            self.links.remove(&link_id);
            self.addr_to_link.remove(&(transport_id, remote_addr));
            return Err(NodeError::IndexAllocationFailed(e.to_string()));
        }
    };

    let our_keypair = self.identity.keypair();
    let noise_msg1 =
        match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
            Ok(msg) => msg,
            Err(e) => {
                let _ = self.index_allocator.free(our_index);
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::HandshakeFailed(e.to_string()));
            }
        };

    connection.set_our_index(our_index);
    connection.set_transport_id(transport_id);
    connection.set_source_addr(remote_addr.clone());

    // Wrap the Noise payload with our index in the wire framing.
    let wire_msg1 = build_msg1(our_index, &noise_msg1);

    debug!(
        peer = %self.peer_display_name(&peer_node_addr),
        transport_id = %transport_id,
        remote_addr = %remote_addr,
        link_id = %link_id,
        our_index = %our_index,
        "Connection initiated"
    );

    // Keep a copy of msg1 so it can be resent if no reply arrives by the
    // resend deadline.
    let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
    connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

    self.pending_outbound
        .insert((transport_id, our_index.as_u32()), link_id);
    self.connections.insert(link_id, connection);

    let send_result = match self.transports.get(&transport_id) {
        Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
        None => None,
    };
    match send_result {
        Some(send_result) => {
            self.note_local_send_outcome(&send_result);
            match send_result {
                Ok(bytes) => {
                    debug!(
                        link_id = %link_id,
                        our_index = %our_index,
                        bytes,
                        "Sent Noise handshake message 1 (wire format)"
                    );
                }
                Err(e) => {
                    warn!(
                        link_id = %link_id,
                        transport_id = %transport_id,
                        remote_addr = %remote_addr,
                        our_index = %our_index,
                        error = %e,
                        "Failed to send handshake message"
                    );
                    // Full rollback: registration, connection, link, index.
                    self.pending_outbound
                        .remove(&(transport_id, our_index.as_u32()));
                    self.connections.remove(&link_id);
                    self.links.remove(&link_id);
                    self.addr_to_link
                        .remove(&(transport_id, remote_addr.clone()));
                    let _ = self.index_allocator.free(our_index);
                    return Err(NodeError::from_transport_error(e));
                }
            }
        }
        None => {
            // Transport was removed before the first send; roll back the same way.
            self.pending_outbound
                .remove(&(transport_id, our_index.as_u32()));
            self.connections.remove(&link_id);
            self.links.remove(&link_id);
            self.addr_to_link
                .remove(&(transport_id, remote_addr.clone()));
            let _ = self.index_allocator.free(our_index);
            return Err(NodeError::TransportError(format!(
                "transport {transport_id} disappeared before first handshake send"
            )));
        }
    }

    Ok(())
}
1174
1175 pub(super) async fn poll_transport_discovery(&mut self) {
1181 let mut to_connect = Vec::new();
1183 let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1184 let mut connect_budget = self.discovery_connect_budget();
1185 let mut skipped_budget = 0usize;
1186
1187 for transport in self.transports.values() {
1188 if !transport.is_operational() {
1189 continue;
1190 }
1191 if !transport.auto_connect() {
1192 let _ = transport.discover();
1194 continue;
1195 }
1196 let discovered = match transport.discover() {
1197 Ok(peers) => peers,
1198 Err(_) => continue,
1199 };
1200 for peer in discovered {
1201 let discovered_transport_id = peer.transport_id;
1202 let pubkey = match peer.pubkey_hint {
1203 Some(pk) => pk,
1204 None => continue,
1205 };
1206 let identity = PeerIdentity::from_pubkey(pubkey);
1207 let node_addr = *identity.node_addr();
1208
1209 if node_addr == *self.identity.node_addr() {
1211 continue;
1212 }
1213
1214 let Some((candidate_transport_id, remote_addr, transport_name)) =
1215 self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1216 else {
1217 continue;
1218 };
1219
1220 if self.peers.contains_key(&node_addr) {
1221 let candidate = PeerAddress::new(
1222 transport_name,
1223 self.peer_address_string_for_transport_candidate(
1224 candidate_transport_id,
1225 transport_name,
1226 &remote_addr,
1227 ),
1228 );
1229 if self.active_peer_candidate_is_fresh_enough_to_skip(
1230 &node_addr,
1231 std::slice::from_ref(&candidate),
1232 ) {
1233 continue;
1234 }
1235 if self.is_connecting_to_peer_on_path(
1236 &node_addr,
1237 candidate_transport_id,
1238 &remote_addr,
1239 ) {
1240 continue;
1241 }
1242 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1243 if connect_budget == 0
1244 || self
1245 .path_candidate_attempt_budget(&node_addr)
1246 .saturating_sub(queued_for_peer)
1247 == 0
1248 {
1249 skipped_budget = skipped_budget.saturating_add(1);
1250 continue;
1251 }
1252 to_connect.push((candidate_transport_id, remote_addr, identity, true));
1253 *queued_per_peer.entry(node_addr).or_default() += 1;
1254 connect_budget = connect_budget.saturating_sub(1);
1255 continue;
1256 }
1257
1258 if self.is_connecting_to_peer_on_path(
1259 &node_addr,
1260 candidate_transport_id,
1261 &remote_addr,
1262 ) {
1263 continue;
1264 }
1265
1266 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1267 if connect_budget == 0
1268 || self
1269 .path_candidate_attempt_budget(&node_addr)
1270 .saturating_sub(queued_for_peer)
1271 == 0
1272 {
1273 skipped_budget = skipped_budget.saturating_add(1);
1274 continue;
1275 }
1276 to_connect.push((candidate_transport_id, remote_addr, identity, false));
1277 *queued_per_peer.entry(node_addr).or_default() += 1;
1278 connect_budget = connect_budget.saturating_sub(1);
1279 }
1280 }
1281
1282 if skipped_budget > 0 {
1283 debug!(
1284 skipped = skipped_budget,
1285 queued = to_connect.len(),
1286 "Transport discovery connect budget exhausted"
1287 );
1288 }
1289
1290 for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1291 info!(
1292 peer = %self.peer_display_name(identity.node_addr()),
1293 transport_id = %transport_id,
1294 remote_addr = %remote_addr,
1295 active_refresh,
1296 "Auto-connecting to discovered peer"
1297 );
1298 if let Err(e) = self
1299 .initiate_connection(transport_id, remote_addr, identity)
1300 .await
1301 {
1302 warn!(error = %e, "Failed to auto-connect to discovered peer");
1303 }
1304 }
1305 }
1306
    /// Runs one round of Nostr overlay discovery work.
    ///
    /// Steps, in order:
    /// 1. Push the node's current outbound and direct-refresh admission checks into the
    ///    bootstrap runtime.
    /// 2. Flush queued mesh traversal signals (`drain_nostr_mesh_signals`).
    /// 3. Handle each drained bootstrap event:
    ///    - `Established`: adopt the traversed socket if admission allows it. Already-active
    ///      peers use the direct-refresh check; all other peers use the ordinary outbound
    ///      check. If adoption fails, a retry is scheduled for the peer.
    ///    - `Failed`:
    ///      - For an active peer, the failure is recorded and a link-dead reprobe is scheduled,
    ///        but only when `active_peer_should_keep_direct_retry` says so. Otherwise it is
    ///        ignored.
    ///      - While a handshake to the peer is in progress, the failure is ignored.
    ///      - Otherwise the failure is recorded and `try_peer_addresses` is tried. If that
    ///        fails, a retry is scheduled, held back until at least the traversal cooldown
    ///        when `nostr_cooldown_applies_to_peer_config` is true.
    ///    - Whenever a recorded failure crosses the failure threshold, a stale-advert check is
    ///      also requested for that peer.
    /// 4. Run the startup open-discovery sweep, queue open-discovery and active-fallback direct
    ///    retries, and refresh the local overlay advert. An advert failure is only logged at
    ///    debug level.
    ///
    /// Returns immediately when Nostr discovery is not running (`self.nostr_discovery` is None).
    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        // Refresh admission decisions before the signals and events below are processed.
        bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
        bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());

        self.drain_nostr_mesh_signals(&bootstrap).await;

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    // A traversal for a peer that is already connected is treated as a
                    // direct-path refresh and gated by the direct-refresh admission check.
                    let active_refresh = PeerIdentity::from_npub(&traversal.peer_npub)
                        .ok()
                        .is_some_and(|identity| self.peers.contains_key(identity.node_addr()));
                    let admission_allowed = if active_refresh {
                        self.outbound_direct_refresh_admission_check()
                    } else {
                        self.outbound_admission_check()
                    };
                    if !admission_allowed {
                        debug!(
                            peer_npub = %traversal.peer_npub,
                            peers = self.peers.len(),
                            max_peers = self.max_peers,
                            active_refresh,
                            "Dropping established NAT traversal: at capacity"
                        );
                        continue;
                    }
                    // `traversal` is moved into the adopt call; keep the npub for logging.
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    // Events for unparsable npubs are ignored.
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    let now_ms = Self::now_ms();
                    // Already-connected peer: a failure only matters if the active peer still
                    // warrants direct-path retries. Either way no new connection is started.
                    if self.peers.contains_key(&node_addr) {
                        if self.active_peer_should_keep_direct_retry(&node_addr, &peer_config) {
                            let decision =
                                bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                            if decision.should_warn {
                                warn!(
                                    npub = %peer_config.npub,
                                    error = %reason,
                                    consecutive_failures = decision.consecutive_failures,
                                    cooldown_secs = decision
                                        .cooldown_until_ms
                                        .map(|t| t.saturating_sub(now_ms) / 1000),
                                    "Direct-path NAT traversal upgrade failed"
                                );
                            } else {
                                debug!(
                                    npub = %peer_config.npub,
                                    error = %reason,
                                    consecutive_failures = decision.consecutive_failures,
                                    "Direct-path NAT traversal upgrade failed (suppressed by warn-rate-limit)"
                                );
                            }
                            if decision.crossed_threshold {
                                bootstrap
                                    .request_advert_stale_check(peer_config.npub.clone())
                                    .await;
                            }
                            self.schedule_link_dead_reprobe(node_addr, now_ms);
                        } else {
                            debug!(
                                npub = %peer_config.npub,
                                error = %reason,
                                "Ignoring failed NAT traversal for already-connected peer on fresh direct path"
                            );
                        }
                        continue;
                    }
                    // A handshake already in flight owns this peer; do not record the failure
                    // or start another attempt.
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    // Repeated failures may mean the peer's advert is out of date; ask the
                    // runtime to re-check it.
                    if decision.crossed_threshold {
                        bootstrap
                            .request_advert_stale_check(peer_config.npub.clone())
                            .await;
                    }

                    // Fall back to `try_peer_addresses`; if it succeeds, no retry is scheduled.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    // Schedule a retry. When the Nostr cooldown applies to this peer config and a
                    // retry entry exists, hold the retry back until at least the cooldown ends.
                    self.schedule_retry(node_addr, now_ms);
                    if self.nostr_cooldown_applies_to_peer_config(&peer_config)
                        && let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
        self.queue_active_fallback_direct_retries(&bootstrap);

        // Advert refresh is best-effort here; failures are only logged.
        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }
    }
1470
1471 async fn drain_nostr_mesh_signals(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1472 let mut deferred = Vec::new();
1473
1474 for signal in bootstrap.drain_mesh_signals().await {
1475 let (peer_npub, msg_type, payload) = match &signal {
1476 MeshTraversalSignal::Offer { peer_npub, offer } => {
1477 let payload = match serde_json::to_vec(&offer) {
1478 Ok(payload) => payload,
1479 Err(error) => {
1480 debug!(
1481 peer = %peer_npub,
1482 error = %error,
1483 "Failed to encode mesh traversal offer"
1484 );
1485 continue;
1486 }
1487 };
1488 (
1489 peer_npub.clone(),
1490 SessionMessageType::TraversalOffer.to_byte(),
1491 payload,
1492 )
1493 }
1494 MeshTraversalSignal::Answer { peer_npub, answer } => {
1495 let payload = match serde_json::to_vec(&answer) {
1496 Ok(payload) => payload,
1497 Err(error) => {
1498 debug!(
1499 peer = %peer_npub,
1500 error = %error,
1501 "Failed to encode mesh traversal answer"
1502 );
1503 continue;
1504 }
1505 };
1506 (
1507 peer_npub.clone(),
1508 SessionMessageType::TraversalAnswer.to_byte(),
1509 payload,
1510 )
1511 }
1512 };
1513
1514 let peer_identity = match PeerIdentity::from_npub(&peer_npub) {
1515 Ok(identity) => identity,
1516 Err(error) => {
1517 debug!(
1518 peer = %peer_npub,
1519 error = %error,
1520 "Cannot send mesh traversal signal to invalid peer npub"
1521 );
1522 continue;
1523 }
1524 };
1525 let peer_addr = *peer_identity.node_addr();
1526 match self
1527 .mesh_signal_session_action(peer_addr, peer_identity.pubkey_full())
1528 .await
1529 {
1530 MeshSignalSessionAction::Send => {}
1531 MeshSignalSessionAction::Defer => {
1532 deferred.push(signal);
1533 continue;
1534 }
1535 MeshSignalSessionAction::Drop => continue,
1536 }
1537
1538 if let Err(error) = self.send_session_msg(&peer_addr, msg_type, &payload).await {
1539 debug!(
1540 peer = %self.peer_display_name(&peer_addr),
1541 error = %error,
1542 "Failed to send mesh traversal signal"
1543 );
1544 }
1545 }
1546
1547 for signal in deferred {
1548 bootstrap.requeue_mesh_signal(signal);
1549 }
1550 }
1551
1552 async fn mesh_signal_session_action(
1553 &mut self,
1554 peer_addr: NodeAddr,
1555 peer_pubkey: PublicKey,
1556 ) -> MeshSignalSessionAction {
1557 if let Some(entry) = self.sessions.get(&peer_addr) {
1558 if entry.is_established() {
1559 return MeshSignalSessionAction::Send;
1560 }
1561 if entry.is_initiating() || entry.is_awaiting_msg3() {
1562 debug!(
1563 peer = %self.peer_display_name(&peer_addr),
1564 "Deferring mesh traversal signal until end-to-end session is established"
1565 );
1566 return MeshSignalSessionAction::Defer;
1567 }
1568 }
1569
1570 if self.find_next_hop(&peer_addr).is_none() {
1571 debug!(
1572 peer = %self.peer_display_name(&peer_addr),
1573 "Cannot warm mesh traversal signal session without a FIPS route"
1574 );
1575 self.maybe_initiate_lookup(&peer_addr).await;
1576 return MeshSignalSessionAction::Drop;
1577 }
1578
1579 self.register_identity(peer_addr, peer_pubkey);
1580 match self.initiate_session(peer_addr, peer_pubkey).await {
1581 Ok(()) => {
1582 debug!(
1583 peer = %self.peer_display_name(&peer_addr),
1584 "Warming end-to-end session for mesh traversal signal"
1585 );
1586 MeshSignalSessionAction::Defer
1587 }
1588 Err(NodeError::SendFailed { node_addr, reason })
1589 if node_addr == peer_addr && reason == "no route to destination" =>
1590 {
1591 debug!(
1592 peer = %self.peer_display_name(&peer_addr),
1593 "Cannot warm mesh traversal signal session without a FIPS route"
1594 );
1595 self.maybe_initiate_lookup(&peer_addr).await;
1596 MeshSignalSessionAction::Drop
1597 }
1598 Err(error) => {
1599 debug!(
1600 peer = %self.peer_display_name(&peer_addr),
1601 error = %error,
1602 "Failed to warm end-to-end session for mesh traversal signal"
1603 );
1604 MeshSignalSessionAction::Drop
1605 }
1606 }
1607 }
1608
1609 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1615 if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1616 let scope = scope.trim();
1617 if !scope.is_empty() {
1618 return Some(scope.to_string());
1619 }
1620 }
1621
1622 let app = self.config.node.discovery.nostr.app.trim();
1623 if app.is_empty() {
1624 return None;
1625 }
1626 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1627 let scope = rest.trim();
1628 if scope.is_empty() {
1629 None
1630 } else {
1631 Some(scope.to_string())
1632 }
1633 } else {
1634 Some(app.to_string())
1635 }
1636 }
1637
1638 pub(super) fn start_local_instance_discovery(&mut self) {
1639 if !self.config.node.discovery.local.enabled {
1640 return;
1641 }
1642 let Some(scope) = self.lan_discovery_scope() else {
1643 debug!("local instance discovery not started: no discovery scope");
1644 return;
1645 };
1646 let now_ms = Self::now_ms();
1647 match crate::discovery::local::LocalInstanceRegistry::new(
1648 self.identity.npub(),
1649 scope,
1650 &self.config.node.discovery.local,
1651 now_ms,
1652 ) {
1653 Ok(registry) => {
1654 self.local_instance_registry = Some(registry);
1655 self.local_instance_started_at_ms = Some(now_ms);
1656 self.last_local_instance_publish_ms = None;
1657 self.last_local_instance_scan_ms = None;
1658 self.publish_local_instance_record(now_ms);
1659 info!("Same-host FIPS instance discovery enabled");
1660 }
1661 Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1662 debug!("same-host FIPS instance discovery disabled");
1663 }
1664 Err(err) => {
1665 debug!(error = %err, "same-host FIPS instance discovery not started");
1666 }
1667 }
1668 }
1669
1670 fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1671 let mut contacts = Vec::new();
1672 for handle in self.transports.values() {
1673 if !handle.is_operational() || !handle.accept_connections() {
1674 continue;
1675 }
1676 let transport = handle.transport_type().name;
1677 if transport != "udp" && transport != "tcp" {
1678 continue;
1679 }
1680 let Some(local_addr) = handle.local_addr() else {
1681 continue;
1682 };
1683 let Some(contact) =
1684 crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1685 else {
1686 continue;
1687 };
1688 if contacts
1689 .iter()
1690 .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1691 existing.transport == contact.transport && existing.addr == contact.addr
1692 })
1693 {
1694 continue;
1695 }
1696 contacts.push(contact);
1697 }
1698 contacts
1699 }
1700
1701 fn publish_local_instance_record(&mut self, now_ms: u64) {
1702 let Some(registry) = self.local_instance_registry.clone() else {
1703 return;
1704 };
1705 let contacts = self.local_instance_contacts();
1706 match registry.publish(contacts, now_ms) {
1707 Ok(()) => {
1708 self.last_local_instance_publish_ms = Some(now_ms);
1709 }
1710 Err(err) => {
1711 debug!(error = %err, "failed to publish same-host FIPS instance record");
1712 }
1713 }
1714 }
1715
1716 fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1717 if self.local_instance_registry.is_none() {
1718 return;
1719 }
1720 let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1721 let due = self
1722 .last_local_instance_publish_ms
1723 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1724 .unwrap_or(true);
1725 if due {
1726 self.publish_local_instance_record(now_ms);
1727 }
1728 }
1729
1730 fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1731 if self.local_instance_registry.is_none() {
1732 return false;
1733 }
1734 let cfg = &self.config.node.discovery.local;
1735 let interval_ms = if self
1736 .local_instance_started_at_ms
1737 .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1738 .unwrap_or(false)
1739 {
1740 cfg.startup_scan_interval_ms()
1741 } else {
1742 cfg.scan_interval_ms()
1743 };
1744 self.last_local_instance_scan_ms
1745 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1746 .unwrap_or(true)
1747 }
1748
1749 fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1750 if self.config.peers().iter().any(|peer| {
1751 PeerIdentity::from_npub(&peer.npub)
1752 .map(|configured| configured.node_addr() == identity.node_addr())
1753 .unwrap_or(false)
1754 }) {
1755 return true;
1756 }
1757 self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1758 }
1759
1760 fn local_instance_peer_addresses(
1761 &self,
1762 record: &crate::discovery::local::LocalInstanceRecord,
1763 ) -> Vec<PeerAddress> {
1764 let mut addresses = Vec::new();
1765 for contact in &record.contacts {
1766 if contact.transport != "udp" && contact.transport != "tcp" {
1767 continue;
1768 }
1769 let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1770 debug!(
1771 npub = %record.npub,
1772 transport = %contact.transport,
1773 addr = %contact.addr,
1774 "local instance discovery: skip non-socket contact"
1775 );
1776 continue;
1777 };
1778 if !socket_addr.ip().is_loopback() {
1779 debug!(
1780 npub = %record.npub,
1781 addr = %contact.addr,
1782 "local instance discovery: skip non-loopback contact"
1783 );
1784 continue;
1785 }
1786 let address =
1787 PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1788 .with_seen_at_ms(record.updated_at_ms);
1789 if addresses.iter().any(|existing: &PeerAddress| {
1790 existing.transport == address.transport && existing.addr == address.addr
1791 }) {
1792 continue;
1793 }
1794 addresses.push(address);
1795 }
1796 addresses
1797 }
1798
1799 pub(super) async fn poll_local_instance_discovery(&mut self) {
1803 let Some(registry) = self.local_instance_registry.clone() else {
1804 return;
1805 };
1806 let now_ms = Self::now_ms();
1807 self.maybe_publish_local_instance_record(now_ms);
1808 if !self.local_instance_scan_due(now_ms) {
1809 return;
1810 }
1811 self.last_local_instance_scan_ms = Some(now_ms);
1812
1813 let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
1814 {
1815 Ok(records) => records,
1816 Err(err) => {
1817 debug!(error = %err, "same-host FIPS instance scan failed");
1818 return;
1819 }
1820 };
1821 if records.is_empty() {
1822 return;
1823 }
1824
1825 let mut connect_budget = self.discovery_connect_budget();
1826 let mut skipped_budget = 0usize;
1827 for record in records {
1828 let identity = match PeerIdentity::from_npub(&record.npub) {
1829 Ok(identity) => identity,
1830 Err(err) => {
1831 debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
1832 continue;
1833 }
1834 };
1835 let peer_node_addr = *identity.node_addr();
1836 if peer_node_addr == *self.identity.node_addr() {
1837 continue;
1838 }
1839 if !self.local_instance_peer_allowed(&identity) {
1840 debug!(
1841 npub = %identity.short_npub(),
1842 "local instance discovery: skip unconfigured peer"
1843 );
1844 continue;
1845 }
1846
1847 let addresses = self.local_instance_peer_addresses(&record);
1848 if addresses.is_empty() {
1849 continue;
1850 }
1851
1852 if self.peers.contains_key(&peer_node_addr)
1853 && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
1854 {
1855 continue;
1856 }
1857
1858 for address in addresses {
1859 let Some((transport_id, remote_addr)) =
1860 self.resolve_peer_address_for_match(&address)
1861 else {
1862 continue;
1863 };
1864 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1865 continue;
1866 }
1867 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1868 skipped_budget = skipped_budget.saturating_add(1);
1869 continue;
1870 }
1871 info!(
1872 npub = %identity.short_npub(),
1873 transport = %address.transport,
1874 addr = %address.addr,
1875 "same-host FIPS instance discovery: initiating handshake"
1876 );
1877 if let Err(err) = self
1878 .initiate_connection(transport_id, remote_addr, identity)
1879 .await
1880 {
1881 debug!(
1882 npub = %record.npub,
1883 error = %err,
1884 "same-host FIPS instance discovery: failed to initiate connection"
1885 );
1886 }
1887 connect_budget = connect_budget.saturating_sub(1);
1888 }
1889 }
1890 if skipped_budget > 0 {
1891 debug!(
1892 skipped = skipped_budget,
1893 "same-host FIPS instance discovery connect budget exhausted"
1894 );
1895 }
1896 }
1897
1898 pub(super) async fn poll_lan_discovery(&mut self) {
1905 let Some(runtime) = self.lan_discovery.clone() else {
1906 return;
1907 };
1908 let events = runtime.drain_events().await;
1909 if events.is_empty() {
1910 return;
1911 }
1912 let mut connect_budget = self.discovery_connect_budget();
1913 let mut skipped_budget = 0usize;
1914 for event in events {
1915 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1916 let Some((transport_id, local_addr)) =
1917 self.find_udp_transport_for_remote_addr(peer.addr)
1918 else {
1919 debug!(
1920 addr = %peer.addr,
1921 "lan: skip discovered peer with no compatible UDP transport"
1922 );
1923 continue;
1924 };
1925 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1926 Ok(id) => id,
1927 Err(err) => {
1928 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1929 continue;
1930 }
1931 };
1932 let peer_node_addr = *identity.node_addr();
1933 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1934 if self.peers.contains_key(&peer_node_addr) {
1935 let candidate = PeerAddress::new("udp", peer.addr.to_string());
1936 if self.active_peer_candidate_is_fresh_enough_to_skip(
1937 &peer_node_addr,
1938 std::slice::from_ref(&candidate),
1939 ) {
1940 continue;
1941 }
1942 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1943 continue;
1944 }
1945 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1946 skipped_budget = skipped_budget.saturating_add(1);
1947 continue;
1948 }
1949 info!(
1950 npub = %identity.short_npub(),
1951 addr = %peer.addr,
1952 local_addr = %local_addr,
1953 "lan: initiating alternate-path handshake to active peer"
1954 );
1955 if let Err(err) = self
1956 .initiate_connection(transport_id, remote_addr, identity)
1957 .await
1958 {
1959 debug!(
1960 npub = %peer.npub,
1961 error = %err,
1962 "lan: failed to initiate active peer alternate-path handshake"
1963 );
1964 }
1965 connect_budget = connect_budget.saturating_sub(1);
1966 continue;
1967 }
1968 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1969 continue;
1970 }
1971 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1972 skipped_budget = skipped_budget.saturating_add(1);
1973 continue;
1974 }
1975 info!(
1976 npub = %identity.short_npub(),
1977 addr = %peer.addr,
1978 local_addr = %local_addr,
1979 "lan: initiating handshake to discovered peer"
1980 );
1981 if let Err(err) = self
1982 .initiate_connection(transport_id, remote_addr, identity)
1983 .await
1984 {
1985 debug!(
1986 npub = %peer.npub,
1987 error = %err,
1988 "lan: failed to initiate connection to discovered peer"
1989 );
1990 }
1991 connect_budget = connect_budget.saturating_sub(1);
1992 }
1993 if skipped_budget > 0 {
1994 debug!(
1995 skipped = skipped_budget,
1996 "lan: discovery connect budget exhausted"
1997 );
1998 }
1999 }
2000
2001 pub(super) async fn poll_pending_connects(&mut self) {
2008 if self.pending_connects.is_empty() {
2009 return;
2010 }
2011
2012 let mut completed = Vec::new();
2013
2014 for (i, pending) in self.pending_connects.iter().enumerate() {
2015 let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
2016 transport.connection_state(&pending.remote_addr)
2017 } else {
2018 crate::transport::ConnectionState::Failed("transport removed".into())
2019 };
2020
2021 match state {
2022 crate::transport::ConnectionState::Connected => {
2023 completed.push((i, true, None));
2024 }
2025 crate::transport::ConnectionState::Failed(reason) => {
2026 completed.push((i, false, Some(reason)));
2027 }
2028 crate::transport::ConnectionState::Connecting => {
2029 }
2031 crate::transport::ConnectionState::None => {
2032 completed.push((i, false, Some("no connection attempt found".into())));
2034 }
2035 }
2036 }
2037
2038 for (i, success, reason) in completed.into_iter().rev() {
2040 let pending = self.pending_connects.remove(i);
2041
2042 if success {
2043 if let Some(link) = self.links.get_mut(&pending.link_id) {
2045 link.set_connected();
2046 }
2047
2048 debug!(
2049 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2050 transport_id = %pending.transport_id,
2051 remote_addr = %pending.remote_addr,
2052 link_id = %pending.link_id,
2053 "Transport connected, starting handshake"
2054 );
2055
2056 if let Err(e) = self
2058 .start_handshake(
2059 pending.link_id,
2060 pending.transport_id,
2061 pending.remote_addr.clone(),
2062 pending.peer_identity,
2063 )
2064 .await
2065 {
2066 warn!(
2067 link_id = %pending.link_id,
2068 error = %e,
2069 "Failed to start handshake after transport connect"
2070 );
2071 self.remove_link(&pending.link_id);
2073 }
2074 } else {
2075 let reason = reason.unwrap_or_default();
2076 warn!(
2077 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2078 transport_id = %pending.transport_id,
2079 remote_addr = %pending.remote_addr,
2080 link_id = %pending.link_id,
2081 reason = %reason,
2082 "Transport connect failed"
2083 );
2084
2085 self.remove_link(&pending.link_id);
2087 self.links.remove(&pending.link_id);
2088 self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
2089 }
2090 }
2091 }
2092
2093 pub async fn start(&mut self) -> Result<(), NodeError> {
2100 node_start_debug_log("Node::start begin");
2101 if !self.state.can_start() {
2102 return Err(NodeError::AlreadyStarted);
2103 }
2104 self.state = NodeState::Starting;
2105 node_start_debug_log("Node::start state set to starting");
2106
2107 let packet_buffer_size = self.config.node.buffers.packet_channel;
2109 let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
2110 self.packet_tx = Some(packet_tx.clone());
2111 self.packet_rx = Some(packet_rx);
2112 node_start_debug_log("Node::start packet channel created");
2113
2114 node_start_debug_log("Node::start create transports begin");
2116 let transport_handles = self.create_transports(&packet_tx).await;
2117 node_start_debug_log(format!(
2118 "Node::start create transports complete count={}",
2119 transport_handles.len()
2120 ));
2121
2122 for mut handle in transport_handles {
2123 let transport_id = handle.transport_id();
2124 let transport_type = handle.transport_type().name;
2125 let name = handle.name().map(|s| s.to_string());
2126
2127 node_start_debug_log(format!(
2128 "Node::start transport start begin id={} type={} name={:?}",
2129 transport_id, transport_type, name
2130 ));
2131 match handle.start().await {
2132 Ok(()) => {
2133 node_start_debug_log(format!(
2134 "Node::start transport start ok id={} type={}",
2135 transport_id, transport_type
2136 ));
2137 self.transports.insert(transport_id, handle);
2138 }
2139 Err(e) => {
2140 node_start_debug_log(format!(
2141 "Node::start transport start error id={} type={} error={}",
2142 transport_id, transport_type, e
2143 ));
2144 if let Some(ref n) = name {
2145 warn!(transport_type, name = %n, error = %e, "Transport failed to start");
2146 } else {
2147 warn!(transport_type, error = %e, "Transport failed to start");
2148 }
2149 }
2150 }
2151 }
2152
2153 if !self.transports.is_empty() {
2154 info!(count = self.transports.len(), "Transports initialized");
2155 }
2156
2157 #[cfg(unix)]
2173 {
2174 if self.config.node.worker_pools_enabled {
2175 node_start_debug_log("Node::start worker pools begin");
2176 let cpu_default = std::thread::available_parallelism()
2177 .map(|n| n.get())
2178 .unwrap_or(1)
2179 .max(1);
2180 let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
2181 .ok()
2182 .and_then(|s| s.parse().ok())
2183 .unwrap_or(cpu_default)
2184 .max(1);
2185 self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
2186 encrypt_worker_count,
2187 ));
2188 info!(
2189 workers = encrypt_worker_count,
2190 "Spawned FMP-encrypt worker pool"
2191 );
2192
2193 let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
2202 .ok()
2203 .and_then(|s| s.parse().ok())
2204 .unwrap_or(cpu_default);
2205 if decrypt_worker_count == 0 {
2206 info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
2207 } else {
2208 self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
2209 decrypt_worker_count,
2210 ));
2211 info!(
2212 workers = decrypt_worker_count,
2213 "Spawned FMP+FSP-decrypt worker pool"
2214 );
2215 }
2216 node_start_debug_log("Node::start worker pools complete");
2217 } else {
2218 node_start_debug_log("Node::start worker pools disabled");
2219 info!("FIPS worker pools disabled; using in-line crypto/send path");
2220 }
2221 }
2222
2223 if self.config.node.discovery.nostr.enabled {
2224 node_start_debug_log("Node::start nostr discovery start begin");
2225 match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
2226 .await
2227 {
2228 Ok(runtime) => {
2229 node_start_debug_log("Node::start nostr discovery runtime created");
2230 if let Err(err) = self.refresh_overlay_advert(&runtime).await {
2231 warn!(error = %err, "Failed to publish initial Nostr overlay advert");
2232 }
2233 node_start_debug_log("Node::start nostr overlay advert refreshed");
2234 self.nostr_discovery = Some(runtime);
2235 self.nostr_discovery_started_at_ms = Some(Self::now_ms());
2236 info!("Nostr overlay discovery enabled");
2237 }
2238 Err(err) => {
2239 node_start_debug_log(format!(
2240 "Node::start nostr discovery start error error={}",
2241 err
2242 ));
2243 warn!(error = %err, "Failed to start Nostr overlay discovery");
2244 }
2245 }
2246 }
2247
2248 if self.config.node.discovery.lan.enabled {
2252 node_start_debug_log("Node::start lan discovery start begin");
2253 let advertised_udp_port = self
2254 .transports
2255 .values()
2256 .filter(|h| h.is_operational())
2257 .filter(|h| h.transport_type().name == "udp")
2258 .find_map(|h| h.local_addr().map(|addr| addr.port()))
2259 .unwrap_or(0);
2260 let scope = self.lan_discovery_scope();
2261 match crate::discovery::lan::LanDiscovery::start(
2262 &self.identity,
2263 scope,
2264 advertised_udp_port,
2265 self.config.node.discovery.lan.clone(),
2266 )
2267 .await
2268 {
2269 Ok(runtime) => {
2270 node_start_debug_log("Node::start lan discovery start ok");
2271 self.lan_discovery = Some(runtime);
2272 info!("LAN mDNS discovery enabled");
2273 }
2274 Err(err) => {
2275 node_start_debug_log(format!(
2276 "Node::start lan discovery start error error={}",
2277 err
2278 ));
2279 debug!(error = %err, "LAN mDNS discovery not started");
2280 }
2281 }
2282 }
2283
2284 self.start_local_instance_discovery();
2285 self.poll_local_instance_discovery().await;
2286
2287 node_start_debug_log("Node::start initiate peer connections begin");
2290 self.initiate_peer_connections().await;
2291 node_start_debug_log("Node::start initiate peer connections complete");
2292
2293 if self.config.tun.enabled {
2295 node_start_debug_log("Node::start tun init begin");
2296 let address = *self.identity.address();
2297 match TunDevice::create(&self.config.tun, address).await {
2298 Ok(device) => {
2299 let mtu = device.mtu();
2300 let name = device.name().to_string();
2301 let our_addr = *device.address();
2302
2303 info!("TUN device active:");
2304 info!(" name: {}", name);
2305 info!(" address: {}", device.address());
2306 info!(" mtu: {}", mtu);
2307
2308 let effective_mtu = self.effective_ipv6_mtu();
2310 let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); info!("effective MTU: {} bytes", effective_mtu);
2313 debug!(" max TCP MSS: {} bytes", max_mss);
2314
2315 #[cfg(target_os = "macos")]
2319 let (shutdown_read_fd, shutdown_write_fd) = {
2320 let mut fds = [0i32; 2];
2321 if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
2322 return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
2323 "failed to create shutdown pipe".into(),
2324 )));
2325 }
2326 (fds[0], fds[1])
2327 };
2328
2329 let (writer, tun_tx) =
2333 device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
2334
2335 let writer_handle = thread::spawn(move || {
2337 writer.run();
2338 });
2339
2340 let reader_tun_tx = tun_tx.clone();
2342
2343 let tun_channel_size = self.config.node.buffers.tun_channel;
2345 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
2346
2347 let transport_mtu = self.transport_mtu();
2349 let path_mtu_lookup = self.path_mtu_lookup.clone();
2350 #[cfg(target_os = "macos")]
2351 let reader_handle = thread::spawn(move || {
2352 run_tun_reader(
2353 device,
2354 mtu,
2355 our_addr,
2356 reader_tun_tx,
2357 outbound_tx,
2358 transport_mtu,
2359 path_mtu_lookup,
2360 shutdown_read_fd,
2361 );
2362 });
2363 #[cfg(not(target_os = "macos"))]
2364 let reader_handle = thread::spawn(move || {
2365 run_tun_reader(
2366 device,
2367 mtu,
2368 our_addr,
2369 reader_tun_tx,
2370 outbound_tx,
2371 transport_mtu,
2372 path_mtu_lookup,
2373 );
2374 });
2375
2376 self.tun_state = TunState::Active;
2377 self.tun_name = Some(name);
2378 self.tun_tx = Some(tun_tx);
2379 self.tun_outbound_rx = Some(outbound_rx);
2380 self.tun_reader_handle = Some(reader_handle);
2381 self.tun_writer_handle = Some(writer_handle);
2382 #[cfg(target_os = "macos")]
2383 {
2384 self.tun_shutdown_fd = Some(shutdown_write_fd);
2385 }
2386 }
2387 Err(e) => {
2388 self.tun_state = TunState::Failed;
2389 warn!(error = %e, "Failed to initialize TUN, continuing without it");
2390 }
2391 }
2392 node_start_debug_log("Node::start tun init complete");
2393 }
2394
2395 if self.config.dns.enabled {
2412 node_start_debug_log("Node::start dns init begin");
2413 let addr_str = self.config.dns.bind_addr();
2414 match addr_str.parse::<std::net::IpAddr>() {
2415 Ok(ip) => {
2416 let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
2417 match Self::bind_dns_socket(bind) {
2418 Ok(socket) => {
2419 let dns_channel_size = self.config.node.buffers.dns_channel;
2420 let (identity_tx, identity_rx) =
2421 tokio::sync::mpsc::channel(dns_channel_size);
2422 let dns_ttl = self.config.dns.ttl();
2423 let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
2424 self.config.peers(),
2425 );
2426 let reloader = if self.config.node.system_files_enabled {
2427 let hosts_path = std::path::PathBuf::from(
2428 crate::upper::hosts::DEFAULT_HOSTS_PATH,
2429 );
2430 crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
2431 } else {
2432 crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
2433 };
2434 let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
2442 info!(
2443 bind = %bind,
2444 hosts = reloader.hosts().len(),
2445 mesh_ifindex = ?mesh_ifindex,
2446 "DNS responder started for .fips domain (auto-reload enabled)"
2447 );
2448 let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
2449 socket,
2450 identity_tx,
2451 dns_ttl,
2452 reloader,
2453 mesh_ifindex,
2454 ));
2455 self.dns_identity_rx = Some(identity_rx);
2456 self.dns_task = Some(handle);
2457 }
2458 Err(e) => {
2459 warn!(bind = %bind, error = %e, "Failed to start DNS responder");
2460 }
2461 }
2462 }
2463 Err(e) => {
2464 warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
2465 }
2466 }
2467 node_start_debug_log("Node::start dns init complete");
2468 }
2469
2470 self.state = NodeState::Running;
2471 node_start_debug_log("Node::start running");
2472 info!("Node started:");
2473 info!(" state: {}", self.state);
2474 info!(" transports: {}", self.transports.len());
2475 info!(" connections: {}", self.connections.len());
2476 Ok(())
2477 }
2478
2479 fn bind_dns_socket(
2492 addr: std::net::SocketAddr,
2493 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2494 use socket2::{Domain, Protocol, Socket, Type};
2495 let domain = if addr.is_ipv4() {
2496 Domain::IPV4
2497 } else {
2498 Domain::IPV6
2499 };
2500 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2501 if addr.is_ipv6() {
2502 sock.set_only_v6(false)?;
2503 #[cfg(unix)]
2504 Self::set_recv_pktinfo_v6(&sock)?;
2505 }
2506 sock.set_nonblocking(true)?;
2507 sock.bind(&addr.into())?;
2508 tokio::net::UdpSocket::from_std(sock.into())
2509 }
2510
2511 #[cfg(unix)]
2517 fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2518 use std::os::fd::AsRawFd;
2519 let enable: libc::c_int = 1;
2520 let ret = unsafe {
2521 libc::setsockopt(
2522 sock.as_raw_fd(),
2523 libc::IPPROTO_IPV6,
2524 libc::IPV6_RECVPKTINFO,
2525 &enable as *const _ as *const libc::c_void,
2526 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2527 )
2528 };
2529 if ret < 0 {
2530 return Err(std::io::Error::last_os_error());
2531 }
2532 Ok(())
2533 }
2534
2535 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2542 #[cfg(unix)]
2543 {
2544 let c_name = std::ffi::CString::new(name).ok()?;
2545 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2546 if idx == 0 { None } else { Some(idx) }
2547 }
2548 #[cfg(not(unix))]
2549 {
2550 let _ = name;
2551 None
2552 }
2553 }
2554
2555 pub async fn stop(&mut self) -> Result<(), NodeError> {
2560 if !self.state.can_stop() {
2561 return Err(NodeError::NotStarted);
2562 }
2563 self.state = NodeState::Stopping;
2564 info!(state = %self.state, "Node stopping");
2565
2566 if let Some(handle) = self.dns_task.take() {
2568 handle.abort();
2569 debug!("DNS responder stopped");
2570 }
2571
2572 self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
2574 .await;
2575
2576 if let Some(bootstrap) = self.nostr_discovery.take()
2578 && let Err(e) = bootstrap.shutdown().await
2579 {
2580 warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
2581 }
2582
2583 if let Some(lan) = self.lan_discovery.take() {
2587 lan.shutdown().await;
2588 }
2589
2590 if let Some(registry) = self.local_instance_registry.take()
2591 && let Err(err) = registry.remove()
2592 {
2593 debug!(error = %err, "failed to remove same-host FIPS instance record");
2594 }
2595
2596 let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
2598 for transport_id in transport_ids {
2599 if let Some(mut handle) = self.transports.remove(&transport_id) {
2600 let transport_type = handle.transport_type().name;
2601 match handle.stop().await {
2602 Ok(()) => {
2603 info!(transport_id = %transport_id, transport_type, "Transport stopped");
2604 }
2605 Err(e) => {
2606 warn!(
2607 transport_id = %transport_id,
2608 transport_type,
2609 error = %e,
2610 "Transport stop failed"
2611 );
2612 }
2613 }
2614 }
2615 }
2616
2617 self.packet_tx.take();
2619 self.packet_rx.take();
2620
2621 if let Some(name) = self.tun_name.take() {
2623 info!(name = %name, "Shutting down TUN interface");
2624
2625 self.tun_tx.take();
2627
2628 if let Err(e) = shutdown_tun_interface(&name).await {
2630 warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
2631 }
2632
2633 #[cfg(target_os = "macos")]
2636 if let Some(fd) = self.tun_shutdown_fd.take() {
2637 unsafe {
2638 libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
2639 libc::close(fd);
2640 }
2641 }
2642
2643 if let Some(handle) = self.tun_reader_handle.take() {
2645 let _ = handle.join();
2646 }
2647 if let Some(handle) = self.tun_writer_handle.take() {
2648 let _ = handle.join();
2649 }
2650
2651 self.tun_state = TunState::Disabled;
2652 }
2653
2654 self.state = NodeState::Stopped;
2655 info!(state = %self.state, "Node stopped");
2656 Ok(())
2657 }
2658
2659 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2664 let disconnect = Disconnect::new(reason);
2665 let plaintext = disconnect.encode();
2666
2667 let peer_addrs: Vec<NodeAddr> = self
2669 .peers
2670 .iter()
2671 .filter(|(_, peer)| peer.can_send() && peer.has_session())
2672 .map(|(addr, _)| *addr)
2673 .collect();
2674
2675 if peer_addrs.is_empty() {
2676 debug!(
2677 total_peers = self.peers.len(),
2678 "No sendable peers for disconnect notification"
2679 );
2680 return;
2681 }
2682
2683 let mut sent = 0usize;
2684 for node_addr in &peer_addrs {
2685 match self
2686 .send_encrypted_link_message(node_addr, &plaintext)
2687 .await
2688 {
2689 Ok(()) => sent += 1,
2690 Err(e) => {
2691 debug!(
2692 peer = %self.peer_display_name(node_addr),
2693 error = %e,
2694 "Failed to send disconnect (transport may be down)"
2695 );
2696 }
2697 }
2698 }
2699
2700 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2701 }
2702
2703 pub(in crate::node) fn static_peer_addresses(
2704 &self,
2705 peer_config: &PeerConfig,
2706 ) -> Vec<PeerAddress> {
2707 peer_config
2708 .addresses_by_priority()
2709 .into_iter()
2710 .cloned()
2711 .collect()
2712 }
2713
2714 async fn nostr_peer_fallback_addresses(
2715 &self,
2716 peer_config: &PeerConfig,
2717 existing: &[PeerAddress],
2718 ) -> Vec<PeerAddress> {
2719 if !self.config.node.discovery.nostr.enabled
2720 || self.config.node.discovery.nostr.policy
2721 == crate::config::NostrDiscoveryPolicy::Disabled
2722 {
2723 return Vec::new();
2724 }
2725
2726 let Some(bootstrap) = self.nostr_discovery.clone() else {
2727 return Vec::new();
2728 };
2729 if self.nostr_cooldown_applies_to_peer_config(peer_config)
2730 && bootstrap
2731 .cooldown_until(&peer_config.npub, Self::now_ms())
2732 .is_some()
2733 {
2734 debug!(
2735 npub = %peer_config.npub,
2736 "Skipping cached Nostr fallback endpoints while peer is in traversal cooldown"
2737 );
2738 return Vec::new();
2739 }
2740 let endpoints = match bootstrap
2741 .cached_advert_endpoints_for_peer(&peer_config.npub)
2742 .await
2743 {
2744 Some(endpoints) => endpoints,
2745 None => {
2746 debug!(
2747 npub = %peer_config.npub,
2748 "No cached Nostr advert endpoints for configured peer"
2749 );
2750 return Vec::new();
2751 }
2752 };
2753
2754 let mut fallback = Vec::new();
2755 let mut next_priority = existing
2756 .iter()
2757 .map(|addr| addr.priority)
2758 .max()
2759 .unwrap_or(100)
2760 .saturating_add(1);
2761 let seen_at_ms = Self::now_ms();
2767 for endpoint in endpoints {
2768 let Some(candidate) =
2769 Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2770 else {
2771 continue;
2772 };
2773 if existing
2774 .iter()
2775 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2776 || fallback.iter().any(|addr: &PeerAddress| {
2777 addr.transport == candidate.transport && addr.addr == candidate.addr
2778 })
2779 {
2780 continue;
2781 }
2782 fallback.push(candidate);
2783 next_priority = next_priority.saturating_add(1);
2784 }
2785 fallback
2786 }
2787
2788 pub(in crate::node) async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2789 if !self.config.node.discovery.nostr.enabled
2790 || self.config.node.discovery.nostr.policy
2791 == crate::config::NostrDiscoveryPolicy::Disabled
2792 {
2793 return false;
2794 }
2795 let Some(bootstrap) = self.nostr_discovery.clone() else {
2796 return false;
2797 };
2798 let now_ms = Self::now_ms();
2799 if self.nostr_cooldown_applies_to_peer_config(peer_config)
2800 && let Some(cooldown_until_ms) = bootstrap.cooldown_until(&peer_config.npub, now_ms)
2801 {
2802 debug!(
2803 npub = %peer_config.npub,
2804 cooldown_secs = cooldown_until_ms.saturating_sub(now_ms) / 1000,
2805 "Skipping Nostr traversal request while peer is in cooldown"
2806 );
2807 return false;
2808 }
2809 bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
2810 bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
2811 let mesh_signaling_allowed = self.mesh_signaling_allowed_for_peer(peer_config);
2812 let started = bootstrap
2813 .request_connect_with_mesh_signaling(peer_config.clone(), mesh_signaling_allowed)
2814 .await;
2815 if started {
2816 info!(
2817 npub = %peer_config.npub,
2818 mesh_signaling_allowed,
2819 "Started background UDP NAT traversal attempt"
2820 );
2821 } else {
2822 debug!(
2823 npub = %peer_config.npub,
2824 mesh_signaling_allowed,
2825 "Background UDP NAT traversal attempt already in progress"
2826 );
2827 }
2828 true
2829 }
2830
2831 fn nostr_cooldown_applies_to_peer_config(&self, peer_config: &PeerConfig) -> bool {
2832 !self.mesh_signaling_allowed_for_peer(peer_config)
2833 }
2834
2835 pub(in crate::node) fn mesh_signaling_allowed_for_peer(
2836 &self,
2837 peer_config: &PeerConfig,
2838 ) -> bool {
2839 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
2840 return false;
2841 };
2842 let peer_addr = identity.node_addr();
2843 self.configured_peer(peer_addr).is_some()
2844 }
2845
2846 fn overlay_endpoint_to_peer_address(
2847 endpoint: &OverlayEndpointAdvert,
2848 priority: u8,
2849 seen_at_ms: u64,
2850 ) -> Option<PeerAddress> {
2851 let transport = match endpoint.transport {
2852 OverlayTransportKind::Udp => "udp",
2853 OverlayTransportKind::Tcp => "tcp",
2854 OverlayTransportKind::Tor => "tor",
2855 OverlayTransportKind::WebRtc => "webrtc",
2856 };
2857 Some(
2858 PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2859 .with_seen_at_ms(seen_at_ms),
2860 )
2861 }
2862
    /// Tries to open connections to `peer_identity` over each entry of
    /// `addresses`, in order.
    ///
    /// Handling per entry:
    /// - `udp:nat` entries are not dialed. When `allow_bootstrap_nat` is set,
    ///   they trigger a Nostr NAT-traversal request.
    /// - `ethernet` and `ble` entries are resolved by their dedicated
    ///   resolvers. BLE is only available when built with `bluer_available`.
    /// - `udp` entries that parse as a `SocketAddr` are matched to a
    ///   compatible operational UDP transport.
    /// - Anything else is routed to the first operational transport of that
    ///   type.
    ///
    /// A path that is already being dialed counts as attempted and is not
    /// redialed. Each successful initiation consumes one unit of the
    /// per-peer candidate budget.
    ///
    /// # Errors
    /// - `AccessDenied` from `initiate_connection` is returned immediately.
    /// - If nothing was attempted and at least one failure was a
    ///   local-route failure, returns `LocalRouteUnavailable` with the first
    ///   such message.
    /// - Otherwise returns `NoTransportForType`.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        // True once any path was initiated, is in flight, or a NAT bootstrap started.
        let mut attempted = false;
        // First local-route failure message, reported if nothing else works.
        let mut local_route_error = None;
        let peer_node_addr = *peer_identity.node_addr();
        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

        for addr in addresses {
            // `udp:nat` is a placeholder meaning "reachable via NAT traversal".
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if !allow_bootstrap_nat {
                    continue;
                }
                if self.request_nostr_bootstrap(peer_config).await {
                    attempted = true;
                    continue;
                }
                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                continue;
            }

            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                // A concrete UDP socket address must go through a UDP transport
                // able to reach it; other types just need any operational instance.
                let tid = if addr.transport == "udp"
                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
                {
                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                        Some((id, _)) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No compatible operational UDP transport for address"
                            );
                            continue;
                        }
                    }
                } else {
                    match self.find_transport_for_type(&addr.transport) {
                        Some(id) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No operational transport for address type"
                            );
                            continue;
                        }
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            // Already racing this exact path: count it, do not dial twice.
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                attempted = true;
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    remote_addr = %remote_addr,
                    "Skipping duplicate in-flight candidate path"
                );
                continue;
            }

            // NOTE(review): breaking here also skips any later `udp:nat` entries,
            // and if the budget started at 0 with no in-flight path matched above,
            // the function falls through to an error return.
            if concrete_budget == 0 {
                debug!(
                    npub = %peer_config.npub,
                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                    "Path candidate race budget exhausted"
                );
                break;
            }

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => {
                    attempted = true;
                    concrete_budget = concrete_budget.saturating_sub(1);
                }
                // ACL rejection applies to the peer as a whole: stop trying.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    if e.is_local_route_unavailable() && local_route_error.is_none() {
                        local_route_error = Some(e.to_string());
                    }
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        if attempted {
            return Ok(());
        }

        if let Some(error) = local_route_error {
            return Err(NodeError::LocalRouteUnavailable(error));
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
3009
3010 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
3011 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
3012 .await;
3013 }
3014
3015 pub(in crate::node) fn queue_active_fallback_direct_retries(
3016 &mut self,
3017 _bootstrap: &std::sync::Arc<NostrDiscovery>,
3018 ) {
3019 let now_ms = Self::now_ms();
3020 let peer_configs = self
3021 .config
3022 .auto_connect_peers()
3023 .cloned()
3024 .collect::<Vec<_>>();
3025
3026 for peer_config in peer_configs {
3027 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
3028 continue;
3029 };
3030 let node_addr = *peer_identity.node_addr();
3031
3032 if self.retry_pending.contains_key(&node_addr)
3033 || !self.peers.contains_key(&node_addr)
3034 || self.is_connecting_to_peer(&node_addr)
3035 || !self.active_peer_should_keep_direct_retry(&node_addr, &peer_config)
3036 {
3037 continue;
3038 }
3039
3040 let mut state = super::retry::RetryState::new(peer_config.clone());
3041 state.reconnect = true;
3042 state.retry_after_ms = now_ms;
3043 self.retry_pending.insert(node_addr, state);
3044
3045 debug!(
3046 peer = %self.peer_display_name(&node_addr),
3047 "Queued direct-path retry for active fallback peer"
3048 );
3049 }
3050 }
3051
3052 pub(in crate::node) async fn run_open_discovery_sweep(
3063 &mut self,
3064 bootstrap: &std::sync::Arc<NostrDiscovery>,
3065 max_age_secs: Option<u64>,
3066 caller: &'static str,
3067 ) {
3068 if !self.config.node.discovery.nostr.enabled
3069 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3070 {
3071 return;
3072 }
3073
3074 let configured_npubs = self
3075 .config
3076 .peers()
3077 .iter()
3078 .map(|peer| peer.npub.clone())
3079 .collect::<HashSet<_>>();
3080 let now_ms = Self::now_ms();
3081 let now_secs = now_ms / 1000;
3082 let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
3083 if enqueue_budget == 0 {
3084 debug!(
3085 caller = %caller,
3086 "open-discovery sweep: enqueue budget is 0, skipping"
3087 );
3088 return;
3089 }
3090
3091 let candidates = bootstrap.cached_open_discovery_candidates(64).await;
3092 let cached_count = candidates.len();
3093 let mut enqueued = 0usize;
3094 let mut skipped_age = 0usize;
3095 let mut skipped_configured = 0usize;
3096 let mut skipped_self = 0usize;
3097 let mut skipped_connected = 0usize;
3098 let mut skipped_retry_pending = 0usize;
3099 let mut skipped_connecting = 0usize;
3100 let mut skipped_no_endpoints = 0usize;
3101 let mut skipped_invalid_npub = 0usize;
3102 let mut skipped_cooldown = 0usize;
3103
3104 for (npub, endpoints, created_at_secs) in candidates {
3105 if enqueue_budget == 0 {
3106 break;
3107 }
3108
3109 if let Some(max_age) = max_age_secs
3110 && now_secs.saturating_sub(created_at_secs) > max_age
3111 {
3112 skipped_age = skipped_age.saturating_add(1);
3113 continue;
3114 }
3115
3116 if configured_npubs.contains(&npub) {
3117 if let Ok(identity) = PeerIdentity::from_npub(&npub) {
3137 let configured_addr = *identity.node_addr();
3138 if bootstrap.cooldown_until(&npub, now_ms).is_some() {
3139 skipped_cooldown = skipped_cooldown.saturating_add(1);
3140 skipped_configured = skipped_configured.saturating_add(1);
3141 continue;
3142 }
3143 if let Some(state) = self.retry_pending.get_mut(&configured_addr)
3144 && state.retry_after_ms > now_ms
3145 {
3146 state.retry_after_ms = now_ms;
3147 debug!(
3148 caller = %caller,
3149 peer = %self.peer_display_name(&configured_addr),
3150 advert_age_secs = now_secs.saturating_sub(created_at_secs),
3151 "Expediting configured-peer retry after fresh overlay advert"
3152 );
3153 }
3154 }
3155 skipped_configured = skipped_configured.saturating_add(1);
3156 continue;
3157 }
3158
3159 let peer_identity = match PeerIdentity::from_npub(&npub) {
3160 Ok(identity) => identity,
3161 Err(_) => {
3162 skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
3163 continue;
3164 }
3165 };
3166 let node_addr = *peer_identity.node_addr();
3167 if node_addr == *self.identity.node_addr() {
3168 skipped_self = skipped_self.saturating_add(1);
3169 continue;
3170 }
3171 if self.peers.contains_key(&node_addr) {
3172 skipped_connected = skipped_connected.saturating_add(1);
3173 continue;
3174 }
3175 if self.retry_pending.contains_key(&node_addr) {
3176 skipped_retry_pending = skipped_retry_pending.saturating_add(1);
3177 continue;
3178 }
3179 if bootstrap.cooldown_until(&npub, now_ms).is_some() {
3180 skipped_cooldown = skipped_cooldown.saturating_add(1);
3181 continue;
3182 }
3183 let connecting = self.connections.values().any(|conn| {
3184 conn.expected_identity()
3185 .map(|id| id.node_addr() == &node_addr)
3186 .unwrap_or(false)
3187 });
3188 if connecting {
3189 skipped_connecting = skipped_connecting.saturating_add(1);
3190 continue;
3191 }
3192
3193 let mut addresses = Vec::new();
3194 let mut priority = 120u8;
3195 let seen_at_ms = Self::now_ms();
3196 for endpoint in endpoints {
3197 let Some(candidate) =
3198 Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
3199 else {
3200 continue;
3201 };
3202 if addresses.iter().any(|existing: &PeerAddress| {
3203 existing.transport == candidate.transport && existing.addr == candidate.addr
3204 }) {
3205 continue;
3206 }
3207 addresses.push(candidate);
3208 priority = priority.saturating_add(1);
3209 }
3210 if addresses.is_empty() {
3211 skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
3212 continue;
3213 }
3214
3215 self.peer_aliases
3216 .entry(node_addr)
3217 .or_insert_with(|| peer_identity.short_npub());
3218 self.register_identity(node_addr, peer_identity.pubkey_full());
3219
3220 let mut state = super::retry::RetryState::new(PeerConfig {
3221 npub: npub.clone(),
3222 alias: None,
3223 addresses,
3224 connect_policy: ConnectPolicy::AutoConnect,
3225 auto_reconnect: true,
3226 discovery_fallback_transit: false,
3227 });
3228 state.reconnect = false;
3229 state.retry_after_ms = now_ms;
3230 state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
3231 self.retry_pending.insert(node_addr, state);
3232 info!(
3233 caller = %caller,
3234 peer = %peer_identity.short_npub(),
3235 advert_age_secs = now_secs.saturating_sub(created_at_secs),
3236 "open-discovery sweep: queued retry for cached advert"
3237 );
3238 enqueue_budget = enqueue_budget.saturating_sub(1);
3239 enqueued = enqueued.saturating_add(1);
3240 }
3241
3242 let total_skipped = skipped_age
3246 + skipped_configured
3247 + skipped_self
3248 + skipped_connected
3249 + skipped_retry_pending
3250 + skipped_connecting
3251 + skipped_no_endpoints
3252 + skipped_invalid_npub
3253 + skipped_cooldown;
3254 let should_summarize = caller == "startup" || enqueued > 0;
3255 if should_summarize {
3256 info!(
3257 caller = %caller,
3258 cached = cached_count,
3259 queued = enqueued,
3260 skipped_age = skipped_age,
3261 skipped_configured = skipped_configured,
3262 skipped_self = skipped_self,
3263 skipped_connected = skipped_connected,
3264 skipped_retry_pending = skipped_retry_pending,
3265 skipped_connecting = skipped_connecting,
3266 skipped_no_endpoints = skipped_no_endpoints,
3267 skipped_invalid_npub = skipped_invalid_npub,
3268 skipped_cooldown = skipped_cooldown,
3269 skipped_total = total_skipped,
3270 "open-discovery sweep complete"
3271 );
3272 }
3273 }
3274
3275 async fn maybe_run_startup_open_discovery_sweep(
3283 &mut self,
3284 bootstrap: &std::sync::Arc<NostrDiscovery>,
3285 ) {
3286 if self.startup_open_discovery_sweep_done {
3287 return;
3288 }
3289 if !self.config.node.discovery.nostr.enabled
3290 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3291 {
3292 self.startup_open_discovery_sweep_done = true;
3294 return;
3295 }
3296 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
3297 return;
3298 };
3299 let now_ms = Self::now_ms();
3300 let delay_ms = self
3301 .config
3302 .node
3303 .discovery
3304 .nostr
3305 .startup_sweep_delay_secs
3306 .saturating_mul(1000);
3307 if now_ms < started_at_ms.saturating_add(delay_ms) {
3308 return;
3309 }
3310
3311 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
3312 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
3313 .await;
3314 self.startup_open_discovery_sweep_done = true;
3315 }
3316
3317 fn available_outbound_slots(&self) -> usize {
3318 let connection_used = self
3319 .connections
3320 .len()
3321 .saturating_add(self.pending_connects.len());
3322 let connection_slots = if self.max_connections == 0 {
3323 usize::MAX
3324 } else {
3325 self.max_connections.saturating_sub(connection_used)
3326 };
3327
3328 let peer_slots = if self.max_peers == 0 {
3329 usize::MAX
3330 } else {
3331 self.max_peers.saturating_sub(self.peers.len())
3332 };
3333
3334 let link_slots = if self.max_links == 0 {
3335 usize::MAX
3336 } else {
3337 self.max_links.saturating_sub(self.links.len())
3338 };
3339
3340 connection_slots.min(peer_slots).min(link_slots)
3341 }
3342
3343 pub(in crate::node) fn open_discovery_enqueue_budget(
3344 &self,
3345 configured_npubs: &HashSet<String>,
3346 ) -> usize {
3347 let current_open_discovery_active = self
3348 .peers
3349 .values()
3350 .filter(|peer| !configured_npubs.contains(&peer.npub()))
3351 .count();
3352 let current_open_discovery_pending = self
3353 .retry_pending
3354 .values()
3355 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3356 .count();
3357
3358 let cap_remaining = self
3359 .config
3360 .node
3361 .discovery
3362 .nostr
3363 .open_discovery_max_pending
3364 .saturating_sub(current_open_discovery_active)
3365 .saturating_sub(current_open_discovery_pending);
3366
3367 cap_remaining.min(self.available_outbound_slots())
3368 }
3369
3370 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3371 now_ms.saturating_add(
3372 self.config
3373 .node
3374 .discovery
3375 .nostr
3376 .advert_ttl_secs
3377 .saturating_mul(1000)
3378 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3379 )
3380 }
3381
    /// Builds this node's Nostr overlay advert from its operational transports.
    ///
    /// Only transports whose config sets `advertise_on_nostr` contribute.
    ///
    /// - **UDP.** A public instance advertises, in order of preference:
    ///   - its explicit external address,
    ///   - its bind address, if that address is routable,
    ///   - otherwise an address learned through `learn_public_udp_addr`.
    ///
    ///   A non-public instance advertises the `nat` placeholder.
    /// - **TCP.** Advertises its explicit external address, or its bind
    ///   address if that address is routable.
    /// - **Tor.** Advertises `<onion>:<advertised_port>` once an onion
    ///   address is known.
    /// - **WebRTC.** Advertises this node's hex-encoded full public key.
    ///
    /// Signal relays and STUN servers are included only when a UDP-NAT or
    /// WebRTC endpoint is present.
    ///
    /// Returns `None` when Nostr discovery is disabled or nothing is
    /// advertisable.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // Whether peers will need relays/STUN to reach us.
        let mut has_udp_nat = false;
        let mut has_webrtc = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                // Bound to a specific routable IP: advertise it as-is.
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                // Wildcard/private bind: ask discovery for the public
                                // mapping of this transport's port.
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                             or private and STUN observation failed; \
                                             advertising no UDP endpoint. Either set \
                                             transports.udp.external_addr, bind to a \
                                             specific *public* IP, or ensure \
                                             node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                None => {}
                            }
                        }
                    } else {
                        // Not directly reachable: peers must use NAT traversal.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "webrtc" => {
                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::WebRtc,
                        addr: hex::encode(self.identity.pubkey_full().serialize()),
                    });
                    has_webrtc = true;
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            // Unlike UDP there is no STUN fallback for TCP.
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                     or private IP and no transports.tcp.external_addr set; \
                                     advertising no TCP endpoint. Either set external_addr \
                                     to the public IP (recommended for cloud 1:1-NAT setups) \
                                     or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
3560
3561 async fn refresh_overlay_advert(
3562 &self,
3563 bootstrap: &std::sync::Arc<NostrDiscovery>,
3564 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3565 let advert = self.build_overlay_advert(bootstrap).await;
3566 bootstrap.update_local_advert(advert).await
3567 }
3568
3569 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3570 match (&self.config.transports.udp, transport_name) {
3571 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3572 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3573 _ => None,
3574 }
3575 }
3576
3577 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3578 match (&self.config.transports.tcp, transport_name) {
3579 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3580 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3581 _ => None,
3582 }
3583 }
3584
3585 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3586 match (&self.config.transports.tor, transport_name) {
3587 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3588 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3589 _ => None,
3590 }
3591 }
3592
3593 fn lookup_webrtc_config(
3594 &self,
3595 transport_name: Option<&str>,
3596 ) -> Option<&crate::config::WebRtcConfig> {
3597 match (&self.config.transports.webrtc, transport_name) {
3598 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3599 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3600 _ => None,
3601 }
3602 }
3603
3604 pub(in crate::node) async fn try_peer_addresses(
3605 &mut self,
3606 peer_config: &PeerConfig,
3607 peer_identity: PeerIdentity,
3608 allow_bootstrap_nat: bool,
3609 ) -> Result<(), NodeError> {
3610 let peer_node_addr = *peer_identity.node_addr();
3611 if self.peers.contains_key(&peer_node_addr) {
3612 debug!(
3613 npub = %peer_config.npub,
3614 "Peer already exists, skipping address attempts"
3615 );
3616 return Ok(());
3617 }
3618
3619 let candidates = self.peer_address_candidates(peer_config).await;
3620
3621 if candidates.is_empty() {
3622 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3623 return Ok(());
3624 }
3625 return Err(NodeError::NoTransportForType(format!(
3626 "no addresses known for {}",
3627 peer_config.npub
3628 )));
3629 }
3630
3631 if self
3632 .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3633 .await
3634 .is_ok()
3635 {
3636 if allow_bootstrap_nat {
3637 self.request_nostr_bootstrap(peer_config).await;
3638 }
3639 return Ok(());
3640 }
3641
3642 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3643 return Ok(());
3644 }
3645
3646 Err(NodeError::NoTransportForType(format!(
3647 "no operational transport for any of {}'s addresses",
3648 peer_config.npub
3649 )))
3650 }
3651
3652 async fn try_active_peer_alternative_addresses(
3653 &mut self,
3654 peer_config: &PeerConfig,
3655 peer_identity: PeerIdentity,
3656 allow_same_path_refresh: bool,
3657 ) -> Result<bool, NodeError> {
3658 let peer_node_addr = *peer_identity.node_addr();
3659 let mut candidates = self.peer_address_candidates(peer_config).await;
3660 let same_path_refresh_needed = allow_same_path_refresh
3661 && (self.active_peer_needs_same_path_refresh(&peer_node_addr)
3662 || self
3663 .peers
3664 .get(&peer_node_addr)
3665 .is_some_and(|peer| !peer.can_send()));
3666 if same_path_refresh_needed
3667 && let Some(candidate) = self.active_peer_current_udp_candidate(&peer_node_addr)
3668 && !candidates.iter().any(|existing| {
3669 existing.transport == candidate.transport && existing.addr == candidate.addr
3670 })
3671 {
3672 candidates.push(candidate);
3673 Self::sort_peer_address_candidates(&mut candidates);
3674 }
3675 let should_try_nostr =
3676 self.active_peer_should_keep_direct_retry(&peer_node_addr, peer_config);
3677
3678 if candidates.is_empty() {
3679 if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3680 return Ok(true);
3681 }
3682 return Err(NodeError::NoTransportForType(format!(
3683 "no addresses known for {}",
3684 peer_config.npub
3685 )));
3686 }
3687
3688 let alternatives: Vec<_> = candidates
3689 .into_iter()
3690 .filter(|addr| {
3691 same_path_refresh_needed
3692 || !self.active_peer_matches_candidate(&peer_node_addr, addr)
3693 })
3694 .collect();
3695
3696 if alternatives.is_empty() {
3697 if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3698 return Ok(true);
3699 }
3700 return Ok(false);
3701 }
3702
3703 let needs_separate_nostr_attempt = should_try_nostr
3704 && !alternatives
3705 .iter()
3706 .any(|addr| addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat"));
3707 let address_result = self
3708 .attempt_peer_address_list(peer_config, peer_identity, true, &alternatives)
3709 .await;
3710 let nostr_attempted =
3711 needs_separate_nostr_attempt && self.request_nostr_bootstrap(peer_config).await;
3712
3713 match address_result {
3714 Ok(()) => Ok(true),
3715 Err(err) if nostr_attempted => {
3716 debug!(
3717 npub = %peer_config.npub,
3718 error = %err,
3719 "Static active-peer direct-path alternatives failed; Nostr traversal still queued"
3720 );
3721 Ok(true)
3722 }
3723 Err(err) => Err(err),
3724 }
3725 }
3726
3727 async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3728 let static_addresses = self.static_peer_addresses(peer_config);
3735 let overlay_addresses = self
3736 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3737 .await;
3738
3739 let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3740 for addr in overlay_addresses.into_iter().chain(static_addresses) {
3741 if !candidates.iter().any(|existing: &PeerAddress| {
3742 existing.transport == addr.transport && existing.addr == addr.addr
3743 }) {
3744 candidates.push(addr);
3745 }
3746 }
3747
3748 Self::sort_peer_address_candidates(&mut candidates);
3749
3750 candidates
3751 }
3752
3753 fn sort_peer_address_candidates(candidates: &mut [PeerAddress]) {
3754 candidates.sort_by(|a, b| {
3760 if a.priority != b.priority {
3761 return a.priority.cmp(&b.priority);
3762 }
3763 match (a.seen_at_ms, b.seen_at_ms) {
3764 (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3765 (Some(_), None) => std::cmp::Ordering::Less,
3766 (None, Some(_)) => std::cmp::Ordering::Greater,
3767 (None, None) => std::cmp::Ordering::Equal,
3768 }
3769 });
3770 }
3771
3772 fn active_peer_matches_any_candidate(
3773 &self,
3774 peer_node_addr: &NodeAddr,
3775 candidates: &[PeerAddress],
3776 ) -> bool {
3777 candidates
3778 .iter()
3779 .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3780 }
3781
3782 pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3783 &self,
3784 peer_node_addr: &NodeAddr,
3785 candidates: &[PeerAddress],
3786 ) -> bool {
3787 if !self
3788 .peers
3789 .get(peer_node_addr)
3790 .is_some_and(|peer| peer.can_send())
3791 {
3792 return false;
3793 }
3794 if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3795 return false;
3796 }
3797 !self.active_peer_needs_same_path_refresh(peer_node_addr)
3798 }
3799
3800 pub(in crate::node) fn active_peer_should_keep_direct_retry(
3801 &self,
3802 peer_node_addr: &NodeAddr,
3803 peer_config: &PeerConfig,
3804 ) -> bool {
3805 let Some(peer) = self.peers.get(peer_node_addr) else {
3806 return false;
3807 };
3808
3809 let static_addresses = self.static_peer_addresses(peer_config);
3810 if !static_addresses.is_empty() {
3811 return !self
3812 .active_peer_candidate_is_fresh_enough_to_skip(peer_node_addr, &static_addresses);
3813 }
3814
3815 if peer_config.npub.is_empty() {
3816 return false;
3817 }
3818
3819 if !self.config.node.discovery.nostr.enabled
3820 || self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Disabled
3821 {
3822 return false;
3823 }
3824
3825 let Some(transport_id) = peer.transport_id() else {
3826 return true;
3827 };
3828
3829 if self.bootstrap_transports.contains(&transport_id) {
3830 return self.active_peer_needs_same_path_refresh(peer_node_addr);
3831 }
3832
3833 let Some(transport) = self.transports.get(&transport_id) else {
3834 return true;
3835 };
3836
3837 if transport.transport_type().name != "udp" {
3838 return true;
3839 }
3840
3841 self.active_peer_needs_same_path_refresh(peer_node_addr)
3842 }
3843
3844 pub(in crate::node) fn clear_retry_unless_direct_refresh_needed(
3845 &mut self,
3846 peer_node_addr: &NodeAddr,
3847 ) {
3848 let keep_retry = self
3849 .retry_pending
3850 .get(peer_node_addr)
3851 .map(|state| state.peer_config.clone())
3852 .is_some_and(|peer_config| {
3853 self.active_peer_should_keep_direct_retry(peer_node_addr, &peer_config)
3854 });
3855
3856 if !keep_retry {
3857 self.retry_pending.remove(peer_node_addr);
3858 }
3859 }
3860
3861 fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3862 let Some(peer) = self.peers.get(peer_node_addr) else {
3863 return false;
3864 };
3865 let stale_after_ms = self
3866 .config
3867 .node
3868 .heartbeat_interval_secs
3869 .saturating_mul(1000)
3870 .max(1000);
3871 peer.idle_time(Self::now_ms()) > stale_after_ms
3872 }
3873
3874 pub(in crate::node) fn active_peer_current_udp_candidate(
3875 &self,
3876 peer_node_addr: &NodeAddr,
3877 ) -> Option<PeerAddress> {
3878 let peer = self.peers.get(peer_node_addr)?;
3879 let current_addr = peer.current_addr()?;
3880 if let Some(transport_id) = peer.transport_id() {
3881 if let Some(transport) = self.transports.get(&transport_id) {
3882 if transport.transport_type().name != "udp" {
3883 return None;
3884 }
3885 } else if !self
3886 .transports
3887 .values()
3888 .any(|transport| transport.transport_type().name == "udp")
3889 {
3890 return None;
3891 }
3892 } else if !self
3893 .transports
3894 .values()
3895 .any(|transport| transport.transport_type().name == "udp")
3896 {
3897 return None;
3898 }
3899 let socket_addr = current_addr.as_str()?.parse::<SocketAddr>().ok()?;
3900
3901 Some(
3902 PeerAddress::with_priority("udp", socket_addr.to_string(), 240)
3903 .with_seen_at_ms(Self::now_ms()),
3904 )
3905 }
3906
3907 pub(in crate::node) fn active_peer_matches_candidate(
3908 &self,
3909 peer_node_addr: &NodeAddr,
3910 candidate: &PeerAddress,
3911 ) -> bool {
3912 let Some(peer) = self.peers.get(peer_node_addr) else {
3913 return false;
3914 };
3915 let Some(current_addr) = peer.current_addr() else {
3916 return false;
3917 };
3918 if let Some(peer_transport_id) = peer.transport_id()
3919 && let Some((candidate_transport_id, candidate_addr)) =
3920 self.resolve_peer_address_for_match(candidate)
3921 {
3922 return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3923 }
3924 if peer
3925 .transport_id()
3926 .map(|id| self.bootstrap_transports.contains(&id))
3927 .unwrap_or(false)
3928 {
3929 return false;
3930 }
3931 let current_addr = current_addr.to_string();
3932 let current_transport = peer
3933 .transport_id()
3934 .and_then(|id| self.transports.get(&id))
3935 .map(|transport| transport.transport_type().name);
3936
3937 candidate.addr == current_addr
3938 && current_transport
3939 .map(|transport| transport == candidate.transport)
3940 .unwrap_or(true)
3941 }
3942
3943 fn configured_path_priority(
3944 &self,
3945 peer_node_addr: &NodeAddr,
3946 transport_id: TransportId,
3947 remote_addr: &TransportAddr,
3948 ) -> Option<u8> {
3949 self.configured_peer(peer_node_addr)?
3950 .addresses
3951 .iter()
3952 .filter_map(|candidate| {
3953 let (candidate_transport_id, candidate_addr) =
3954 self.resolve_peer_address_for_match(candidate)?;
3955 (candidate_transport_id == transport_id && &candidate_addr == remote_addr)
3956 .then_some(candidate.priority)
3957 })
3958 .min()
3959 }
3960
3961 pub(in crate::node) fn alternate_path_priority_allows_replace(
3962 &self,
3963 peer_node_addr: &NodeAddr,
3964 candidate_transport_id: TransportId,
3965 candidate_addr: &TransportAddr,
3966 ) -> bool {
3967 const UNKNOWN_PATH_PRIORITY: u16 = u8::MAX as u16 + 1;
3968
3969 let Some(peer) = self.peers.get(peer_node_addr) else {
3970 return true;
3971 };
3972 let Some(current_transport_id) = peer.transport_id() else {
3973 return true;
3974 };
3975 let Some(current_addr) = peer.current_addr() else {
3976 return true;
3977 };
3978
3979 let current_priority = self
3980 .configured_path_priority(peer_node_addr, current_transport_id, current_addr)
3981 .map(u16::from)
3982 .unwrap_or(UNKNOWN_PATH_PRIORITY);
3983 let candidate_priority = self
3984 .configured_path_priority(peer_node_addr, candidate_transport_id, candidate_addr)
3985 .map(u16::from)
3986 .unwrap_or(UNKNOWN_PATH_PRIORITY);
3987
3988 if candidate_priority <= current_priority {
3989 return true;
3990 }
3991
3992 debug!(
3993 peer = %self.peer_display_name(peer_node_addr),
3994 current_transport_id = %current_transport_id,
3995 current_addr = %current_addr,
3996 current_priority,
3997 candidate_transport_id = %candidate_transport_id,
3998 candidate_addr = %candidate_addr,
3999 candidate_priority,
4000 "Suppressing lower-priority alternate path while current path remains healthy"
4001 );
4002 false
4003 }
4004
4005 pub(in crate::node) fn authenticated_packet_path_allows_bookkeeping(
4006 &mut self,
4007 peer_node_addr: &NodeAddr,
4008 candidate_transport_id: TransportId,
4009 candidate_addr: &TransportAddr,
4010 now_ms: u64,
4011 ) -> bool {
4012 let Some(peer) = self.peers.get(peer_node_addr) else {
4013 return true;
4014 };
4015
4016 if peer.transport_id() == Some(candidate_transport_id)
4017 && peer.current_addr() == Some(candidate_addr)
4018 {
4019 return true;
4020 }
4021
4022 if !peer.can_send() || self.session_direct_path_is_degraded(peer_node_addr, now_ms) {
4023 return true;
4024 }
4025
4026 self.alternate_path_priority_allows_replace(
4027 peer_node_addr,
4028 candidate_transport_id,
4029 candidate_addr,
4030 )
4031 }
4032
4033 pub(in crate::node) fn active_peer_uses_recent_endpoint_path(
4034 &self,
4035 peer_node_addr: &NodeAddr,
4036 peer_config: &PeerConfig,
4037 ) -> bool {
4038 peer_config.addresses.iter().any(|addr| {
4039 addr.seen_at_ms.is_some() && self.active_peer_matches_candidate(peer_node_addr, addr)
4040 })
4041 }
4042
4043 pub(in crate::node) fn active_peer_uses_traversal_path(
4044 &self,
4045 peer_node_addr: &NodeAddr,
4046 peer_config: &PeerConfig,
4047 ) -> bool {
4048 let via_bootstrap_transport = self
4049 .peers
4050 .get(peer_node_addr)
4051 .and_then(|peer| peer.transport_id())
4052 .map(|id| self.bootstrap_transports.contains(&id))
4053 .unwrap_or(false);
4054
4055 via_bootstrap_transport
4056 || self.active_peer_uses_recent_endpoint_path(peer_node_addr, peer_config)
4057 }
4058
4059 pub(crate) async fn api_connect(
4067 &mut self,
4068 npub: &str,
4069 address: &str,
4070 transport: &str,
4071 ) -> Result<serde_json::Value, String> {
4072 let peer_config = PeerConfig {
4073 npub: npub.to_string(),
4074 alias: None,
4075 addresses: vec![PeerAddress::new(transport, address)],
4076 connect_policy: ConnectPolicy::Manual,
4077 auto_reconnect: false,
4078 discovery_fallback_transit: true,
4079 };
4080
4081 if let Ok(identity) = PeerIdentity::from_npub(npub) {
4083 self.peer_aliases
4084 .insert(*identity.node_addr(), identity.short_npub());
4085 self.register_identity(*identity.node_addr(), identity.pubkey_full());
4086 }
4087
4088 self.initiate_peer_connection(&peer_config)
4089 .await
4090 .map(|()| {
4091 info!(
4092 npub = %npub,
4093 address = %address,
4094 transport = %transport,
4095 "API connect initiated"
4096 );
4097 serde_json::json!({
4098 "npub": npub,
4099 "address": address,
4100 "transport": transport,
4101 })
4102 })
4103 .map_err(|e| e.to_string())
4104 }
4105
4106 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
4110 let peer_identity =
4111 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
4112 let node_addr = *peer_identity.node_addr();
4113
4114 if !self.peers.contains_key(&node_addr) {
4115 return Err(format!("peer not found: {npub}"));
4116 }
4117
4118 self.remove_active_peer(&node_addr);
4120
4121 self.retry_pending.remove(&node_addr);
4123
4124 info!(npub = %npub, "API disconnect completed");
4125
4126 Ok(serde_json::json!({
4127 "npub": npub,
4128 "disconnected": true,
4129 }))
4130 }
4131
/// Adopts a UDP socket that has already completed NAT traversal with a peer,
/// wraps it in a fresh transport, and initiates the link handshake over it.
///
/// The new `UdpTransport` is:
/// - registered in `self.transports` under a newly allocated id;
/// - marked as a bootstrap transport, with the peer's npub recorded in
///   `bootstrap_transport_npubs`.
///
/// If the peer is already connected, the handoff is still adopted; it is
/// logged as an alternate path.
///
/// # Errors
/// - `NodeError::NotStarted` if the node is not operational or has no packet
///   channel.
/// - `NodeError::InvalidPeerNpub` if `traversal.peer_npub` does not parse.
/// - `NodeError::BootstrapHandoff` if the socket cannot be adopted or the
///   adopted transport reports no local address.
/// - Any error from `initiate_connection`. In that case the transport and its
///   bootstrap bookkeeping are removed again and the transport is stopped
///   (stop errors ignored).
pub async fn adopt_established_traversal(
    &mut self,
    traversal: EstablishedTraversal,
) -> Result<BootstrapHandoffResult, NodeError> {
    debug!(
        peer_npub = %traversal.peer_npub,
        session_id = %traversal.session_id,
        remote_addr = %traversal.remote_addr,
        "adopting established traversal socket"
    );

    if !self.state.is_operational() {
        return Err(NodeError::NotStarted);
    }

    // The new transport needs a sender into the node's packet channel.
    let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
    let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
        NodeError::InvalidPeerNpub {
            npub: traversal.peer_npub.clone(),
            reason: e.to_string(),
        }
    })?;
    let peer_node_addr = *peer_identity.node_addr();
    // An existing peer is not an error: the traversal proceeds as an extra path.
    if self.peers.contains_key(&peer_node_addr) {
        debug!(
            peer_npub = %traversal.peer_npub,
            "Adopting NAT traversal handoff as alternate path for already-connected peer"
        );
    }

    self.peer_aliases
        .insert(peer_node_addr, peer_identity.short_npub());
    self.register_identity(peer_node_addr, peer_identity.pubkey_full());

    let transport_id = self.allocate_transport_id();
    // Prefer the config carried by the traversal itself. Otherwise inherit the
    // named UDP config, falling back to the unnamed one and then the default.
    // `bind_addr` and `external_addr` are cleared, presumably because the
    // adopted socket is already bound (confirm).
    let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
        let mut cfg = self
            .lookup_udp_config(traversal.transport_name.as_deref())
            .or_else(|| self.lookup_udp_config(None))
            .cloned()
            .unwrap_or_default();
        cfg.bind_addr = None;
        cfg.external_addr = None;
        cfg
    });
    let mut transport = crate::transport::udp::UdpTransport::new(
        transport_id,
        traversal.transport_name.clone(),
        inherited_config,
        packet_tx,
    );

    transport
        .adopt_socket_async(traversal.socket)
        .await
        .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;

    let local_addr = transport.local_addr().ok_or_else(|| {
        NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
    })?;

    // Register the transport and tag it as a bootstrap transport for this npub.
    self.transports.insert(
        transport_id,
        crate::transport::TransportHandle::Udp(transport),
    );
    self.bootstrap_transports.insert(transport_id);
    self.bootstrap_transport_npubs
        .insert(transport_id, traversal.peer_npub.clone());

    let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
    if let Err(err) = self
        .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
        .await
    {
        // Roll back the registration above so no orphaned transport remains.
        self.bootstrap_transports.remove(&transport_id);
        self.bootstrap_transport_npubs.remove(&transport_id);
        if let Some(mut handle) = self.transports.remove(&transport_id) {
            let _ = handle.stop().await;
        }
        return Err(err);
    }

    info!(
        peer = %self.peer_display_name(&peer_node_addr),
        transport_id = %transport_id,
        local_addr = %local_addr,
        remote_addr = %traversal.remote_addr,
        session_id = %traversal.session_id,
        "adopted NAT traversal socket; handshake initiated"
    );

    Ok(BootstrapHandoffResult {
        transport_id,
        local_addr,
        remote_addr: traversal.remote_addr,
        peer_node_addr,
        session_id: traversal.session_id,
    })
}
4256}