1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
7 OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use std::collections::{HashMap, HashSet};
18use std::net::{IpAddr, SocketAddr};
19use std::thread;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
/// Debug builds only: appends `"<SystemTime debug> <message>"` as one line to
/// `nvpn-fips-endpoint-debug.log` inside the system temp directory.
///
/// Best-effort: failures to open or write the file are silently ignored.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let Ok(mut log_file) = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_path)
    else {
        return;
    };
    let _ = writeln!(
        log_file,
        "{:?} {}",
        std::time::SystemTime::now(),
        message.as_ref()
    );
}

/// Release builds: no-op stand-in so call sites compile unchanged.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
43
/// Returns `true` when `ip` is not globally routable. Presumably this is used
/// to keep such addresses out of overlay adverts; confirm against the callers.
///
/// IPv4: private, loopback, link-local, unspecified, multicast, broadcast,
/// documentation, and the 100.64.0.0/10 shared (CGNAT) range.
///
/// IPv6: loopback, unspecified, unique-local, multicast, link-local
/// (fe80::/10), and documentation (2001:db8::/32). IPv4-mapped addresses
/// (`::ffff:a.b.c.d`) are classified by their embedded IPv4 address, so a
/// mapped private address is not mistaken for a routable one.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [first, second, ..] = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 100.64.0.0/10: shared address space used by carrier-grade NAT (RFC 6598).
                || (first == 100 && (second & 0xc0) == 64)
        }
        IpAddr::V6(v6) => {
            // `::ffff:a.b.c.d` is reached through its embedded IPv4 address.
            if let Some(mapped) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(mapped));
            }
            let segments = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // fe80::/10: link-local unicast.
                || (segments[0] & 0xffc0) == 0xfe80
                // 2001:db8::/32: documentation prefix (RFC 3849).
                || (segments[0] == 0x2001 && segments[1] == 0x0db8)
        }
    }
}
75
/// Returns `true` when `local` and `remote` belong to the same IP address
/// family (both IPv4 or both IPv6).
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    // `SocketAddr` has exactly two variants, so agreeing on "is IPv4" means
    // agreeing on the family.
    local.is_ipv4() == remote.is_ipv4()
}
82
/// NOTE(review): not referenced in the visible part of this file; judging by the
/// name it scales some discovery lifetime into a retry window — confirm at its use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
/// Maximum number of in-flight handshakes plus pending transport connects
/// targeting a single peer, enforced by `path_candidate_attempt_budget`.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
/// Upper bound returned by `graph_session_warmup_budget`, i.e. the most graph
/// session warmups/lookups a single `warm_auto_connect_graph_sessions` call starts.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
/// Upper bound returned by `discovery_connect_budget`, i.e. the most connection
/// attempts a single `poll_transport_discovery` call queues.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
87
88impl Node {
89 pub(super) async fn update_peers(
101 &mut self,
102 new_peers: Vec<crate::config::PeerConfig>,
103 ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
104 use std::collections::{HashMap, HashSet};
105
106 let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
107 HashMap::with_capacity(new_peers.len());
108 let mut new_order = Vec::with_capacity(new_peers.len());
109 for peer in new_peers {
110 let identity = match PeerIdentity::from_npub(&peer.npub) {
111 Ok(id) => id,
112 Err(e) => {
113 return Err(crate::node::NodeError::InvalidPeerNpub {
114 npub: peer.npub.clone(),
115 reason: e.to_string(),
116 });
117 }
118 };
119 let node_addr = *identity.node_addr();
123 if !new_by_addr.contains_key(&node_addr) {
124 new_order.push(node_addr);
125 }
126 new_by_addr.insert(node_addr, peer);
127 }
128
129 let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
130 .config
131 .peers()
132 .iter()
133 .filter_map(|pc| {
134 PeerIdentity::from_npub(&pc.npub)
135 .ok()
136 .map(|id| (*id.node_addr(), pc.clone()))
137 })
138 .collect();
139
140 let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
141 let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
142
143 let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
144 let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
145 let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
146
147 let mut outcome = crate::node::UpdatePeersOutcome::default();
148
149 for node_addr in &removed {
150 if self.retry_pending.remove(node_addr).is_some() {
151 debug!(
152 peer = %self.peer_display_name(node_addr),
153 "Dropping retry entry for peer removed from runtime peer list"
154 );
155 }
156 self.peer_aliases.remove(node_addr);
157 self.set_discovery_fallback_transit_allowed(*node_addr, false);
158 outcome.removed += 1;
159 }
160
161 let mut auto_connect_refresh_configs = Vec::new();
162 for node_addr in &kept {
163 let new_pc = &new_by_addr[node_addr];
164 let current_pc = ¤t_by_addr[node_addr];
165 if new_pc.addresses != current_pc.addresses
166 || new_pc.alias != current_pc.alias
167 || new_pc.connect_policy != current_pc.connect_policy
168 || new_pc.auto_reconnect != current_pc.auto_reconnect
169 || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
170 {
171 outcome.updated += 1;
172 self.set_discovery_fallback_transit_allowed(
173 *node_addr,
174 new_pc.discovery_fallback_transit,
175 );
176 if let Some(state) = self.retry_pending.get_mut(node_addr) {
177 state.peer_config = new_pc.clone();
178 state.retry_after_ms = Self::now_ms();
179 }
180 if let Some(alias) = new_pc.alias.clone() {
181 self.peer_aliases.insert(*node_addr, alias);
182 }
183 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
184 auto_connect_refresh_configs.push(new_pc.clone());
185 }
186 } else {
187 outcome.unchanged += 1;
188 self.set_discovery_fallback_transit_allowed(
189 *node_addr,
190 new_pc.discovery_fallback_transit,
191 );
192 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
193 auto_connect_refresh_configs.push(new_pc.clone());
194 }
195 }
196 }
197
198 let added_configs: Vec<crate::config::PeerConfig> = new_order
199 .iter()
200 .filter(|addr| added.contains(addr))
201 .map(|addr| new_by_addr[addr].clone())
202 .collect();
203
204 self.config.peers = new_order
208 .iter()
209 .filter_map(|addr| new_by_addr.get(addr).cloned())
210 .collect();
211 self.configured_peer_send_weights = Self::configured_peer_send_weights(&self.config);
212
213 for peer_config in added_configs {
214 outcome.added += 1;
215 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
216 continue;
217 };
218 let name = peer_config
219 .alias
220 .clone()
221 .unwrap_or_else(|| identity.short_npub());
222 self.peer_aliases.insert(*identity.node_addr(), name);
223 self.set_discovery_fallback_transit_allowed(
224 *identity.node_addr(),
225 peer_config.discovery_fallback_transit,
226 );
227 self.register_identity(*identity.node_addr(), identity.pubkey_full());
228
229 if !peer_config.is_auto_connect() {
230 continue;
231 }
232
233 match self
234 .try_auto_connect_graph_session(&peer_config, identity)
235 .await
236 {
237 Ok(true) => continue,
238 Ok(false) => {}
239 Err(err) => {
240 debug!(
241 npub = %peer_config.npub,
242 error = %err,
243 "Existing FIPS graph did not warm newly added peer"
244 );
245 }
246 }
247
248 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
249 warn!(
250 npub = %peer_config.npub,
251 error = %e,
252 "Failed to initiate connection for newly added peer"
253 );
254 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
255 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
256 }
257 if matches!(e, crate::node::NodeError::NoTransportForType(_))
258 && let Some(bootstrap) = self.nostr_discovery.clone()
259 {
260 bootstrap
261 .request_advert_stale_check(peer_config.npub.clone())
262 .await;
263 }
264 }
265 }
266
267 for peer_config in auto_connect_refresh_configs {
268 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
269 continue;
270 };
271 let node_addr = *peer_identity.node_addr();
272
273 if self.peers.contains_key(&node_addr) {
274 match self
275 .initiate_active_peer_alternative_connection(&peer_config)
276 .await
277 {
278 Ok(attempted) => {
279 if attempted {
280 debug!(
281 peer = %self.peer_display_name(&node_addr),
282 "Started non-disruptive alternate-path handshake for active peer"
283 );
284 }
285 }
286 Err(e) => {
287 debug!(
288 npub = %peer_config.npub,
289 error = %e,
290 "Active peer alternate-path refresh did not start"
291 );
292 }
293 }
294 continue;
295 }
296
297 match self
298 .try_auto_connect_graph_session(&peer_config, peer_identity)
299 .await
300 {
301 Ok(true) => continue,
302 Ok(false) => {}
303 Err(err) => {
304 debug!(
305 npub = %peer_config.npub,
306 error = %err,
307 "Existing FIPS graph did not warm refreshed peer"
308 );
309 }
310 }
311
312 match self.initiate_peer_connection(&peer_config).await {
313 Ok(()) => {
314 let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
315 if let Some(state) = self.retry_pending.get_mut(&node_addr) {
316 state.peer_config = peer_config;
317 state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
318 }
319 }
320 Err(e) => {
321 debug!(
322 npub = %peer_config.npub,
323 error = %e,
324 "Refreshed peer addresses did not initiate a direct connection"
325 );
326 self.schedule_retry(node_addr, Self::now_ms());
327 }
328 }
329 }
330
331 self.warm_auto_connect_graph_sessions().await;
332
333 Ok(outcome)
334 }
335
336 pub(super) async fn initiate_peer_connections(&mut self) {
337 let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
343 .config
344 .peers()
345 .iter()
346 .filter_map(|pc| {
347 PeerIdentity::from_npub(&pc.npub)
348 .ok()
349 .map(|id| (id, pc.alias.clone()))
350 })
351 .collect();
352
353 for (identity, alias) in peer_identities {
354 let name = alias.unwrap_or_else(|| identity.short_npub());
355 self.peer_aliases.insert(*identity.node_addr(), name);
356 self.register_identity(*identity.node_addr(), identity.pubkey_full());
360 }
361
362 let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
364
365 if peer_configs.is_empty() {
366 debug!("No static peers configured");
367 return;
368 }
369
370 debug!(
371 count = peer_configs.len(),
372 "Initiating static peer connections"
373 );
374
375 for peer_config in peer_configs {
376 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
377 Ok(identity) => identity,
378 Err(_) => continue,
379 };
380 match self
381 .try_auto_connect_graph_session(&peer_config, peer_identity)
382 .await
383 {
384 Ok(true) => continue,
385 Ok(false) => {}
386 Err(err) => {
387 debug!(
388 npub = %peer_config.npub,
389 error = %err,
390 "Existing FIPS graph did not warm auto-connect peer"
391 );
392 }
393 }
394 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
395 warn!(
396 npub = %peer_config.npub,
397 alias = ?peer_config.alias,
398 error = %e,
399 "Failed to initiate peer connection"
400 );
401 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
405 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
406 }
407 if matches!(e, crate::node::NodeError::NoTransportForType(_))
413 && let Some(bootstrap) = self.nostr_discovery.clone()
414 {
415 bootstrap
416 .request_advert_stale_check(peer_config.npub.clone())
417 .await;
418 }
419 }
420 }
421
422 self.warm_auto_connect_graph_sessions().await;
423 }
424
425 pub(in crate::node) async fn try_auto_connect_graph_session(
426 &mut self,
427 peer_config: &PeerConfig,
428 peer_identity: PeerIdentity,
429 ) -> Result<bool, NodeError> {
430 if !peer_config.is_auto_connect() {
431 return Ok(false);
432 }
433
434 let peer_node_addr = *peer_identity.node_addr();
435 if self.peers.contains_key(&peer_node_addr) {
436 return Ok(false);
437 }
438 if self
439 .sessions
440 .get(&peer_node_addr)
441 .is_some_and(|entry| entry.is_established() || entry.is_initiating())
442 {
443 return Ok(true);
444 }
445 if self.find_next_hop(&peer_node_addr).is_none() {
446 return Ok(false);
447 }
448
449 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
450 match self
451 .initiate_session(peer_node_addr, peer_identity.pubkey_full())
452 .await
453 {
454 Ok(()) => {
455 debug!(
456 peer = %self.peer_display_name(&peer_node_addr),
457 "Warmed auto-connect peer session over existing FIPS graph"
458 );
459 Ok(true)
460 }
461 Err(NodeError::SendFailed { node_addr, reason })
462 if node_addr == peer_node_addr && reason == "no route to destination" =>
463 {
464 self.maybe_initiate_lookup(&peer_node_addr).await;
465 Ok(false)
466 }
467 Err(err) => Err(err),
468 }
469 }
470
471 pub(super) async fn initiate_peer_connection(
475 &mut self,
476 peer_config: &crate::config::PeerConfig,
477 ) -> Result<(), NodeError> {
478 self.initiate_peer_connection_inner(peer_config).await
479 }
480
481 pub(super) async fn initiate_peer_retry_connection(
487 &mut self,
488 peer_config: &crate::config::PeerConfig,
489 ) -> Result<(), NodeError> {
490 self.initiate_peer_connection_inner(peer_config).await
491 }
492
493 async fn initiate_active_peer_alternative_connection(
494 &mut self,
495 peer_config: &crate::config::PeerConfig,
496 ) -> Result<bool, NodeError> {
497 let peer_identity =
498 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
499 npub: peer_config.npub.clone(),
500 reason: e.to_string(),
501 })?;
502 let peer_node_addr = *peer_identity.node_addr();
503
504 if !self.peers.contains_key(&peer_node_addr) {
505 self.initiate_peer_connection(peer_config).await?;
506 return Ok(true);
507 }
508
509 self.try_active_peer_alternative_addresses(peer_config, peer_identity)
515 .await
516 }
517
518 async fn initiate_peer_connection_inner(
519 &mut self,
520 peer_config: &crate::config::PeerConfig,
521 ) -> Result<(), NodeError> {
522 let peer_identity =
524 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
525 npub: peer_config.npub.clone(),
526 reason: e.to_string(),
527 })?;
528
529 let peer_node_addr = *peer_identity.node_addr();
530
531 if self.peers.contains_key(&peer_node_addr) {
533 debug!(
534 npub = %peer_config.npub,
535 "Peer already exists, skipping"
536 );
537 return Ok(());
538 }
539
540 self.try_peer_addresses(peer_config, peer_identity, true)
541 .await
542 }
543
544 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
545 self.connections.values().any(|conn| {
546 conn.expected_identity()
547 .map(|id| id.node_addr() == peer_node_addr)
548 .unwrap_or(false)
549 })
550 }
551
552 fn is_connecting_to_peer_on_path(
553 &self,
554 peer_node_addr: &NodeAddr,
555 transport_id: TransportId,
556 remote_addr: &TransportAddr,
557 ) -> bool {
558 self.connections.values().any(|conn| {
559 conn.expected_identity()
560 .map(|id| id.node_addr() == peer_node_addr)
561 .unwrap_or(false)
562 && conn.transport_id() == Some(transport_id)
563 && conn.source_addr() == Some(remote_addr)
564 }) || self.pending_connects.iter().any(|pending| {
565 pending.peer_identity.node_addr() == peer_node_addr
566 && pending.transport_id == transport_id
567 && &pending.remote_addr == remote_addr
568 })
569 }
570
571 pub(in crate::node) fn should_warm_auto_connect_session(
572 &self,
573 peer_node_addr: &NodeAddr,
574 ) -> bool {
575 if self.peers.contains_key(peer_node_addr)
576 || self
577 .sessions
578 .get(peer_node_addr)
579 .is_some_and(|entry| entry.is_established())
580 {
581 return false;
582 }
583
584 self.config.peers().iter().any(|peer| {
585 peer.is_auto_connect()
586 && PeerIdentity::from_npub(&peer.npub)
587 .map(|identity| identity.node_addr() == peer_node_addr)
588 .unwrap_or(false)
589 })
590 }
591
592 pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
593 if !self.peers.values().any(|peer| peer.can_send()) {
594 return 0;
595 }
596
597 let mut budget = self.graph_session_warmup_budget();
598 if budget == 0 {
599 return 0;
600 }
601
602 let peer_identities: Vec<_> = self
603 .config
604 .auto_connect_peers()
605 .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
606 .collect();
607
608 let mut warmed = 0;
609 for identity in peer_identities {
610 if budget == 0 {
611 break;
612 }
613
614 let peer_node_addr = *identity.node_addr();
615 if peer_node_addr == *self.identity.node_addr()
616 || !self.should_warm_auto_connect_session(&peer_node_addr)
617 || self
618 .sessions
619 .get(&peer_node_addr)
620 .is_some_and(|entry| entry.is_initiating())
621 {
622 continue;
623 }
624
625 self.register_identity(peer_node_addr, identity.pubkey_full());
626
627 if self.find_next_hop(&peer_node_addr).is_some() {
628 match self
629 .initiate_session(peer_node_addr, identity.pubkey_full())
630 .await
631 {
632 Ok(()) => {
633 warmed += 1;
634 budget = budget.saturating_sub(1);
635 debug!(
636 peer = %self.peer_display_name(&peer_node_addr),
637 "Warmed auto-connect peer session over existing FIPS graph"
638 );
639 }
640 Err(NodeError::SendFailed { node_addr, reason })
641 if node_addr == peer_node_addr && reason == "no route to destination" =>
642 {
643 self.maybe_initiate_lookup(&peer_node_addr).await;
644 warmed += 1;
645 budget = budget.saturating_sub(1);
646 }
647 Err(err) => {
648 debug!(
649 peer = %self.peer_display_name(&peer_node_addr),
650 error = %err,
651 "Failed to warm auto-connect peer session"
652 );
653 }
654 }
655 } else {
656 self.maybe_initiate_lookup(&peer_node_addr).await;
657 warmed += 1;
658 budget = budget.saturating_sub(1);
659 }
660 }
661
662 warmed
663 }
664
665 pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
666 let max_destinations = self.config.node.session.pending_max_destinations;
667 if max_destinations == 0 {
668 return 0;
669 }
670
671 let pending_sessions = self
672 .sessions
673 .values()
674 .filter(|entry| !entry.is_established())
675 .count();
676 let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
677 max_destinations
678 .saturating_sub(pending_total)
679 .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
680 }
681
682 fn outbound_handshake_slots(&self) -> usize {
683 let used = self
684 .connections
685 .len()
686 .saturating_add(self.pending_connects.len());
687 if self.max_connections == 0 {
688 usize::MAX
689 } else {
690 self.max_connections.saturating_sub(used)
691 }
692 }
693
694 fn outbound_link_slots(&self) -> usize {
695 if self.max_links == 0 {
696 usize::MAX
697 } else {
698 self.max_links.saturating_sub(self.links.len())
699 }
700 }
701
702 fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
703 if !self.peers.contains_key(peer_node_addr)
704 && self.max_peers > 0
705 && self.peers.len() >= self.max_peers
706 {
707 return 0;
708 }
709
710 let in_flight_for_peer = self
711 .connections
712 .values()
713 .filter(|conn| {
714 conn.expected_identity()
715 .map(|id| id.node_addr() == peer_node_addr)
716 .unwrap_or(false)
717 })
718 .count()
719 .saturating_add(
720 self.pending_connects
721 .iter()
722 .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
723 .count(),
724 );
725
726 self.outbound_handshake_slots()
727 .min(self.outbound_link_slots())
728 .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
729 }
730
731 fn discovery_connect_budget(&self) -> usize {
732 self.outbound_handshake_slots()
733 .min(self.outbound_link_slots())
734 .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
735 }
736
737 fn find_udp_transport_for_remote_addr(
744 &self,
745 remote_addr: SocketAddr,
746 ) -> Option<(TransportId, SocketAddr)> {
747 self.transports
748 .iter()
749 .filter(|(id, handle)| {
750 handle.transport_type().name == "udp"
751 && handle.is_operational()
752 && !self.bootstrap_transports.contains(id)
753 })
754 .filter_map(|(id, handle)| {
755 let local_addr = handle.local_addr()?;
756 socket_addr_families_compatible(local_addr, remote_addr)
757 .then_some((*id, local_addr))
758 })
759 .min_by_key(|(id, _)| id.as_u32())
760 }
761
762 pub(super) fn transport_discovery_candidate(
763 &self,
764 discovered_transport_id: TransportId,
765 discovered_addr: TransportAddr,
766 ) -> Option<(TransportId, TransportAddr, &'static str)> {
767 let transport = self.transports.get(&discovered_transport_id)?;
768 let transport_name = transport.transport_type().name;
769
770 if transport_name != "udp" {
771 return Some((discovered_transport_id, discovered_addr, transport_name));
772 }
773
774 let Some(remote_socket_addr) = discovered_addr
775 .as_str()
776 .and_then(|addr| addr.parse::<SocketAddr>().ok())
777 else {
778 if self.bootstrap_transports.contains(&discovered_transport_id) {
779 debug!(
780 transport_id = %discovered_transport_id,
781 remote_addr = %discovered_addr,
782 "transport discovery: skip non-numeric UDP address from bootstrap transport"
783 );
784 return None;
785 }
786 return Some((discovered_transport_id, discovered_addr, transport_name));
787 };
788
789 let Some((transport_id, local_addr)) =
790 self.find_udp_transport_for_remote_addr(remote_socket_addr)
791 else {
792 debug!(
793 transport_id = %discovered_transport_id,
794 remote_addr = %discovered_addr,
795 "transport discovery: skip UDP peer with no compatible local socket"
796 );
797 return None;
798 };
799
800 if transport_id != discovered_transport_id {
801 debug!(
802 discovered_transport_id = %discovered_transport_id,
803 selected_transport_id = %transport_id,
804 local_addr = %local_addr,
805 remote_addr = %remote_socket_addr,
806 "transport discovery: selected compatible UDP transport"
807 );
808 }
809
810 Some((
811 transport_id,
812 TransportAddr::from_socket_addr(remote_socket_addr),
813 transport_name,
814 ))
815 }
816
    /// Formats `remote_addr` as the address string for a `PeerAddress` candidate.
    ///
    /// On Linux and macOS, an `"ethernet"` candidate whose address is exactly 6
    /// bytes is rendered as `"<interface>/<mac>"`. This needs the transport for
    /// `transport_id` to report an interface name; the MAC is formatted with
    /// `crate::transport::ethernet::format_mac`. Every other case, and every other
    /// OS, falls back to `remote_addr.to_string()`.
    fn peer_address_string_for_transport_candidate(
        &self,
        transport_id: TransportId,
        transport_name: &str,
        remote_addr: &TransportAddr,
    ) -> String {
        // These parameters are only read by the Ethernet branch below; bind them
        // so builds without that branch do not warn about unused variables.
        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
        let _ = (transport_id, transport_name);

        #[cfg(any(target_os = "linux", target_os = "macos"))]
        if transport_name == "ethernet"
            && remote_addr.as_bytes().len() == 6
            && let Some(interface) = self
                .transports
                .get(&transport_id)
                .and_then(|transport| transport.interface_name())
        {
            // Length checked above, so the copy into a 6-byte array cannot panic.
            let mut mac = [0u8; 6];
            mac.copy_from_slice(remote_addr.as_bytes());
            return format!(
                "{interface}/{}",
                crate::transport::ethernet::format_mac(&mac)
            );
        }

        remote_addr.to_string()
    }
844
    /// Resolves a configured `PeerAddress` to the local transport id and remote
    /// `TransportAddr` that would be used for it, or `None` if it cannot be
    /// resolved.
    ///
    /// - `udp` with the literal address `nat` (case-insensitive) never resolves.
    /// - `ethernet` delegates to `resolve_ethernet_addr`.
    /// - `ble` delegates to `resolve_ble_addr` when built with `bluer_available`,
    ///   and is always `None` otherwise.
    /// - A `udp` address that parses as a `SocketAddr` uses the UDP transport
    ///   chosen by `find_udp_transport_for_remote_addr` (`None` if no compatible
    ///   socket exists).
    /// - Anything else uses `find_transport_for_type`, with the address kept as
    ///   the literal configured string.
    fn resolve_peer_address_for_match(
        &self,
        candidate: &PeerAddress,
    ) -> Option<(TransportId, TransportAddr)> {
        // "nat" is not a concrete UDP address, so there is nothing to resolve here.
        if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
            return None;
        }

        if candidate.transport == "ethernet" {
            return self.resolve_ethernet_addr(&candidate.addr).ok();
        }

        if candidate.transport == "ble" {
            #[cfg(bluer_available)]
            {
                return self.resolve_ble_addr(&candidate.addr).ok();
            }
            #[cfg(not(bluer_available))]
            {
                return None;
            }
        }

        // Numeric UDP addresses must land on a socket of the matching IP family;
        // other transports (and non-numeric UDP) just need a transport of that type.
        let transport_id = if candidate.transport == "udp"
            && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
        {
            self.find_udp_transport_for_remote_addr(remote_socket_addr)
                .map(|(id, _)| id)?
        } else {
            self.find_transport_for_type(&candidate.transport)?
        };

        Some((transport_id, TransportAddr::from_string(&candidate.addr)))
    }
879
880 pub(super) async fn initiate_connection(
891 &mut self,
892 transport_id: TransportId,
893 remote_addr: TransportAddr,
894 peer_identity: PeerIdentity,
895 ) -> Result<(), NodeError> {
896 let peer_node_addr = *peer_identity.node_addr();
897
898 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
899 debug!(
900 peer = %self.peer_display_name(&peer_node_addr),
901 transport_id = %transport_id,
902 remote_addr = %remote_addr,
903 "Connection already in progress for candidate path"
904 );
905 return Ok(());
906 }
907
908 if self.outbound_handshake_slots() == 0 {
909 return Err(NodeError::MaxConnectionsExceeded {
910 max: self.max_connections,
911 });
912 }
913
914 if self.outbound_link_slots() == 0 {
915 return Err(NodeError::MaxLinksExceeded {
916 max: self.max_links,
917 });
918 }
919
920 if !self.peers.contains_key(&peer_node_addr)
921 && self.max_peers > 0
922 && self.peers.len() >= self.max_peers
923 {
924 return Err(NodeError::MaxPeersExceeded {
925 max: self.max_peers,
926 });
927 }
928
929 self.authorize_peer(
930 &peer_identity,
931 PeerAclContext::OutboundConnect,
932 transport_id,
933 &remote_addr,
934 )?;
935
936 let is_connection_oriented = self
937 .transports
938 .get(&transport_id)
939 .map(|t| t.transport_type().connection_oriented)
940 .unwrap_or(false);
941
942 let link_id = self.allocate_link_id();
944
945 let link = if is_connection_oriented {
946 Link::new(
947 link_id,
948 transport_id,
949 remote_addr.clone(),
950 LinkDirection::Outbound,
951 Duration::from_millis(self.config.node.base_rtt_ms),
952 )
953 } else {
954 Link::connectionless(
955 link_id,
956 transport_id,
957 remote_addr.clone(),
958 LinkDirection::Outbound,
959 Duration::from_millis(self.config.node.base_rtt_ms),
960 )
961 };
962
963 self.links.insert(link_id, link);
964
965 self.addr_to_link
967 .insert((transport_id, remote_addr.clone()), link_id);
968
969 if is_connection_oriented {
970 if let Some(transport) = self.transports.get(&transport_id) {
972 match transport.connect(&remote_addr).await {
973 Ok(()) => {
974 debug!(
975 peer = %self.peer_display_name(&peer_node_addr),
976 transport_id = %transport_id,
977 remote_addr = %remote_addr,
978 link_id = %link_id,
979 "Transport connect initiated (non-blocking)"
980 );
981 self.pending_connects.push(super::PendingConnect {
982 link_id,
983 transport_id,
984 remote_addr,
985 peer_identity,
986 });
987 }
988 Err(e) => {
989 self.links.remove(&link_id);
991 self.addr_to_link.remove(&(transport_id, remote_addr));
992 return Err(NodeError::TransportError(e.to_string()));
993 }
994 }
995 }
996 Ok(())
997 } else {
998 self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
1000 .await
1001 }
1002 }
1003
1004 pub(super) async fn start_handshake(
1009 &mut self,
1010 link_id: LinkId,
1011 transport_id: TransportId,
1012 remote_addr: TransportAddr,
1013 peer_identity: PeerIdentity,
1014 ) -> Result<(), NodeError> {
1015 let peer_node_addr = *peer_identity.node_addr();
1016
1017 let current_time_ms = Self::now_ms();
1019 let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);
1020
1021 let our_index = match self.index_allocator.allocate() {
1023 Ok(idx) => idx,
1024 Err(e) => {
1025 self.links.remove(&link_id);
1027 self.addr_to_link.remove(&(transport_id, remote_addr));
1028 return Err(NodeError::IndexAllocationFailed(e.to_string()));
1029 }
1030 };
1031
1032 let our_keypair = self.identity.keypair();
1034 let noise_msg1 =
1035 match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
1036 Ok(msg) => msg,
1037 Err(e) => {
1038 let _ = self.index_allocator.free(our_index);
1040 self.links.remove(&link_id);
1041 self.addr_to_link.remove(&(transport_id, remote_addr));
1042 return Err(NodeError::HandshakeFailed(e.to_string()));
1043 }
1044 };
1045
1046 connection.set_our_index(our_index);
1048 connection.set_transport_id(transport_id);
1049 connection.set_source_addr(remote_addr.clone());
1050
1051 let wire_msg1 = build_msg1(our_index, &noise_msg1);
1053
1054 debug!(
1055 peer = %self.peer_display_name(&peer_node_addr),
1056 transport_id = %transport_id,
1057 remote_addr = %remote_addr,
1058 link_id = %link_id,
1059 our_index = %our_index,
1060 "Connection initiated"
1061 );
1062
1063 let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
1065 connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);
1066
1067 self.pending_outbound
1069 .insert((transport_id, our_index.as_u32()), link_id);
1070 self.connections.insert(link_id, connection);
1071
1072 let send_result = match self.transports.get(&transport_id) {
1077 Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
1078 None => None,
1079 };
1080 match send_result {
1081 Some(send_result) => {
1082 self.note_local_send_outcome(&send_result);
1083 match send_result {
1084 Ok(bytes) => {
1085 debug!(
1086 link_id = %link_id,
1087 our_index = %our_index,
1088 bytes,
1089 "Sent Noise handshake message 1 (wire format)"
1090 );
1091 }
1092 Err(e) => {
1093 warn!(
1094 link_id = %link_id,
1095 transport_id = %transport_id,
1096 remote_addr = %remote_addr,
1097 our_index = %our_index,
1098 error = %e,
1099 "Failed to send handshake message"
1100 );
1101 self.pending_outbound
1102 .remove(&(transport_id, our_index.as_u32()));
1103 self.connections.remove(&link_id);
1104 self.links.remove(&link_id);
1105 self.addr_to_link
1106 .remove(&(transport_id, remote_addr.clone()));
1107 let _ = self.index_allocator.free(our_index);
1108 return Err(NodeError::TransportError(e.to_string()));
1109 }
1110 }
1111 }
1112 None => {
1113 self.pending_outbound
1114 .remove(&(transport_id, our_index.as_u32()));
1115 self.connections.remove(&link_id);
1116 self.links.remove(&link_id);
1117 self.addr_to_link
1118 .remove(&(transport_id, remote_addr.clone()));
1119 let _ = self.index_allocator.free(our_index);
1120 return Err(NodeError::TransportError(format!(
1121 "transport {transport_id} disappeared before first handshake send"
1122 )));
1123 }
1124 }
1125
1126 Ok(())
1127 }
1128
1129 pub(super) async fn poll_transport_discovery(&mut self) {
1135 let mut to_connect = Vec::new();
1137 let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1138 let mut connect_budget = self.discovery_connect_budget();
1139 let mut skipped_budget = 0usize;
1140
1141 for transport in self.transports.values() {
1142 if !transport.is_operational() {
1143 continue;
1144 }
1145 if !transport.auto_connect() {
1146 let _ = transport.discover();
1148 continue;
1149 }
1150 let discovered = match transport.discover() {
1151 Ok(peers) => peers,
1152 Err(_) => continue,
1153 };
1154 for peer in discovered {
1155 let discovered_transport_id = peer.transport_id;
1156 let pubkey = match peer.pubkey_hint {
1157 Some(pk) => pk,
1158 None => continue,
1159 };
1160 let identity = PeerIdentity::from_pubkey(pubkey);
1161 let node_addr = *identity.node_addr();
1162
1163 if node_addr == *self.identity.node_addr() {
1165 continue;
1166 }
1167
1168 let Some((candidate_transport_id, remote_addr, transport_name)) =
1169 self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1170 else {
1171 continue;
1172 };
1173
1174 if self.peers.contains_key(&node_addr) {
1175 let candidate = PeerAddress::new(
1176 transport_name,
1177 self.peer_address_string_for_transport_candidate(
1178 candidate_transport_id,
1179 transport_name,
1180 &remote_addr,
1181 ),
1182 );
1183 if self.active_peer_candidate_is_fresh_enough_to_skip(
1184 &node_addr,
1185 std::slice::from_ref(&candidate),
1186 ) {
1187 continue;
1188 }
1189 if self.is_connecting_to_peer_on_path(
1190 &node_addr,
1191 candidate_transport_id,
1192 &remote_addr,
1193 ) {
1194 continue;
1195 }
1196 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1197 if connect_budget == 0
1198 || self
1199 .path_candidate_attempt_budget(&node_addr)
1200 .saturating_sub(queued_for_peer)
1201 == 0
1202 {
1203 skipped_budget = skipped_budget.saturating_add(1);
1204 continue;
1205 }
1206 to_connect.push((candidate_transport_id, remote_addr, identity, true));
1207 *queued_per_peer.entry(node_addr).or_default() += 1;
1208 connect_budget = connect_budget.saturating_sub(1);
1209 continue;
1210 }
1211
1212 if self.is_connecting_to_peer_on_path(
1213 &node_addr,
1214 candidate_transport_id,
1215 &remote_addr,
1216 ) {
1217 continue;
1218 }
1219
1220 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1221 if connect_budget == 0
1222 || self
1223 .path_candidate_attempt_budget(&node_addr)
1224 .saturating_sub(queued_for_peer)
1225 == 0
1226 {
1227 skipped_budget = skipped_budget.saturating_add(1);
1228 continue;
1229 }
1230 to_connect.push((candidate_transport_id, remote_addr, identity, false));
1231 *queued_per_peer.entry(node_addr).or_default() += 1;
1232 connect_budget = connect_budget.saturating_sub(1);
1233 }
1234 }
1235
1236 if skipped_budget > 0 {
1237 debug!(
1238 skipped = skipped_budget,
1239 queued = to_connect.len(),
1240 "Transport discovery connect budget exhausted"
1241 );
1242 }
1243
1244 for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1245 info!(
1246 peer = %self.peer_display_name(identity.node_addr()),
1247 transport_id = %transport_id,
1248 remote_addr = %remote_addr,
1249 active_refresh,
1250 "Auto-connecting to discovered peer"
1251 );
1252 if let Err(e) = self
1253 .initiate_connection(transport_id, remote_addr, identity)
1254 .await
1255 {
1256 warn!(error = %e, "Failed to auto-connect to discovered peer");
1257 }
1258 }
1259 }
1260
    /// Runs one polling pass over the Nostr overlay discovery runtime.
    ///
    /// Returns immediately when Nostr discovery is not running. Otherwise it
    /// pushes the current outbound-admission verdict into the runtime,
    /// refreshes our own overlay advert, handles every drained
    /// [`BootstrapEvent`], and finally runs the startup open-discovery sweep
    /// and queues open-discovery retries.
    pub(super) async fn poll_nostr_discovery(&mut self) {
        // Work on a cloned handle instead of borrowing `self.nostr_discovery`:
        // the event handling below needs `&mut self` (e.g. `retry_pending.get_mut`).
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        // Tell the runtime whether we may currently accept new outbound peers.
        bootstrap.set_outbound_admission(self.outbound_admission_check());

        // Advert refresh failures are non-fatal; the refresh runs on every poll.
        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    // Admission may have changed since the traversal started;
                    // drop the socket rather than exceed the peer limit.
                    if !self.outbound_admission_check() {
                        debug!(
                            peer_npub = %traversal.peer_npub,
                            peers = self.peers.len(),
                            max_peers = self.max_peers,
                            "Dropping established NAT traversal: at capacity"
                        );
                        continue;
                    }
                    // Keep the npub for logging: `traversal` is moved into the adopt call.
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            // Only peers whose npub parses can be scheduled for retry.
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    // Unparseable npubs are dropped silently.
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    // A traversal failure is irrelevant if the peer is already
                    // connected or a handshake with it is in flight.
                    if self.peers.contains_key(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal for already-connected peer"
                        );
                        continue;
                    }
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    // Record the failure with the runtime. The returned decision
                    // drives warn-vs-debug logging, an optional cooldown deadline,
                    // and whether a consecutive-failure threshold was just crossed.
                    let now_ms = Self::now_ms();
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    // After repeated failures, ask the runtime to re-check
                    // whether this peer's advert is stale.
                    if decision.crossed_threshold {
                        bootstrap
                            .request_advert_stale_check(peer_config.npub.clone())
                            .await;
                    }

                    // Fall back to the peer's other addresses; if that attempt
                    // succeeds there is nothing to retry.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    // Schedule a retry, but never earlier than the cooldown
                    // deadline chosen by the failure tracker (if any).
                    self.schedule_retry(node_addr, now_ms);
                    if let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
    }
1378
1379 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1385 if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1386 let scope = scope.trim();
1387 if !scope.is_empty() {
1388 return Some(scope.to_string());
1389 }
1390 }
1391
1392 let app = self.config.node.discovery.nostr.app.trim();
1393 if app.is_empty() {
1394 return None;
1395 }
1396 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1397 let scope = rest.trim();
1398 if scope.is_empty() {
1399 None
1400 } else {
1401 Some(scope.to_string())
1402 }
1403 } else {
1404 Some(app.to_string())
1405 }
1406 }
1407
1408 pub(super) fn start_local_instance_discovery(&mut self) {
1409 if !self.config.node.discovery.local.enabled {
1410 return;
1411 }
1412 let Some(scope) = self.lan_discovery_scope() else {
1413 debug!("local instance discovery not started: no discovery scope");
1414 return;
1415 };
1416 let now_ms = Self::now_ms();
1417 match crate::discovery::local::LocalInstanceRegistry::new(
1418 self.identity.npub(),
1419 scope,
1420 &self.config.node.discovery.local,
1421 now_ms,
1422 ) {
1423 Ok(registry) => {
1424 self.local_instance_registry = Some(registry);
1425 self.local_instance_started_at_ms = Some(now_ms);
1426 self.last_local_instance_publish_ms = None;
1427 self.last_local_instance_scan_ms = None;
1428 self.publish_local_instance_record(now_ms);
1429 info!("Same-host FIPS instance discovery enabled");
1430 }
1431 Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1432 debug!("same-host FIPS instance discovery disabled");
1433 }
1434 Err(err) => {
1435 debug!(error = %err, "same-host FIPS instance discovery not started");
1436 }
1437 }
1438 }
1439
1440 fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1441 let mut contacts = Vec::new();
1442 for handle in self.transports.values() {
1443 if !handle.is_operational() || !handle.accept_connections() {
1444 continue;
1445 }
1446 let transport = handle.transport_type().name;
1447 if transport != "udp" && transport != "tcp" {
1448 continue;
1449 }
1450 let Some(local_addr) = handle.local_addr() else {
1451 continue;
1452 };
1453 let Some(contact) =
1454 crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1455 else {
1456 continue;
1457 };
1458 if contacts
1459 .iter()
1460 .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1461 existing.transport == contact.transport && existing.addr == contact.addr
1462 })
1463 {
1464 continue;
1465 }
1466 contacts.push(contact);
1467 }
1468 contacts
1469 }
1470
1471 fn publish_local_instance_record(&mut self, now_ms: u64) {
1472 let Some(registry) = self.local_instance_registry.clone() else {
1473 return;
1474 };
1475 let contacts = self.local_instance_contacts();
1476 match registry.publish(contacts, now_ms) {
1477 Ok(()) => {
1478 self.last_local_instance_publish_ms = Some(now_ms);
1479 }
1480 Err(err) => {
1481 debug!(error = %err, "failed to publish same-host FIPS instance record");
1482 }
1483 }
1484 }
1485
1486 fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1487 if self.local_instance_registry.is_none() {
1488 return;
1489 }
1490 let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1491 let due = self
1492 .last_local_instance_publish_ms
1493 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1494 .unwrap_or(true);
1495 if due {
1496 self.publish_local_instance_record(now_ms);
1497 }
1498 }
1499
1500 fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1501 if self.local_instance_registry.is_none() {
1502 return false;
1503 }
1504 let cfg = &self.config.node.discovery.local;
1505 let interval_ms = if self
1506 .local_instance_started_at_ms
1507 .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1508 .unwrap_or(false)
1509 {
1510 cfg.startup_scan_interval_ms()
1511 } else {
1512 cfg.scan_interval_ms()
1513 };
1514 self.last_local_instance_scan_ms
1515 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1516 .unwrap_or(true)
1517 }
1518
1519 fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1520 if self.config.peers().iter().any(|peer| {
1521 PeerIdentity::from_npub(&peer.npub)
1522 .map(|configured| configured.node_addr() == identity.node_addr())
1523 .unwrap_or(false)
1524 }) {
1525 return true;
1526 }
1527 self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1528 }
1529
1530 fn local_instance_peer_addresses(
1531 &self,
1532 record: &crate::discovery::local::LocalInstanceRecord,
1533 ) -> Vec<PeerAddress> {
1534 let mut addresses = Vec::new();
1535 for contact in &record.contacts {
1536 if contact.transport != "udp" && contact.transport != "tcp" {
1537 continue;
1538 }
1539 let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1540 debug!(
1541 npub = %record.npub,
1542 transport = %contact.transport,
1543 addr = %contact.addr,
1544 "local instance discovery: skip non-socket contact"
1545 );
1546 continue;
1547 };
1548 if !socket_addr.ip().is_loopback() {
1549 debug!(
1550 npub = %record.npub,
1551 addr = %contact.addr,
1552 "local instance discovery: skip non-loopback contact"
1553 );
1554 continue;
1555 }
1556 let address =
1557 PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1558 .with_seen_at_ms(record.updated_at_ms);
1559 if addresses.iter().any(|existing: &PeerAddress| {
1560 existing.transport == address.transport && existing.addr == address.addr
1561 }) {
1562 continue;
1563 }
1564 addresses.push(address);
1565 }
1566 addresses
1567 }
1568
1569 pub(super) async fn poll_local_instance_discovery(&mut self) {
1573 let Some(registry) = self.local_instance_registry.clone() else {
1574 return;
1575 };
1576 let now_ms = Self::now_ms();
1577 self.maybe_publish_local_instance_record(now_ms);
1578 if !self.local_instance_scan_due(now_ms) {
1579 return;
1580 }
1581 self.last_local_instance_scan_ms = Some(now_ms);
1582
1583 let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
1584 {
1585 Ok(records) => records,
1586 Err(err) => {
1587 debug!(error = %err, "same-host FIPS instance scan failed");
1588 return;
1589 }
1590 };
1591 if records.is_empty() {
1592 return;
1593 }
1594
1595 let mut connect_budget = self.discovery_connect_budget();
1596 let mut skipped_budget = 0usize;
1597 for record in records {
1598 let identity = match PeerIdentity::from_npub(&record.npub) {
1599 Ok(identity) => identity,
1600 Err(err) => {
1601 debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
1602 continue;
1603 }
1604 };
1605 let peer_node_addr = *identity.node_addr();
1606 if peer_node_addr == *self.identity.node_addr() {
1607 continue;
1608 }
1609 if !self.local_instance_peer_allowed(&identity) {
1610 debug!(
1611 npub = %identity.short_npub(),
1612 "local instance discovery: skip unconfigured peer"
1613 );
1614 continue;
1615 }
1616
1617 let addresses = self.local_instance_peer_addresses(&record);
1618 if addresses.is_empty() {
1619 continue;
1620 }
1621
1622 if self.peers.contains_key(&peer_node_addr)
1623 && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
1624 {
1625 continue;
1626 }
1627
1628 for address in addresses {
1629 let Some((transport_id, remote_addr)) =
1630 self.resolve_peer_address_for_match(&address)
1631 else {
1632 continue;
1633 };
1634 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1635 continue;
1636 }
1637 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1638 skipped_budget = skipped_budget.saturating_add(1);
1639 continue;
1640 }
1641 info!(
1642 npub = %identity.short_npub(),
1643 transport = %address.transport,
1644 addr = %address.addr,
1645 "same-host FIPS instance discovery: initiating handshake"
1646 );
1647 if let Err(err) = self
1648 .initiate_connection(transport_id, remote_addr, identity)
1649 .await
1650 {
1651 debug!(
1652 npub = %record.npub,
1653 error = %err,
1654 "same-host FIPS instance discovery: failed to initiate connection"
1655 );
1656 }
1657 connect_budget = connect_budget.saturating_sub(1);
1658 }
1659 }
1660 if skipped_budget > 0 {
1661 debug!(
1662 skipped = skipped_budget,
1663 "same-host FIPS instance discovery connect budget exhausted"
1664 );
1665 }
1666 }
1667
1668 pub(super) async fn poll_lan_discovery(&mut self) {
1675 let Some(runtime) = self.lan_discovery.clone() else {
1676 return;
1677 };
1678 let events = runtime.drain_events().await;
1679 if events.is_empty() {
1680 return;
1681 }
1682 let mut connect_budget = self.discovery_connect_budget();
1683 let mut skipped_budget = 0usize;
1684 for event in events {
1685 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1686 let Some((transport_id, local_addr)) =
1687 self.find_udp_transport_for_remote_addr(peer.addr)
1688 else {
1689 debug!(
1690 addr = %peer.addr,
1691 "lan: skip discovered peer with no compatible UDP transport"
1692 );
1693 continue;
1694 };
1695 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1696 Ok(id) => id,
1697 Err(err) => {
1698 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1699 continue;
1700 }
1701 };
1702 let peer_node_addr = *identity.node_addr();
1703 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1704 if self.peers.contains_key(&peer_node_addr) {
1705 let candidate = PeerAddress::new("udp", peer.addr.to_string());
1706 if self.active_peer_candidate_is_fresh_enough_to_skip(
1707 &peer_node_addr,
1708 std::slice::from_ref(&candidate),
1709 ) {
1710 continue;
1711 }
1712 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1713 continue;
1714 }
1715 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1716 skipped_budget = skipped_budget.saturating_add(1);
1717 continue;
1718 }
1719 info!(
1720 npub = %identity.short_npub(),
1721 addr = %peer.addr,
1722 local_addr = %local_addr,
1723 "lan: initiating alternate-path handshake to active peer"
1724 );
1725 if let Err(err) = self
1726 .initiate_connection(transport_id, remote_addr, identity)
1727 .await
1728 {
1729 debug!(
1730 npub = %peer.npub,
1731 error = %err,
1732 "lan: failed to initiate active peer alternate-path handshake"
1733 );
1734 }
1735 connect_budget = connect_budget.saturating_sub(1);
1736 continue;
1737 }
1738 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1739 continue;
1740 }
1741 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1742 skipped_budget = skipped_budget.saturating_add(1);
1743 continue;
1744 }
1745 info!(
1746 npub = %identity.short_npub(),
1747 addr = %peer.addr,
1748 local_addr = %local_addr,
1749 "lan: initiating handshake to discovered peer"
1750 );
1751 if let Err(err) = self
1752 .initiate_connection(transport_id, remote_addr, identity)
1753 .await
1754 {
1755 debug!(
1756 npub = %peer.npub,
1757 error = %err,
1758 "lan: failed to initiate connection to discovered peer"
1759 );
1760 }
1761 connect_budget = connect_budget.saturating_sub(1);
1762 }
1763 if skipped_budget > 0 {
1764 debug!(
1765 skipped = skipped_budget,
1766 "lan: discovery connect budget exhausted"
1767 );
1768 }
1769 }
1770
    /// Advances pending outbound transport-level connects.
    ///
    /// Each pending entry is checked against its transport's connection state:
    /// * `Connected`: the link is marked connected and a handshake is started.
    /// * `Failed`, `None` (no attempt found), or a transport that no longer
    ///   exists: the link is removed and a retry is scheduled for the peer.
    /// * `Connecting`: the entry is left in place for the next poll.
    pub(super) async fn poll_pending_connects(&mut self) {
        if self.pending_connects.is_empty() {
            return;
        }

        // (index, connected?, failure reason) for every finished entry. These are
        // collected first because the scan below borrows `pending_connects`.
        let mut completed = Vec::new();

        for (i, pending) in self.pending_connects.iter().enumerate() {
            // A transport that has disappeared is treated as a failed connect.
            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
                transport.connection_state(&pending.remote_addr)
            } else {
                crate::transport::ConnectionState::Failed("transport removed".into())
            };

            match state {
                crate::transport::ConnectionState::Connected => {
                    completed.push((i, true, None));
                }
                crate::transport::ConnectionState::Failed(reason) => {
                    completed.push((i, false, Some(reason)));
                }
                crate::transport::ConnectionState::Connecting => {
                    // Still in progress; check again on the next poll.
                }
                crate::transport::ConnectionState::None => {
                    completed.push((i, false, Some("no connection attempt found".into())));
                }
            }
        }

        // Remove from the highest index down so the remaining collected indices
        // still refer to the same entries after each removal.
        for (i, success, reason) in completed.into_iter().rev() {
            let pending = self.pending_connects.remove(i);

            if success {
                if let Some(link) = self.links.get_mut(&pending.link_id) {
                    link.set_connected();
                }

                debug!(
                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
                    transport_id = %pending.transport_id,
                    remote_addr = %pending.remote_addr,
                    link_id = %pending.link_id,
                    "Transport connected, starting handshake"
                );

                if let Err(e) = self
                    .start_handshake(
                        pending.link_id,
                        pending.transport_id,
                        pending.remote_addr.clone(),
                        pending.peer_identity,
                    )
                    .await
                {
                    warn!(
                        link_id = %pending.link_id,
                        error = %e,
                        "Failed to start handshake after transport connect"
                    );
                    // NOTE(review): unlike the transport-failure branch below, this
                    // path does not call `schedule_retry` — confirm that is intended
                    // (or that `start_handshake` schedules one itself).
                    self.remove_link(&pending.link_id);
                }
            } else {
                let reason = reason.unwrap_or_default();
                warn!(
                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
                    transport_id = %pending.transport_id,
                    remote_addr = %pending.remote_addr,
                    link_id = %pending.link_id,
                    reason = %reason,
                    "Transport connect failed"
                );

                // NOTE(review): `remove_link` may already drop the entry from
                // `self.links`; the explicit `links.remove` looks defensive — confirm.
                self.remove_link(&pending.link_id);
                self.links.remove(&pending.link_id);
                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
            }
        }
    }
1862
    /// Starts the node.
    ///
    /// The startup steps run in this order:
    /// 1. create the packet channel;
    /// 2. create and start all transports;
    /// 3. (unix only) spawn the crypto worker pools when enabled;
    /// 4. start Nostr, LAN and same-host discovery;
    /// 5. initiate configured peer connections;
    /// 6. optionally bring up the TUN device and the `.fips` DNS responder.
    ///
    /// Failures of individual transports, discovery runtimes, TUN device
    /// creation, or the DNS socket/bind address are logged, and startup
    /// continues without that subsystem.
    ///
    /// # Errors
    ///
    /// Returns [`NodeError::AlreadyStarted`] when the current state does not
    /// allow starting. The TUN setup path can also return early with an error:
    /// the macOS shutdown-pipe creation failure, or an error propagated from
    /// `device.create_writer(..)?`.
    pub async fn start(&mut self) -> Result<(), NodeError> {
        node_start_debug_log("Node::start begin");
        if !self.state.can_start() {
            return Err(NodeError::AlreadyStarted);
        }
        self.state = NodeState::Starting;
        node_start_debug_log("Node::start state set to starting");

        // Shared channel that every transport delivers received packets into.
        let packet_buffer_size = self.config.node.buffers.packet_channel;
        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
        self.packet_tx = Some(packet_tx.clone());
        self.packet_rx = Some(packet_rx);
        node_start_debug_log("Node::start packet channel created");

        node_start_debug_log("Node::start create transports begin");
        let transport_handles = self.create_transports(&packet_tx).await;
        node_start_debug_log(format!(
            "Node::start create transports complete count={}",
            transport_handles.len()
        ));

        // Only transports that start successfully are kept; failures are logged.
        for mut handle in transport_handles {
            let transport_id = handle.transport_id();
            let transport_type = handle.transport_type().name;
            let name = handle.name().map(|s| s.to_string());

            node_start_debug_log(format!(
                "Node::start transport start begin id={} type={} name={:?}",
                transport_id, transport_type, name
            ));
            match handle.start().await {
                Ok(()) => {
                    node_start_debug_log(format!(
                        "Node::start transport start ok id={} type={}",
                        transport_id, transport_type
                    ));
                    self.transports.insert(transport_id, handle);
                }
                Err(e) => {
                    node_start_debug_log(format!(
                        "Node::start transport start error id={} type={} error={}",
                        transport_id, transport_type, e
                    ));
                    if let Some(ref n) = name {
                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
                    } else {
                        warn!(transport_type, error = %e, "Transport failed to start");
                    }
                }
            }
        }

        if !self.transports.is_empty() {
            info!(count = self.transports.len(), "Transports initialized");
        }

        // Crypto worker pools (unix only). Worker counts default to the available
        // parallelism and can be overridden with FIPS_ENCRYPT_WORKERS /
        // FIPS_DECRYPT_WORKERS; unparseable values fall back to the default.
        #[cfg(unix)]
        {
            if self.config.node.worker_pools_enabled {
                node_start_debug_log("Node::start worker pools begin");
                let cpu_default = std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(1)
                    .max(1);
                // The encrypt pool always has at least one worker.
                let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
                    .ok()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(cpu_default)
                    .max(1);
                self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
                    encrypt_worker_count,
                ));
                info!(
                    workers = encrypt_worker_count,
                    "Spawned FMP-encrypt worker pool"
                );

                // No `.max(1)` here: 0 deliberately selects in-line decryption.
                let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
                    .ok()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(cpu_default);
                if decrypt_worker_count == 0 {
                    info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
                } else {
                    self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
                        decrypt_worker_count,
                    ));
                    info!(
                        workers = decrypt_worker_count,
                        "Spawned FMP+FSP-decrypt worker pool"
                    );
                }
                node_start_debug_log("Node::start worker pools complete");
            } else {
                node_start_debug_log("Node::start worker pools disabled");
                info!("FIPS worker pools disabled; using in-line crypto/send path");
            }
        }

        // Nostr overlay discovery. The initial advert is published before the
        // runtime is stored; a publish failure does not disable discovery.
        if self.config.node.discovery.nostr.enabled {
            node_start_debug_log("Node::start nostr discovery start begin");
            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
                .await
            {
                Ok(runtime) => {
                    node_start_debug_log("Node::start nostr discovery runtime created");
                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
                    }
                    node_start_debug_log("Node::start nostr overlay advert refreshed");
                    self.nostr_discovery = Some(runtime);
                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
                    info!("Nostr overlay discovery enabled");
                }
                Err(err) => {
                    node_start_debug_log(format!(
                        "Node::start nostr discovery start error error={}",
                        err
                    ));
                    warn!(error = %err, "Failed to start Nostr overlay discovery");
                }
            }
        }

        // LAN mDNS discovery advertises the port of the first operational UDP
        // transport that has a local address (0 if there is none).
        if self.config.node.discovery.lan.enabled {
            node_start_debug_log("Node::start lan discovery start begin");
            let advertised_udp_port = self
                .transports
                .values()
                .filter(|h| h.is_operational())
                .filter(|h| h.transport_type().name == "udp")
                .find_map(|h| h.local_addr().map(|addr| addr.port()))
                .unwrap_or(0);
            let scope = self.lan_discovery_scope();
            match crate::discovery::lan::LanDiscovery::start(
                &self.identity,
                scope,
                advertised_udp_port,
                self.config.node.discovery.lan.clone(),
            )
            .await
            {
                Ok(runtime) => {
                    node_start_debug_log("Node::start lan discovery start ok");
                    self.lan_discovery = Some(runtime);
                    info!("LAN mDNS discovery enabled");
                }
                Err(err) => {
                    node_start_debug_log(format!(
                        "Node::start lan discovery start error error={}",
                        err
                    ));
                    debug!(error = %err, "LAN mDNS discovery not started");
                }
            }
        }

        // Same-host discovery: publish our record and run a first scan right away.
        self.start_local_instance_discovery();
        self.poll_local_instance_discovery().await;

        node_start_debug_log("Node::start initiate peer connections begin");
        self.initiate_peer_connections().await;
        node_start_debug_log("Node::start initiate peer connections complete");

        if self.config.tun.enabled {
            node_start_debug_log("Node::start tun init begin");
            let address = *self.identity.address();
            match TunDevice::create(&self.config.tun, address).await {
                Ok(device) => {
                    let mtu = device.mtu();
                    let name = device.name().to_string();
                    let our_addr = *device.address();

                    info!("TUN device active:");
                    info!(" name: {}", name);
                    info!(" address: {}", device.address());
                    info!(" mtu: {}", mtu);

                    // MSS clamp = effective IPv6 MTU minus the 40-byte IPv6 header
                    // and the 20-byte option-less TCP header.
                    let effective_mtu = self.effective_ipv6_mtu();
                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20);
                    info!("effective MTU: {} bytes", effective_mtu);
                    debug!(" max TCP MSS: {} bytes", max_mss);

                    // macOS: pipe used to wake the TUN reader on shutdown; the read
                    // end goes to the reader and the write end is kept for `stop`.
                    // NOTE(review): this early return leaves `self.state` at
                    // `Starting` with transports/discovery already running.
                    #[cfg(target_os = "macos")]
                    let (shutdown_read_fd, shutdown_write_fd) = {
                        let mut fds = [0i32; 2];
                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
                                "failed to create shutdown pipe".into(),
                            )));
                        }
                        (fds[0], fds[1])
                    };

                    // NOTE(review): unlike the `TunDevice::create` failure below,
                    // a writer failure aborts `start` (state stays `Starting`), and
                    // on macOS the pipe fds created above are not closed on this path.
                    let (writer, tun_tx) =
                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;

                    // The TUN writer runs on its own OS thread.
                    let writer_handle = thread::spawn(move || {
                        writer.run();
                    });

                    let reader_tun_tx = tun_tx.clone();

                    // Packets read from the TUN are handed to the node via this channel.
                    let tun_channel_size = self.config.node.buffers.tun_channel;
                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);

                    let transport_mtu = self.transport_mtu();
                    let path_mtu_lookup = self.path_mtu_lookup.clone();
                    // The TUN reader also runs on its own OS thread; on macOS it
                    // additionally receives the shutdown pipe's read end.
                    #[cfg(target_os = "macos")]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                            shutdown_read_fd,
                        );
                    });
                    #[cfg(not(target_os = "macos"))]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                        );
                    });

                    self.tun_state = TunState::Active;
                    self.tun_name = Some(name);
                    self.tun_tx = Some(tun_tx);
                    self.tun_outbound_rx = Some(outbound_rx);
                    self.tun_reader_handle = Some(reader_handle);
                    self.tun_writer_handle = Some(writer_handle);
                    #[cfg(target_os = "macos")]
                    {
                        self.tun_shutdown_fd = Some(shutdown_write_fd);
                    }
                }
                Err(e) => {
                    self.tun_state = TunState::Failed;
                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
                }
            }
            node_start_debug_log("Node::start tun init complete");
        }

        // `.fips` DNS responder: bind the socket, build the host map (with hosts
        // file reloading only when system files are enabled), then spawn the task.
        if self.config.dns.enabled {
            node_start_debug_log("Node::start dns init begin");
            let addr_str = self.config.dns.bind_addr();
            match addr_str.parse::<std::net::IpAddr>() {
                Ok(ip) => {
                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
                    match Self::bind_dns_socket(bind) {
                        Ok(socket) => {
                            let dns_channel_size = self.config.node.buffers.dns_channel;
                            let (identity_tx, identity_rx) =
                                tokio::sync::mpsc::channel(dns_channel_size);
                            let dns_ttl = self.config.dns.ttl();
                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
                                self.config.peers(),
                            );
                            let reloader = if self.config.node.system_files_enabled {
                                let hosts_path = std::path::PathBuf::from(
                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
                                );
                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
                            } else {
                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
                            };
                            // `None` when the TUN interface name is unknown to the OS.
                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
                            info!(
                                bind = %bind,
                                hosts = reloader.hosts().len(),
                                mesh_ifindex = ?mesh_ifindex,
                                "DNS responder started for .fips domain (auto-reload enabled)"
                            );
                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
                                socket,
                                identity_tx,
                                dns_ttl,
                                reloader,
                                mesh_ifindex,
                            ));
                            self.dns_identity_rx = Some(identity_rx);
                            self.dns_task = Some(handle);
                        }
                        Err(e) => {
                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
                        }
                    }
                }
                Err(e) => {
                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
                }
            }
            node_start_debug_log("Node::start dns init complete");
        }

        self.state = NodeState::Running;
        node_start_debug_log("Node::start running");
        info!("Node started:");
        info!(" state: {}", self.state);
        info!(" transports: {}", self.transports.len());
        info!(" connections: {}", self.connections.len());
        Ok(())
    }
2248
2249 fn bind_dns_socket(
2262 addr: std::net::SocketAddr,
2263 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2264 use socket2::{Domain, Protocol, Socket, Type};
2265 let domain = if addr.is_ipv4() {
2266 Domain::IPV4
2267 } else {
2268 Domain::IPV6
2269 };
2270 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2271 if addr.is_ipv6() {
2272 sock.set_only_v6(false)?;
2273 #[cfg(unix)]
2274 Self::set_recv_pktinfo_v6(&sock)?;
2275 }
2276 sock.set_nonblocking(true)?;
2277 sock.bind(&addr.into())?;
2278 tokio::net::UdpSocket::from_std(sock.into())
2279 }
2280
2281 #[cfg(unix)]
2287 fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2288 use std::os::fd::AsRawFd;
2289 let enable: libc::c_int = 1;
2290 let ret = unsafe {
2291 libc::setsockopt(
2292 sock.as_raw_fd(),
2293 libc::IPPROTO_IPV6,
2294 libc::IPV6_RECVPKTINFO,
2295 &enable as *const _ as *const libc::c_void,
2296 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2297 )
2298 };
2299 if ret < 0 {
2300 return Err(std::io::Error::last_os_error());
2301 }
2302 Ok(())
2303 }
2304
2305 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2312 #[cfg(unix)]
2313 {
2314 let c_name = std::ffi::CString::new(name).ok()?;
2315 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2316 if idx == 0 { None } else { Some(idx) }
2317 }
2318 #[cfg(not(unix))]
2319 {
2320 let _ = name;
2321 None
2322 }
2323 }
2324
    /// Stops the node.
    ///
    /// Subsystems are torn down in this order:
    /// 1. the DNS responder;
    /// 2. peer disconnect notifications, sent while the transports are still up;
    /// 3. Nostr and LAN discovery, and the same-host instance record;
    /// 4. all transports;
    /// 5. the packet channel;
    /// 6. the TUN interface and its reader/writer threads.
    ///
    /// # Errors
    ///
    /// Returns [`NodeError::NotStarted`] if the current state does not allow
    /// stopping. Failures of individual teardown steps are only logged.
    pub async fn stop(&mut self) -> Result<(), NodeError> {
        if !self.state.can_stop() {
            return Err(NodeError::NotStarted);
        }
        self.state = NodeState::Stopping;
        info!(state = %self.state, "Node stopping");

        if let Some(handle) = self.dns_task.take() {
            handle.abort();
            debug!("DNS responder stopped");
        }

        // Must happen before transports are stopped, or nothing can be sent.
        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
            .await;

        if let Some(bootstrap) = self.nostr_discovery.take()
            && let Err(e) = bootstrap.shutdown().await
        {
            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
        }

        if let Some(lan) = self.lan_discovery.take() {
            lan.shutdown().await;
        }

        // Withdraw our same-host instance record so other local instances stop dialling us.
        if let Some(registry) = self.local_instance_registry.take()
            && let Err(err) = registry.remove()
        {
            debug!(error = %err, "failed to remove same-host FIPS instance record");
        }

        // Snapshot the ids first so each handle can be removed while iterating.
        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
        for transport_id in transport_ids {
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let transport_type = handle.transport_type().name;
                match handle.stop().await {
                    Ok(()) => {
                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
                    }
                    Err(e) => {
                        warn!(
                            transport_id = %transport_id,
                            transport_type,
                            error = %e,
                            "Transport stop failed"
                        );
                    }
                }
            }
        }

        self.packet_tx.take();
        self.packet_rx.take();

        if let Some(name) = self.tun_name.take() {
            info!(name = %name, "Shutting down TUN interface");

            // Drop our sender handle to the TUN writer before shutting the interface down.
            self.tun_tx.take();

            if let Err(e) = shutdown_tun_interface(&name).await {
                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
            }

            // macOS: write a byte to the shutdown pipe and close our write end;
            // the read end was handed to the TUN reader thread in `start`.
            // `take()` ensures the fd is closed at most once.
            // NOTE(review): the `libc::write` result is ignored.
            #[cfg(target_os = "macos")]
            if let Some(fd) = self.tun_shutdown_fd.take() {
                unsafe {
                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                    libc::close(fd);
                }
            }

            // NOTE(review): these are blocking `std::thread` joins inside an async
            // fn; they stall the executor thread until the TUN threads exit.
            if let Some(handle) = self.tun_reader_handle.take() {
                let _ = handle.join();
            }
            if let Some(handle) = self.tun_writer_handle.take() {
                let _ = handle.join();
            }

            self.tun_state = TunState::Disabled;
        }

        self.state = NodeState::Stopped;
        info!(state = %self.state, "Node stopped");
        Ok(())
    }
2428
2429 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2434 let disconnect = Disconnect::new(reason);
2435 let plaintext = disconnect.encode();
2436
2437 let peer_addrs: Vec<NodeAddr> = self
2439 .peers
2440 .iter()
2441 .filter(|(_, peer)| peer.can_send() && peer.has_session())
2442 .map(|(addr, _)| *addr)
2443 .collect();
2444
2445 if peer_addrs.is_empty() {
2446 debug!(
2447 total_peers = self.peers.len(),
2448 "No sendable peers for disconnect notification"
2449 );
2450 return;
2451 }
2452
2453 let mut sent = 0usize;
2454 for node_addr in &peer_addrs {
2455 match self
2456 .send_encrypted_link_message(node_addr, &plaintext)
2457 .await
2458 {
2459 Ok(()) => sent += 1,
2460 Err(e) => {
2461 debug!(
2462 peer = %self.peer_display_name(node_addr),
2463 error = %e,
2464 "Failed to send disconnect (transport may be down)"
2465 );
2466 }
2467 }
2468 }
2469
2470 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2471 }
2472
2473 fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2474 peer_config
2475 .addresses_by_priority()
2476 .into_iter()
2477 .cloned()
2478 .collect()
2479 }
2480
2481 async fn nostr_peer_fallback_addresses(
2482 &self,
2483 peer_config: &PeerConfig,
2484 existing: &[PeerAddress],
2485 ) -> Vec<PeerAddress> {
2486 if !self.config.node.discovery.nostr.enabled
2487 || self.config.node.discovery.nostr.policy
2488 == crate::config::NostrDiscoveryPolicy::Disabled
2489 {
2490 return Vec::new();
2491 }
2492
2493 let Some(bootstrap) = self.nostr_discovery.clone() else {
2494 return Vec::new();
2495 };
2496 let endpoints = match bootstrap
2497 .cached_advert_endpoints_for_peer(&peer_config.npub)
2498 .await
2499 {
2500 Some(endpoints) => endpoints,
2501 None => {
2502 debug!(
2503 npub = %peer_config.npub,
2504 "No cached Nostr advert endpoints for configured peer"
2505 );
2506 return Vec::new();
2507 }
2508 };
2509
2510 let mut fallback = Vec::new();
2511 let mut next_priority = existing
2512 .iter()
2513 .map(|addr| addr.priority)
2514 .max()
2515 .unwrap_or(100)
2516 .saturating_add(1);
2517 let seen_at_ms = Self::now_ms();
2522 for endpoint in endpoints {
2523 let Some(candidate) =
2524 Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2525 else {
2526 continue;
2527 };
2528 if existing
2529 .iter()
2530 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2531 || fallback.iter().any(|addr: &PeerAddress| {
2532 addr.transport == candidate.transport && addr.addr == candidate.addr
2533 })
2534 {
2535 continue;
2536 }
2537 fallback.push(candidate);
2538 next_priority = next_priority.saturating_add(1);
2539 }
2540 fallback
2541 }
2542
2543 async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2544 if !self.config.node.discovery.nostr.enabled
2545 || self.config.node.discovery.nostr.policy
2546 == crate::config::NostrDiscoveryPolicy::Disabled
2547 {
2548 return false;
2549 }
2550 let Some(bootstrap) = self.nostr_discovery.clone() else {
2551 return false;
2552 };
2553 bootstrap.request_connect(peer_config.clone()).await;
2554 info!(npub = %peer_config.npub, "Started background Nostr UDP NAT traversal attempt");
2555 true
2556 }
2557
2558 fn overlay_endpoint_to_peer_address(
2559 endpoint: &OverlayEndpointAdvert,
2560 priority: u8,
2561 seen_at_ms: u64,
2562 ) -> Option<PeerAddress> {
2563 let transport = match endpoint.transport {
2564 OverlayTransportKind::Udp => "udp",
2565 OverlayTransportKind::Tcp => "tcp",
2566 OverlayTransportKind::Tor => "tor",
2567 OverlayTransportKind::WebRtc => "webrtc",
2568 };
2569 Some(
2570 PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2571 .with_seen_at_ms(seen_at_ms),
2572 )
2573 }
2574
    /// Walks `addresses` in order and starts connection attempts toward
    /// `peer_identity`.
    ///
    /// Per candidate:
    /// - Once an attempt is in flight (`attempted`), entries with `seen_at_ms` set
    ///   are skipped. The log message treats these as overlay fallbacks; this file
    ///   stamps them in `overlay_endpoint_to_peer_address`.
    /// - A `udp:nat` placeholder requests a Nostr NAT-traversal bootstrap via
    ///   `request_nostr_bootstrap`. This only happens when `allow_bootstrap_nat` is
    ///   set and nothing is in flight yet.
    /// - `ethernet` and `ble` addresses go through their own resolvers (`ble` only
    ///   in builds with `bluer_available`). `udp` addresses that parse as a
    ///   `SocketAddr` use `find_udp_transport_for_remote_addr`, and all others use
    ///   `find_transport_for_type`. Unresolvable candidates are skipped.
    /// - A path that is already connecting counts as attempted and is not redialled.
    /// - Each successful `initiate_connection` consumes one unit of the budget from
    ///   `path_candidate_attempt_budget`. The loop stops once the budget is zero.
    ///
    /// Returns `Ok(())` if at least one attempt was started, was already in
    /// flight, or a NAT bootstrap was requested.
    ///
    /// # Errors
    ///
    /// - `NodeError::AccessDenied` from `initiate_connection` is returned at once,
    ///   and the remaining candidates are abandoned.
    /// - `NodeError::NoTransportForType` when nothing could be attempted.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        // True once any attempt has been started or found already in flight.
        let mut attempted = false;
        let peer_node_addr = *peer_identity.node_addr();
        // How many more concrete `initiate_connection` calls this pass may make.
        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

        for addr in addresses {
            if attempted && addr.seen_at_ms.is_some() {
                debug!(
                    npub = %peer_config.npub,
                    transport = %addr.transport,
                    addr = %addr.addr,
                    "Skipping overlay fallback because a configured direct candidate is already in flight"
                );
                continue;
            }

            // `udp:nat` is a placeholder rather than a dialable address. It maps to
            // a background Nostr NAT-traversal request.
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if attempted {
                    debug!(
                        npub = %peer_config.npub,
                        "Skipping Nostr NAT fallback because a configured direct candidate is already in flight"
                    );
                    continue;
                }
                if !allow_bootstrap_nat {
                    continue;
                }
                if self.request_nostr_bootstrap(peer_config).await {
                    attempted = true;
                    continue;
                }
                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                continue;
            }

            // Resolve the candidate to a local transport and a remote address.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                // UDP socket addresses need a compatible UDP transport, as chosen by
                // `find_udp_transport_for_remote_addr`. Anything else (including UDP
                // addresses that do not parse as a `SocketAddr`) takes whatever
                // `find_transport_for_type` returns for that transport name.
                let tid = if addr.transport == "udp"
                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
                {
                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                        Some((id, _)) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No compatible operational UDP transport for address"
                            );
                            continue;
                        }
                    }
                } else {
                    match self.find_transport_for_type(&addr.transport) {
                        Some(id) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No operational transport for address type"
                            );
                            continue;
                        }
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            // The same path is already being dialled, so count it as our attempt
            // instead of starting a duplicate.
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                attempted = true;
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    remote_addr = %remote_addr,
                    "Skipping duplicate in-flight candidate path"
                );
                continue;
            }

            if concrete_budget == 0 {
                debug!(
                    npub = %peer_config.npub,
                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                    "Path candidate race budget exhausted"
                );
                break;
            }

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => {
                    attempted = true;
                    concrete_budget = concrete_budget.saturating_sub(1);
                }
                // `AccessDenied` is returned immediately instead of trying the next
                // address.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        if attempted {
            return Ok(());
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
2730
2731 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
2732 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
2733 .await;
2734 }
2735
    /// Scans cached Nostr overlay adverts and queues connection retries for
    /// unconfigured peers. It does nothing unless Nostr discovery is enabled with
    /// the `Open` policy.
    ///
    /// `max_age_secs`, when set, skips adverts whose creation time is older than
    /// that many seconds. `caller` only labels log output. A summary line is
    /// logged when `caller == "startup"` or at least one retry was queued.
    ///
    /// At most 64 cached candidates are examined. A candidate is skipped when:
    /// - its advert is too old;
    /// - it is a configured peer (a pending retry for that peer scheduled in the
    ///   future is pulled forward to now);
    /// - it has an invalid npub, or is this node itself;
    /// - it is already connected, already pending retry, under a discovery
    ///   cooldown, or already connecting;
    /// - it yields no usable endpoints.
    ///
    /// At most `open_discovery_enqueue_budget` retries are queued per sweep. Each
    /// uses `ConnectPolicy::AutoConnect` with `reconnect = false`, is due
    /// immediately, and expires per `open_discovery_retry_expires_at_ms`.
    pub(in crate::node) async fn run_open_discovery_sweep(
        &mut self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
        max_age_secs: Option<u64>,
        caller: &'static str,
    ) {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
        {
            return;
        }

        // Configured peers are dialled through their own config, never through
        // open discovery.
        let configured_npubs = self
            .config
            .peers()
            .iter()
            .map(|peer| peer.npub.clone())
            .collect::<HashSet<_>>();
        let now_ms = Self::now_ms();
        let now_secs = now_ms / 1000;
        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
        if enqueue_budget == 0 {
            debug!(
                caller = %caller,
                "open-discovery sweep: enqueue budget is 0, skipping"
            );
            return;
        }

        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
        let cached_count = candidates.len();
        // Per-reason counters, reported in the summary log below.
        let mut enqueued = 0usize;
        let mut skipped_age = 0usize;
        let mut skipped_configured = 0usize;
        let mut skipped_self = 0usize;
        let mut skipped_connected = 0usize;
        let mut skipped_retry_pending = 0usize;
        let mut skipped_connecting = 0usize;
        let mut skipped_no_endpoints = 0usize;
        let mut skipped_invalid_npub = 0usize;
        let mut skipped_cooldown = 0usize;

        for (npub, endpoints, created_at_secs) in candidates {
            if enqueue_budget == 0 {
                break;
            }

            if let Some(max_age) = max_age_secs
                && now_secs.saturating_sub(created_at_secs) > max_age
            {
                skipped_age = skipped_age.saturating_add(1);
                continue;
            }

            if configured_npubs.contains(&npub) {
                // A fresh advert from a configured peer makes its scheduled retry
                // due now instead of waiting out the backoff.
                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
                    let configured_addr = *identity.node_addr();
                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
                        && state.retry_after_ms > now_ms
                    {
                        state.retry_after_ms = now_ms;
                        debug!(
                            caller = %caller,
                            peer = %self.peer_display_name(&configured_addr),
                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
                            "Expediting configured-peer retry after fresh overlay advert"
                        );
                    }
                }
                skipped_configured = skipped_configured.saturating_add(1);
                continue;
            }

            let peer_identity = match PeerIdentity::from_npub(&npub) {
                Ok(identity) => identity,
                Err(_) => {
                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                    continue;
                }
            };
            let node_addr = *peer_identity.node_addr();
            if node_addr == *self.identity.node_addr() {
                skipped_self = skipped_self.saturating_add(1);
                continue;
            }
            if self.peers.contains_key(&node_addr) {
                skipped_connected = skipped_connected.saturating_add(1);
                continue;
            }
            if self.retry_pending.contains_key(&node_addr) {
                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
                continue;
            }
            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                skipped_cooldown = skipped_cooldown.saturating_add(1);
                continue;
            }
            // An in-progress connection whose expected identity is this peer.
            let connecting = self.connections.values().any(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == &node_addr)
                    .unwrap_or(false)
            });
            if connecting {
                skipped_connecting = skipped_connecting.saturating_add(1);
                continue;
            }

            // Convert the advert endpoints into deduplicated dial candidates with
            // priorities starting at 120 and increasing in advert order.
            let mut addresses = Vec::new();
            let mut priority = 120u8;
            let seen_at_ms = Self::now_ms();
            for endpoint in endpoints {
                let Some(candidate) =
                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
                else {
                    continue;
                };
                if addresses.iter().any(|existing: &PeerAddress| {
                    existing.transport == candidate.transport && existing.addr == candidate.addr
                }) {
                    continue;
                }
                addresses.push(candidate);
                priority = priority.saturating_add(1);
            }
            if addresses.is_empty() {
                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
                continue;
            }

            self.peer_aliases
                .entry(node_addr)
                .or_insert_with(|| peer_identity.short_npub());
            self.register_identity(node_addr, peer_identity.pubkey_full());

            // Synthesize a transient peer config for the discovered node and
            // schedule it for an immediate, expiring retry.
            let mut state = super::retry::RetryState::new(PeerConfig {
                npub: npub.clone(),
                alias: None,
                addresses,
                connect_policy: ConnectPolicy::AutoConnect,
                auto_reconnect: true,
                discovery_fallback_transit: false,
            });
            state.reconnect = false;
            state.retry_after_ms = now_ms;
            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
            self.retry_pending.insert(node_addr, state);
            info!(
                caller = %caller,
                peer = %peer_identity.short_npub(),
                advert_age_secs = now_secs.saturating_sub(created_at_secs),
                "open-discovery sweep: queued retry for cached advert"
            );
            enqueue_budget = enqueue_budget.saturating_sub(1);
            enqueued = enqueued.saturating_add(1);
        }

        let total_skipped = skipped_age
            + skipped_configured
            + skipped_self
            + skipped_connected
            + skipped_retry_pending
            + skipped_connecting
            + skipped_no_endpoints
            + skipped_invalid_npub
            + skipped_cooldown;
        // Quiet per-tick sweeps that queued nothing to keep the log readable.
        let should_summarize = caller == "startup" || enqueued > 0;
        if should_summarize {
            info!(
                caller = %caller,
                cached = cached_count,
                queued = enqueued,
                skipped_age = skipped_age,
                skipped_configured = skipped_configured,
                skipped_self = skipped_self,
                skipped_connected = skipped_connected,
                skipped_retry_pending = skipped_retry_pending,
                skipped_connecting = skipped_connecting,
                skipped_no_endpoints = skipped_no_endpoints,
                skipped_invalid_npub = skipped_invalid_npub,
                skipped_cooldown = skipped_cooldown,
                skipped_total = total_skipped,
                "open-discovery sweep complete"
            );
        }
    }
2953
2954 async fn maybe_run_startup_open_discovery_sweep(
2962 &mut self,
2963 bootstrap: &std::sync::Arc<NostrDiscovery>,
2964 ) {
2965 if self.startup_open_discovery_sweep_done {
2966 return;
2967 }
2968 if !self.config.node.discovery.nostr.enabled
2969 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2970 {
2971 self.startup_open_discovery_sweep_done = true;
2973 return;
2974 }
2975 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
2976 return;
2977 };
2978 let now_ms = Self::now_ms();
2979 let delay_ms = self
2980 .config
2981 .node
2982 .discovery
2983 .nostr
2984 .startup_sweep_delay_secs
2985 .saturating_mul(1000);
2986 if now_ms < started_at_ms.saturating_add(delay_ms) {
2987 return;
2988 }
2989
2990 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
2991 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
2992 .await;
2993 self.startup_open_discovery_sweep_done = true;
2994 }
2995
2996 fn available_outbound_slots(&self) -> usize {
2997 let connection_used = self
2998 .connections
2999 .len()
3000 .saturating_add(self.pending_connects.len());
3001 let connection_slots = if self.max_connections == 0 {
3002 usize::MAX
3003 } else {
3004 self.max_connections.saturating_sub(connection_used)
3005 };
3006
3007 let peer_slots = if self.max_peers == 0 {
3008 usize::MAX
3009 } else {
3010 self.max_peers.saturating_sub(self.peers.len())
3011 };
3012
3013 connection_slots.min(peer_slots)
3014 }
3015
3016 fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
3017 let current_open_discovery_pending = self
3018 .retry_pending
3019 .values()
3020 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3021 .count();
3022
3023 let cap_remaining = self
3024 .config
3025 .node
3026 .discovery
3027 .nostr
3028 .open_discovery_max_pending
3029 .saturating_sub(current_open_discovery_pending);
3030
3031 cap_remaining.min(self.available_outbound_slots())
3032 }
3033
3034 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3035 now_ms.saturating_add(
3036 self.config
3037 .node
3038 .discovery
3039 .nostr
3040 .advert_ttl_secs
3041 .saturating_mul(1000)
3042 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3043 )
3044 }
3045
    /// Builds this node's Nostr overlay advert from its operational transports that
    /// have `advertise_on_nostr()` enabled.
    ///
    /// Returns `None` when Nostr discovery is disabled or no endpoint qualifies.
    ///
    /// Per transport type:
    /// - UDP with `is_public()`: the explicit `external_advert_addr()` if set;
    ///   otherwise the bound address if it is routable; otherwise a public
    ///   address from `learn_public_udp_addr` (a warning is logged if none is
    ///   learned). Non-public UDP is advertised as the `"nat"` placeholder.
    /// - WebRTC: the node's full public key, hex-encoded.
    /// - TCP: the explicit `external_advert_addr()` if set; otherwise the bound
    ///   address if it is routable (a warning is logged for wildcard/private binds).
    /// - Tor: the onion address with `advertised_port()`, when one is available.
    ///
    /// Signal relays and STUN servers are included only when a `"nat"` UDP
    /// endpoint or a WebRTC endpoint is advertised.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // Either flag means peers need relays/STUN to reach us.
        let mut has_udp_nat = false;
        let mut has_webrtc = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                // Bound to a concrete routable IP: advertise it as-is.
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                // Wildcard or private bind: ask the discovery
                                // runtime for an observed public address on this port.
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                             or private and STUN observation failed; \
                                             advertising no UDP endpoint. Either set \
                                             transports.udp.external_addr, bind to a \
                                             specific *public* IP, or ensure \
                                             node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                None => {}
                            }
                        }
                    } else {
                        // Not publicly reachable: advertise the "nat" placeholder so
                        // peers use NAT traversal instead of dialling directly.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "webrtc" => {
                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::WebRtc,
                        addr: hex::encode(self.identity.pubkey_full().serialize()),
                    });
                    has_webrtc = true;
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            // TCP has no public-address discovery fallback here.
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                     or private IP and no transports.tcp.external_addr set; \
                                     advertising no TCP endpoint. Either set external_addr \
                                     to the public IP (recommended for cloud 1:1-NAT setups) \
                                     or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
3224
3225 async fn refresh_overlay_advert(
3226 &self,
3227 bootstrap: &std::sync::Arc<NostrDiscovery>,
3228 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3229 let advert = self.build_overlay_advert(bootstrap).await;
3230 bootstrap.update_local_advert(advert).await
3231 }
3232
3233 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3234 match (&self.config.transports.udp, transport_name) {
3235 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3236 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3237 _ => None,
3238 }
3239 }
3240
3241 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3242 match (&self.config.transports.tcp, transport_name) {
3243 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3244 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3245 _ => None,
3246 }
3247 }
3248
3249 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3250 match (&self.config.transports.tor, transport_name) {
3251 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3252 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3253 _ => None,
3254 }
3255 }
3256
3257 fn lookup_webrtc_config(
3258 &self,
3259 transport_name: Option<&str>,
3260 ) -> Option<&crate::config::WebRtcConfig> {
3261 match (&self.config.transports.webrtc, transport_name) {
3262 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3263 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3264 _ => None,
3265 }
3266 }
3267
3268 pub(in crate::node) async fn try_peer_addresses(
3269 &mut self,
3270 peer_config: &PeerConfig,
3271 peer_identity: PeerIdentity,
3272 allow_bootstrap_nat: bool,
3273 ) -> Result<(), NodeError> {
3274 let peer_node_addr = *peer_identity.node_addr();
3275 if self.peers.contains_key(&peer_node_addr) {
3276 debug!(
3277 npub = %peer_config.npub,
3278 "Peer already exists, skipping address attempts"
3279 );
3280 return Ok(());
3281 }
3282
3283 let candidates = self.peer_address_candidates(peer_config).await;
3284
3285 if candidates.is_empty() {
3286 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3287 return Ok(());
3288 }
3289 return Err(NodeError::NoTransportForType(format!(
3290 "no addresses known for {}",
3291 peer_config.npub
3292 )));
3293 }
3294
3295 if self
3296 .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3297 .await
3298 .is_ok()
3299 {
3300 return Ok(());
3301 }
3302
3303 Err(NodeError::NoTransportForType(format!(
3304 "no operational transport for any of {}'s addresses",
3305 peer_config.npub
3306 )))
3307 }
3308
3309 async fn try_active_peer_alternative_addresses(
3310 &mut self,
3311 peer_config: &PeerConfig,
3312 peer_identity: PeerIdentity,
3313 ) -> Result<bool, NodeError> {
3314 let peer_node_addr = *peer_identity.node_addr();
3315 let candidates = self.peer_address_candidates(peer_config).await;
3316
3317 if candidates.is_empty() {
3318 return Err(NodeError::NoTransportForType(format!(
3319 "no addresses known for {}",
3320 peer_config.npub
3321 )));
3322 }
3323
3324 let alternatives: Vec<_> = candidates
3325 .into_iter()
3326 .filter(|addr| !(addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat")))
3327 .filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
3328 .collect();
3329
3330 if alternatives.is_empty() {
3331 return Err(NodeError::NoTransportForType(format!(
3332 "no concrete alternate addresses known for {}",
3333 peer_config.npub
3334 )));
3335 }
3336
3337 self.attempt_peer_address_list(peer_config, peer_identity, false, &alternatives)
3338 .await?;
3339 Ok(true)
3340 }
3341
3342 async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3343 let static_addresses = self.static_peer_addresses(peer_config);
3350 let overlay_addresses = self
3351 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3352 .await;
3353
3354 let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3355 for addr in overlay_addresses.into_iter().chain(static_addresses) {
3356 if !candidates.iter().any(|existing: &PeerAddress| {
3357 existing.transport == addr.transport && existing.addr == addr.addr
3358 }) {
3359 candidates.push(addr);
3360 }
3361 }
3362
3363 candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
3368 _ if a.priority != b.priority => a.priority.cmp(&b.priority),
3369 (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3370 (Some(_), None) => std::cmp::Ordering::Less,
3371 (None, Some(_)) => std::cmp::Ordering::Greater,
3372 (None, None) => std::cmp::Ordering::Equal,
3373 });
3374
3375 candidates
3376 }
3377
3378 fn active_peer_matches_any_candidate(
3379 &self,
3380 peer_node_addr: &NodeAddr,
3381 candidates: &[PeerAddress],
3382 ) -> bool {
3383 candidates
3384 .iter()
3385 .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3386 }
3387
3388 pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3389 &self,
3390 peer_node_addr: &NodeAddr,
3391 candidates: &[PeerAddress],
3392 ) -> bool {
3393 if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3394 return false;
3395 }
3396 !self.active_peer_needs_same_path_refresh(peer_node_addr)
3397 }
3398
3399 fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3400 let Some(peer) = self.peers.get(peer_node_addr) else {
3401 return false;
3402 };
3403 let stale_after_ms = self
3404 .config
3405 .node
3406 .heartbeat_interval_secs
3407 .saturating_mul(1000)
3408 .max(1000);
3409 peer.idle_time(Self::now_ms()) > stale_after_ms
3410 }
3411
3412 fn active_peer_matches_candidate(
3413 &self,
3414 peer_node_addr: &NodeAddr,
3415 candidate: &PeerAddress,
3416 ) -> bool {
3417 let Some(peer) = self.peers.get(peer_node_addr) else {
3418 return false;
3419 };
3420 let Some(current_addr) = peer.current_addr() else {
3421 return false;
3422 };
3423 if let Some(peer_transport_id) = peer.transport_id()
3424 && let Some((candidate_transport_id, candidate_addr)) =
3425 self.resolve_peer_address_for_match(candidate)
3426 {
3427 return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3428 }
3429 if peer
3430 .transport_id()
3431 .map(|id| self.bootstrap_transports.contains(&id))
3432 .unwrap_or(false)
3433 {
3434 return false;
3435 }
3436 let current_addr = current_addr.to_string();
3437 let current_transport = peer
3438 .transport_id()
3439 .and_then(|id| self.transports.get(&id))
3440 .map(|transport| transport.transport_type().name);
3441
3442 candidate.addr == current_addr
3443 && current_transport
3444 .map(|transport| transport == candidate.transport)
3445 .unwrap_or(true)
3446 }
3447
3448 pub(crate) async fn api_connect(
3456 &mut self,
3457 npub: &str,
3458 address: &str,
3459 transport: &str,
3460 ) -> Result<serde_json::Value, String> {
3461 let peer_config = PeerConfig {
3462 npub: npub.to_string(),
3463 alias: None,
3464 addresses: vec![PeerAddress::new(transport, address)],
3465 connect_policy: ConnectPolicy::Manual,
3466 auto_reconnect: false,
3467 discovery_fallback_transit: true,
3468 };
3469
3470 if let Ok(identity) = PeerIdentity::from_npub(npub) {
3472 self.peer_aliases
3473 .insert(*identity.node_addr(), identity.short_npub());
3474 self.register_identity(*identity.node_addr(), identity.pubkey_full());
3475 }
3476
3477 self.initiate_peer_connection(&peer_config)
3478 .await
3479 .map(|()| {
3480 info!(
3481 npub = %npub,
3482 address = %address,
3483 transport = %transport,
3484 "API connect initiated"
3485 );
3486 serde_json::json!({
3487 "npub": npub,
3488 "address": address,
3489 "transport": transport,
3490 })
3491 })
3492 .map_err(|e| e.to_string())
3493 }
3494
3495 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
3499 let peer_identity =
3500 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
3501 let node_addr = *peer_identity.node_addr();
3502
3503 if !self.peers.contains_key(&node_addr) {
3504 return Err(format!("peer not found: {npub}"));
3505 }
3506
3507 self.remove_active_peer(&node_addr);
3509
3510 self.retry_pending.remove(&node_addr);
3512
3513 info!(npub = %npub, "API disconnect completed");
3514
3515 Ok(serde_json::json!({
3516 "npub": npub,
3517 "disconnected": true,
3518 }))
3519 }
3520
3521 pub async fn adopt_established_traversal(
3528 &mut self,
3529 traversal: EstablishedTraversal,
3530 ) -> Result<BootstrapHandoffResult, NodeError> {
3531 debug!(
3532 peer_npub = %traversal.peer_npub,
3533 session_id = %traversal.session_id,
3534 remote_addr = %traversal.remote_addr,
3535 "adopting established traversal socket"
3536 );
3537
3538 if !self.state.is_operational() {
3539 return Err(NodeError::NotStarted);
3540 }
3541
3542 let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
3543 let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
3544 NodeError::InvalidPeerNpub {
3545 npub: traversal.peer_npub.clone(),
3546 reason: e.to_string(),
3547 }
3548 })?;
3549 let peer_node_addr = *peer_identity.node_addr();
3550 if self.peers.contains_key(&peer_node_addr) {
3551 debug!(
3552 peer_npub = %traversal.peer_npub,
3553 "Adopting NAT traversal handoff as alternate path for already-connected peer"
3554 );
3555 }
3556
3557 self.peer_aliases
3558 .insert(peer_node_addr, peer_identity.short_npub());
3559 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
3560
3561 let transport_id = self.allocate_transport_id();
3562 let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
3582 let mut cfg = self
3583 .lookup_udp_config(traversal.transport_name.as_deref())
3584 .or_else(|| self.lookup_udp_config(None))
3585 .cloned()
3586 .unwrap_or_default();
3587 cfg.bind_addr = None;
3588 cfg.external_addr = None;
3589 cfg
3590 });
3591 let mut transport = crate::transport::udp::UdpTransport::new(
3592 transport_id,
3593 traversal.transport_name.clone(),
3594 inherited_config,
3595 packet_tx,
3596 );
3597
3598 transport
3599 .adopt_socket_async(traversal.socket)
3600 .await
3601 .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
3602
3603 let local_addr = transport.local_addr().ok_or_else(|| {
3604 NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
3605 })?;
3606
3607 self.transports.insert(
3608 transport_id,
3609 crate::transport::TransportHandle::Udp(transport),
3610 );
3611 self.bootstrap_transports.insert(transport_id);
3612 self.bootstrap_transport_npubs
3613 .insert(transport_id, traversal.peer_npub.clone());
3614
3615 let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
3616 if let Err(err) = self
3617 .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
3618 .await
3619 {
3620 self.bootstrap_transports.remove(&transport_id);
3621 self.bootstrap_transport_npubs.remove(&transport_id);
3622 if let Some(mut handle) = self.transports.remove(&transport_id) {
3623 let _ = handle.stop().await;
3624 }
3625 return Err(err);
3626 }
3627
3628 info!(
3629 peer = %self.peer_display_name(&peer_node_addr),
3630 transport_id = %transport_id,
3631 local_addr = %local_addr,
3632 remote_addr = %traversal.remote_addr,
3633 session_id = %traversal.session_id,
3634 "adopted NAT traversal socket; handshake initiated"
3635 );
3636
3637 Ok(BootstrapHandoffResult {
3638 transport_id,
3639 local_addr,
3640 remote_addr: traversal.remote_addr,
3641 peer_node_addr,
3642 session_id: traversal.session_id,
3643 })
3644 }
3645}