1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
7 OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use std::collections::{HashMap, HashSet};
18use std::net::{IpAddr, SocketAddr};
19use std::thread;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
/// Appends one timestamped line to `nvpn-fips-endpoint-debug.log` in the
/// system temporary directory. Compiled only when debug assertions are on.
///
/// This is best-effort diagnostics: if the file cannot be opened or the
/// write fails, the error is dropped and the caller is never affected.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let opened = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_path);
    let Ok(mut log_file) = opened else {
        return;
    };

    // A failed write is deliberately ignored.
    let _ = writeln!(
        log_file,
        "{:?} {}",
        std::time::SystemTime::now(),
        message.as_ref()
    );
}

/// Stand-in used when debug assertions are off: does nothing.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
43
/// Returns `true` when `ip` falls in an address range this check treats as
/// unroutable.
///
/// IPv4: private, loopback, link-local, unspecified, multicast, broadcast,
/// documentation, and the shared address space `100.64.0.0/10`.
///
/// IPv6: loopback, unspecified, unique-local, multicast, link-local
/// (`fe80::/10`) and the documentation prefix `2001:db8::/32`. An
/// IPv4-mapped address (`::ffff:a.b.c.d`) is judged by the IPv4 rules for
/// the embedded address, so a mapped private address cannot slip through.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [first, second, ..] = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 100.64.0.0/10: first octet 100, top two bits of the second are `01`.
                || (first == 100 && (second & 0xc0) == 64)
        }
        IpAddr::V6(v6) => {
            // Classify `::ffff:a.b.c.d` by the IPv4 address it carries.
            if let Some(mapped) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(mapped));
            }

            let segments = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // fe80::/10 link-local.
                || (segments[0] & 0xffc0) == 0xfe80
                // 2001:db8::/32 documentation prefix, mirroring the IPv4 check.
                || (segments[0] == 0x2001 && segments[1] == 0x0db8)
        }
    }
}
75
/// Returns `true` when both socket addresses belong to the same IP family
/// (both IPv4 or both IPv6).
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    // `SocketAddr` has exactly two variants, so equal "is IPv4" answers
    // means the families match.
    local.is_ipv4() == remote.is_ipv4()
}
82
/// NOTE(review): presumably scales the retry lifetime for open-discovery
/// peers; it is not referenced in the visible part of this file, so confirm
/// against its use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
/// Cap on in-flight connection attempts (handshakes plus pending transport
/// connects) toward a single peer; applied in `path_candidate_attempt_budget`.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
/// Upper bound on the value returned by `graph_session_warmup_budget`.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
/// Upper bound on the value returned by `discovery_connect_budget`.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
87
88impl Node {
89 pub(super) async fn update_peers(
101 &mut self,
102 new_peers: Vec<crate::config::PeerConfig>,
103 ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
104 use std::collections::{HashMap, HashSet};
105
106 let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
107 HashMap::with_capacity(new_peers.len());
108 let mut new_order = Vec::with_capacity(new_peers.len());
109 for peer in new_peers {
110 let identity = match PeerIdentity::from_npub(&peer.npub) {
111 Ok(id) => id,
112 Err(e) => {
113 return Err(crate::node::NodeError::InvalidPeerNpub {
114 npub: peer.npub.clone(),
115 reason: e.to_string(),
116 });
117 }
118 };
119 let node_addr = *identity.node_addr();
123 if !new_by_addr.contains_key(&node_addr) {
124 new_order.push(node_addr);
125 }
126 new_by_addr.insert(node_addr, peer);
127 }
128
129 let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
130 .config
131 .peers()
132 .iter()
133 .filter_map(|pc| {
134 PeerIdentity::from_npub(&pc.npub)
135 .ok()
136 .map(|id| (*id.node_addr(), pc.clone()))
137 })
138 .collect();
139
140 let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
141 let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
142
143 let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
144 let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
145 let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
146
147 let mut outcome = crate::node::UpdatePeersOutcome::default();
148
149 for node_addr in &removed {
150 if self.retry_pending.remove(node_addr).is_some() {
151 debug!(
152 peer = %self.peer_display_name(node_addr),
153 "Dropping retry entry for peer removed from runtime peer list"
154 );
155 }
156 self.peer_aliases.remove(node_addr);
157 self.set_discovery_fallback_transit_allowed(*node_addr, false);
158 outcome.removed += 1;
159 }
160
161 let mut auto_connect_refresh_configs = Vec::new();
162 for node_addr in &kept {
163 let new_pc = &new_by_addr[node_addr];
164 let current_pc = ¤t_by_addr[node_addr];
165 if new_pc.addresses != current_pc.addresses
166 || new_pc.alias != current_pc.alias
167 || new_pc.connect_policy != current_pc.connect_policy
168 || new_pc.auto_reconnect != current_pc.auto_reconnect
169 || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
170 {
171 outcome.updated += 1;
172 self.set_discovery_fallback_transit_allowed(
173 *node_addr,
174 new_pc.discovery_fallback_transit,
175 );
176 if let Some(state) = self.retry_pending.get_mut(node_addr) {
177 state.peer_config = new_pc.clone();
178 state.retry_after_ms = Self::now_ms();
179 }
180 if let Some(alias) = new_pc.alias.clone() {
181 self.peer_aliases.insert(*node_addr, alias);
182 }
183 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
184 auto_connect_refresh_configs.push(new_pc.clone());
185 }
186 } else {
187 outcome.unchanged += 1;
188 self.set_discovery_fallback_transit_allowed(
189 *node_addr,
190 new_pc.discovery_fallback_transit,
191 );
192 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
193 auto_connect_refresh_configs.push(new_pc.clone());
194 }
195 }
196 }
197
198 let added_configs: Vec<crate::config::PeerConfig> = new_order
199 .iter()
200 .filter(|addr| added.contains(addr))
201 .map(|addr| new_by_addr[addr].clone())
202 .collect();
203
204 self.config.peers = new_order
208 .iter()
209 .filter_map(|addr| new_by_addr.get(addr).cloned())
210 .collect();
211
212 for peer_config in added_configs {
213 outcome.added += 1;
214 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
215 continue;
216 };
217 let name = peer_config
218 .alias
219 .clone()
220 .unwrap_or_else(|| identity.short_npub());
221 self.peer_aliases.insert(*identity.node_addr(), name);
222 self.set_discovery_fallback_transit_allowed(
223 *identity.node_addr(),
224 peer_config.discovery_fallback_transit,
225 );
226 self.register_identity(*identity.node_addr(), identity.pubkey_full());
227
228 if !peer_config.is_auto_connect() {
229 continue;
230 }
231
232 match self
233 .try_auto_connect_graph_session(&peer_config, identity)
234 .await
235 {
236 Ok(true) => continue,
237 Ok(false) => {}
238 Err(err) => {
239 debug!(
240 npub = %peer_config.npub,
241 error = %err,
242 "Existing FIPS graph did not warm newly added peer"
243 );
244 }
245 }
246
247 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
248 warn!(
249 npub = %peer_config.npub,
250 error = %e,
251 "Failed to initiate connection for newly added peer"
252 );
253 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
254 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
255 }
256 if matches!(e, crate::node::NodeError::NoTransportForType(_))
257 && let Some(bootstrap) = self.nostr_discovery.clone()
258 {
259 bootstrap
260 .request_advert_stale_check(peer_config.npub.clone())
261 .await;
262 }
263 }
264 }
265
266 for peer_config in auto_connect_refresh_configs {
267 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
268 continue;
269 };
270 let node_addr = *peer_identity.node_addr();
271
272 if self.peers.contains_key(&node_addr) {
273 match self
274 .initiate_active_peer_alternative_connection(&peer_config)
275 .await
276 {
277 Ok(attempted) => {
278 if attempted {
279 debug!(
280 peer = %self.peer_display_name(&node_addr),
281 "Started non-disruptive alternate-path handshake for active peer"
282 );
283 }
284 }
285 Err(e) => {
286 debug!(
287 npub = %peer_config.npub,
288 error = %e,
289 "Active peer alternate-path refresh did not start"
290 );
291 }
292 }
293 continue;
294 }
295
296 match self
297 .try_auto_connect_graph_session(&peer_config, peer_identity)
298 .await
299 {
300 Ok(true) => continue,
301 Ok(false) => {}
302 Err(err) => {
303 debug!(
304 npub = %peer_config.npub,
305 error = %err,
306 "Existing FIPS graph did not warm refreshed peer"
307 );
308 }
309 }
310
311 match self.initiate_peer_connection(&peer_config).await {
312 Ok(()) => {
313 let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
314 if let Some(state) = self.retry_pending.get_mut(&node_addr) {
315 state.peer_config = peer_config;
316 state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
317 }
318 }
319 Err(e) => {
320 debug!(
321 npub = %peer_config.npub,
322 error = %e,
323 "Refreshed peer addresses did not initiate a direct connection"
324 );
325 self.schedule_retry(node_addr, Self::now_ms());
326 }
327 }
328 }
329
330 self.warm_auto_connect_graph_sessions().await;
331
332 Ok(outcome)
333 }
334
335 pub(super) async fn initiate_peer_connections(&mut self) {
336 let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
342 .config
343 .peers()
344 .iter()
345 .filter_map(|pc| {
346 PeerIdentity::from_npub(&pc.npub)
347 .ok()
348 .map(|id| (id, pc.alias.clone()))
349 })
350 .collect();
351
352 for (identity, alias) in peer_identities {
353 let name = alias.unwrap_or_else(|| identity.short_npub());
354 self.peer_aliases.insert(*identity.node_addr(), name);
355 self.register_identity(*identity.node_addr(), identity.pubkey_full());
359 }
360
361 let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
363
364 if peer_configs.is_empty() {
365 debug!("No static peers configured");
366 return;
367 }
368
369 debug!(
370 count = peer_configs.len(),
371 "Initiating static peer connections"
372 );
373
374 for peer_config in peer_configs {
375 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
376 Ok(identity) => identity,
377 Err(_) => continue,
378 };
379 match self
380 .try_auto_connect_graph_session(&peer_config, peer_identity)
381 .await
382 {
383 Ok(true) => continue,
384 Ok(false) => {}
385 Err(err) => {
386 debug!(
387 npub = %peer_config.npub,
388 error = %err,
389 "Existing FIPS graph did not warm auto-connect peer"
390 );
391 }
392 }
393 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
394 warn!(
395 npub = %peer_config.npub,
396 alias = ?peer_config.alias,
397 error = %e,
398 "Failed to initiate peer connection"
399 );
400 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
404 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
405 }
406 if matches!(e, crate::node::NodeError::NoTransportForType(_))
412 && let Some(bootstrap) = self.nostr_discovery.clone()
413 {
414 bootstrap
415 .request_advert_stale_check(peer_config.npub.clone())
416 .await;
417 }
418 }
419 }
420
421 self.warm_auto_connect_graph_sessions().await;
422 }
423
424 pub(in crate::node) async fn try_auto_connect_graph_session(
425 &mut self,
426 peer_config: &PeerConfig,
427 peer_identity: PeerIdentity,
428 ) -> Result<bool, NodeError> {
429 if !peer_config.is_auto_connect() {
430 return Ok(false);
431 }
432
433 let peer_node_addr = *peer_identity.node_addr();
434 if self.peers.contains_key(&peer_node_addr) {
435 return Ok(false);
436 }
437 if self
438 .sessions
439 .get(&peer_node_addr)
440 .is_some_and(|entry| entry.is_established() || entry.is_initiating())
441 {
442 return Ok(true);
443 }
444 if self.find_next_hop(&peer_node_addr).is_none() {
445 return Ok(false);
446 }
447
448 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
449 match self
450 .initiate_session(peer_node_addr, peer_identity.pubkey_full())
451 .await
452 {
453 Ok(()) => {
454 debug!(
455 peer = %self.peer_display_name(&peer_node_addr),
456 "Warmed auto-connect peer session over existing FIPS graph"
457 );
458 Ok(true)
459 }
460 Err(NodeError::SendFailed { node_addr, reason })
461 if node_addr == peer_node_addr && reason == "no route to destination" =>
462 {
463 self.maybe_initiate_lookup(&peer_node_addr).await;
464 Ok(false)
465 }
466 Err(err) => Err(err),
467 }
468 }
469
470 pub(super) async fn initiate_peer_connection(
474 &mut self,
475 peer_config: &crate::config::PeerConfig,
476 ) -> Result<(), NodeError> {
477 self.initiate_peer_connection_inner(peer_config).await
478 }
479
480 pub(super) async fn initiate_peer_retry_connection(
486 &mut self,
487 peer_config: &crate::config::PeerConfig,
488 ) -> Result<(), NodeError> {
489 self.initiate_peer_connection_inner(peer_config).await
490 }
491
492 async fn initiate_active_peer_alternative_connection(
493 &mut self,
494 peer_config: &crate::config::PeerConfig,
495 ) -> Result<bool, NodeError> {
496 let peer_identity =
497 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
498 npub: peer_config.npub.clone(),
499 reason: e.to_string(),
500 })?;
501 let peer_node_addr = *peer_identity.node_addr();
502
503 if !self.peers.contains_key(&peer_node_addr) {
504 self.initiate_peer_connection(peer_config).await?;
505 return Ok(true);
506 }
507
508 self.try_active_peer_alternative_addresses(peer_config, peer_identity)
514 .await
515 }
516
517 async fn initiate_peer_connection_inner(
518 &mut self,
519 peer_config: &crate::config::PeerConfig,
520 ) -> Result<(), NodeError> {
521 let peer_identity =
523 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
524 npub: peer_config.npub.clone(),
525 reason: e.to_string(),
526 })?;
527
528 let peer_node_addr = *peer_identity.node_addr();
529
530 if self.peers.contains_key(&peer_node_addr) {
532 debug!(
533 npub = %peer_config.npub,
534 "Peer already exists, skipping"
535 );
536 return Ok(());
537 }
538
539 self.try_peer_addresses(peer_config, peer_identity, true)
540 .await
541 }
542
543 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
544 self.connections.values().any(|conn| {
545 conn.expected_identity()
546 .map(|id| id.node_addr() == peer_node_addr)
547 .unwrap_or(false)
548 })
549 }
550
551 fn is_connecting_to_peer_on_path(
552 &self,
553 peer_node_addr: &NodeAddr,
554 transport_id: TransportId,
555 remote_addr: &TransportAddr,
556 ) -> bool {
557 self.connections.values().any(|conn| {
558 conn.expected_identity()
559 .map(|id| id.node_addr() == peer_node_addr)
560 .unwrap_or(false)
561 && conn.transport_id() == Some(transport_id)
562 && conn.source_addr() == Some(remote_addr)
563 }) || self.pending_connects.iter().any(|pending| {
564 pending.peer_identity.node_addr() == peer_node_addr
565 && pending.transport_id == transport_id
566 && &pending.remote_addr == remote_addr
567 })
568 }
569
570 pub(in crate::node) fn should_warm_auto_connect_session(
571 &self,
572 peer_node_addr: &NodeAddr,
573 ) -> bool {
574 if self.peers.contains_key(peer_node_addr)
575 || self
576 .sessions
577 .get(peer_node_addr)
578 .is_some_and(|entry| entry.is_established())
579 {
580 return false;
581 }
582
583 self.config.peers().iter().any(|peer| {
584 peer.is_auto_connect()
585 && PeerIdentity::from_npub(&peer.npub)
586 .map(|identity| identity.node_addr() == peer_node_addr)
587 .unwrap_or(false)
588 })
589 }
590
591 pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
592 if !self.peers.values().any(|peer| peer.can_send()) {
593 return 0;
594 }
595
596 let mut budget = self.graph_session_warmup_budget();
597 if budget == 0 {
598 return 0;
599 }
600
601 let peer_identities: Vec<_> = self
602 .config
603 .auto_connect_peers()
604 .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
605 .collect();
606
607 let mut warmed = 0;
608 for identity in peer_identities {
609 if budget == 0 {
610 break;
611 }
612
613 let peer_node_addr = *identity.node_addr();
614 if peer_node_addr == *self.identity.node_addr()
615 || !self.should_warm_auto_connect_session(&peer_node_addr)
616 || self
617 .sessions
618 .get(&peer_node_addr)
619 .is_some_and(|entry| entry.is_initiating())
620 {
621 continue;
622 }
623
624 self.register_identity(peer_node_addr, identity.pubkey_full());
625
626 if self.find_next_hop(&peer_node_addr).is_some() {
627 match self
628 .initiate_session(peer_node_addr, identity.pubkey_full())
629 .await
630 {
631 Ok(()) => {
632 warmed += 1;
633 budget = budget.saturating_sub(1);
634 debug!(
635 peer = %self.peer_display_name(&peer_node_addr),
636 "Warmed auto-connect peer session over existing FIPS graph"
637 );
638 }
639 Err(NodeError::SendFailed { node_addr, reason })
640 if node_addr == peer_node_addr && reason == "no route to destination" =>
641 {
642 self.maybe_initiate_lookup(&peer_node_addr).await;
643 warmed += 1;
644 budget = budget.saturating_sub(1);
645 }
646 Err(err) => {
647 debug!(
648 peer = %self.peer_display_name(&peer_node_addr),
649 error = %err,
650 "Failed to warm auto-connect peer session"
651 );
652 }
653 }
654 } else {
655 self.maybe_initiate_lookup(&peer_node_addr).await;
656 warmed += 1;
657 budget = budget.saturating_sub(1);
658 }
659 }
660
661 warmed
662 }
663
664 pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
665 let max_destinations = self.config.node.session.pending_max_destinations;
666 if max_destinations == 0 {
667 return 0;
668 }
669
670 let pending_sessions = self
671 .sessions
672 .values()
673 .filter(|entry| !entry.is_established())
674 .count();
675 let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
676 max_destinations
677 .saturating_sub(pending_total)
678 .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
679 }
680
681 fn outbound_handshake_slots(&self) -> usize {
682 let used = self
683 .connections
684 .len()
685 .saturating_add(self.pending_connects.len());
686 if self.max_connections == 0 {
687 usize::MAX
688 } else {
689 self.max_connections.saturating_sub(used)
690 }
691 }
692
693 fn outbound_link_slots(&self) -> usize {
694 if self.max_links == 0 {
695 usize::MAX
696 } else {
697 self.max_links.saturating_sub(self.links.len())
698 }
699 }
700
701 fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
702 if !self.peers.contains_key(peer_node_addr)
703 && self.max_peers > 0
704 && self.peers.len() >= self.max_peers
705 {
706 return 0;
707 }
708
709 let in_flight_for_peer = self
710 .connections
711 .values()
712 .filter(|conn| {
713 conn.expected_identity()
714 .map(|id| id.node_addr() == peer_node_addr)
715 .unwrap_or(false)
716 })
717 .count()
718 .saturating_add(
719 self.pending_connects
720 .iter()
721 .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
722 .count(),
723 );
724
725 self.outbound_handshake_slots()
726 .min(self.outbound_link_slots())
727 .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
728 }
729
730 fn discovery_connect_budget(&self) -> usize {
731 self.outbound_handshake_slots()
732 .min(self.outbound_link_slots())
733 .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
734 }
735
736 fn find_udp_transport_for_remote_addr(
743 &self,
744 remote_addr: SocketAddr,
745 ) -> Option<(TransportId, SocketAddr)> {
746 self.transports
747 .iter()
748 .filter(|(id, handle)| {
749 handle.transport_type().name == "udp"
750 && handle.is_operational()
751 && !self.bootstrap_transports.contains(id)
752 })
753 .filter_map(|(id, handle)| {
754 let local_addr = handle.local_addr()?;
755 socket_addr_families_compatible(local_addr, remote_addr)
756 .then_some((*id, local_addr))
757 })
758 .min_by_key(|(id, _)| id.as_u32())
759 }
760
761 pub(super) fn transport_discovery_candidate(
762 &self,
763 discovered_transport_id: TransportId,
764 discovered_addr: TransportAddr,
765 ) -> Option<(TransportId, TransportAddr, &'static str)> {
766 let transport = self.transports.get(&discovered_transport_id)?;
767 let transport_name = transport.transport_type().name;
768
769 if transport_name != "udp" {
770 return Some((discovered_transport_id, discovered_addr, transport_name));
771 }
772
773 let Some(remote_socket_addr) = discovered_addr
774 .as_str()
775 .and_then(|addr| addr.parse::<SocketAddr>().ok())
776 else {
777 if self.bootstrap_transports.contains(&discovered_transport_id) {
778 debug!(
779 transport_id = %discovered_transport_id,
780 remote_addr = %discovered_addr,
781 "transport discovery: skip non-numeric UDP address from bootstrap transport"
782 );
783 return None;
784 }
785 return Some((discovered_transport_id, discovered_addr, transport_name));
786 };
787
788 let Some((transport_id, local_addr)) =
789 self.find_udp_transport_for_remote_addr(remote_socket_addr)
790 else {
791 debug!(
792 transport_id = %discovered_transport_id,
793 remote_addr = %discovered_addr,
794 "transport discovery: skip UDP peer with no compatible local socket"
795 );
796 return None;
797 };
798
799 if transport_id != discovered_transport_id {
800 debug!(
801 discovered_transport_id = %discovered_transport_id,
802 selected_transport_id = %transport_id,
803 local_addr = %local_addr,
804 remote_addr = %remote_socket_addr,
805 "transport discovery: selected compatible UDP transport"
806 );
807 }
808
809 Some((
810 transport_id,
811 TransportAddr::from_socket_addr(remote_socket_addr),
812 transport_name,
813 ))
814 }
815
816 fn peer_address_string_for_transport_candidate(
817 &self,
818 transport_id: TransportId,
819 transport_name: &str,
820 remote_addr: &TransportAddr,
821 ) -> String {
822 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
823 let _ = (transport_id, transport_name);
824
825 #[cfg(any(target_os = "linux", target_os = "macos"))]
826 if transport_name == "ethernet"
827 && remote_addr.as_bytes().len() == 6
828 && let Some(interface) = self
829 .transports
830 .get(&transport_id)
831 .and_then(|transport| transport.interface_name())
832 {
833 let mut mac = [0u8; 6];
834 mac.copy_from_slice(remote_addr.as_bytes());
835 return format!(
836 "{interface}/{}",
837 crate::transport::ethernet::format_mac(&mac)
838 );
839 }
840
841 remote_addr.to_string()
842 }
843
844 fn resolve_peer_address_for_match(
845 &self,
846 candidate: &PeerAddress,
847 ) -> Option<(TransportId, TransportAddr)> {
848 if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
849 return None;
850 }
851
852 if candidate.transport == "ethernet" {
853 return self.resolve_ethernet_addr(&candidate.addr).ok();
854 }
855
856 if candidate.transport == "ble" {
857 #[cfg(bluer_available)]
858 {
859 return self.resolve_ble_addr(&candidate.addr).ok();
860 }
861 #[cfg(not(bluer_available))]
862 {
863 return None;
864 }
865 }
866
867 let transport_id = if candidate.transport == "udp"
868 && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
869 {
870 self.find_udp_transport_for_remote_addr(remote_socket_addr)
871 .map(|(id, _)| id)?
872 } else {
873 self.find_transport_for_type(&candidate.transport)?
874 };
875
876 Some((transport_id, TransportAddr::from_string(&candidate.addr)))
877 }
878
879 pub(super) async fn initiate_connection(
890 &mut self,
891 transport_id: TransportId,
892 remote_addr: TransportAddr,
893 peer_identity: PeerIdentity,
894 ) -> Result<(), NodeError> {
895 let peer_node_addr = *peer_identity.node_addr();
896
897 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
898 debug!(
899 peer = %self.peer_display_name(&peer_node_addr),
900 transport_id = %transport_id,
901 remote_addr = %remote_addr,
902 "Connection already in progress for candidate path"
903 );
904 return Ok(());
905 }
906
907 if self.outbound_handshake_slots() == 0 {
908 return Err(NodeError::MaxConnectionsExceeded {
909 max: self.max_connections,
910 });
911 }
912
913 if self.outbound_link_slots() == 0 {
914 return Err(NodeError::MaxLinksExceeded {
915 max: self.max_links,
916 });
917 }
918
919 if !self.peers.contains_key(&peer_node_addr)
920 && self.max_peers > 0
921 && self.peers.len() >= self.max_peers
922 {
923 return Err(NodeError::MaxPeersExceeded {
924 max: self.max_peers,
925 });
926 }
927
928 self.authorize_peer(
929 &peer_identity,
930 PeerAclContext::OutboundConnect,
931 transport_id,
932 &remote_addr,
933 )?;
934
935 let is_connection_oriented = self
936 .transports
937 .get(&transport_id)
938 .map(|t| t.transport_type().connection_oriented)
939 .unwrap_or(false);
940
941 let link_id = self.allocate_link_id();
943
944 let link = if is_connection_oriented {
945 Link::new(
946 link_id,
947 transport_id,
948 remote_addr.clone(),
949 LinkDirection::Outbound,
950 Duration::from_millis(self.config.node.base_rtt_ms),
951 )
952 } else {
953 Link::connectionless(
954 link_id,
955 transport_id,
956 remote_addr.clone(),
957 LinkDirection::Outbound,
958 Duration::from_millis(self.config.node.base_rtt_ms),
959 )
960 };
961
962 self.links.insert(link_id, link);
963
964 self.addr_to_link
966 .insert((transport_id, remote_addr.clone()), link_id);
967
968 if is_connection_oriented {
969 if let Some(transport) = self.transports.get(&transport_id) {
971 match transport.connect(&remote_addr).await {
972 Ok(()) => {
973 debug!(
974 peer = %self.peer_display_name(&peer_node_addr),
975 transport_id = %transport_id,
976 remote_addr = %remote_addr,
977 link_id = %link_id,
978 "Transport connect initiated (non-blocking)"
979 );
980 self.pending_connects.push(super::PendingConnect {
981 link_id,
982 transport_id,
983 remote_addr,
984 peer_identity,
985 });
986 }
987 Err(e) => {
988 self.links.remove(&link_id);
990 self.addr_to_link.remove(&(transport_id, remote_addr));
991 return Err(NodeError::TransportError(e.to_string()));
992 }
993 }
994 }
995 Ok(())
996 } else {
997 self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
999 .await
1000 }
1001 }
1002
1003 pub(super) async fn start_handshake(
1008 &mut self,
1009 link_id: LinkId,
1010 transport_id: TransportId,
1011 remote_addr: TransportAddr,
1012 peer_identity: PeerIdentity,
1013 ) -> Result<(), NodeError> {
1014 let peer_node_addr = *peer_identity.node_addr();
1015
1016 let current_time_ms = Self::now_ms();
1018 let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);
1019
1020 let our_index = match self.index_allocator.allocate() {
1022 Ok(idx) => idx,
1023 Err(e) => {
1024 self.links.remove(&link_id);
1026 self.addr_to_link.remove(&(transport_id, remote_addr));
1027 return Err(NodeError::IndexAllocationFailed(e.to_string()));
1028 }
1029 };
1030
1031 let our_keypair = self.identity.keypair();
1033 let noise_msg1 =
1034 match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
1035 Ok(msg) => msg,
1036 Err(e) => {
1037 let _ = self.index_allocator.free(our_index);
1039 self.links.remove(&link_id);
1040 self.addr_to_link.remove(&(transport_id, remote_addr));
1041 return Err(NodeError::HandshakeFailed(e.to_string()));
1042 }
1043 };
1044
1045 connection.set_our_index(our_index);
1047 connection.set_transport_id(transport_id);
1048 connection.set_source_addr(remote_addr.clone());
1049
1050 let wire_msg1 = build_msg1(our_index, &noise_msg1);
1052
1053 debug!(
1054 peer = %self.peer_display_name(&peer_node_addr),
1055 transport_id = %transport_id,
1056 remote_addr = %remote_addr,
1057 link_id = %link_id,
1058 our_index = %our_index,
1059 "Connection initiated"
1060 );
1061
1062 let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
1064 connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);
1065
1066 self.pending_outbound
1068 .insert((transport_id, our_index.as_u32()), link_id);
1069 self.connections.insert(link_id, connection);
1070
1071 let send_result = match self.transports.get(&transport_id) {
1076 Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
1077 None => None,
1078 };
1079 match send_result {
1080 Some(send_result) => {
1081 self.note_local_send_outcome(&send_result);
1082 match send_result {
1083 Ok(bytes) => {
1084 debug!(
1085 link_id = %link_id,
1086 our_index = %our_index,
1087 bytes,
1088 "Sent Noise handshake message 1 (wire format)"
1089 );
1090 }
1091 Err(e) => {
1092 warn!(
1093 link_id = %link_id,
1094 transport_id = %transport_id,
1095 remote_addr = %remote_addr,
1096 our_index = %our_index,
1097 error = %e,
1098 "Failed to send handshake message"
1099 );
1100 self.pending_outbound
1101 .remove(&(transport_id, our_index.as_u32()));
1102 self.connections.remove(&link_id);
1103 self.links.remove(&link_id);
1104 self.addr_to_link
1105 .remove(&(transport_id, remote_addr.clone()));
1106 let _ = self.index_allocator.free(our_index);
1107 return Err(NodeError::TransportError(e.to_string()));
1108 }
1109 }
1110 }
1111 None => {
1112 self.pending_outbound
1113 .remove(&(transport_id, our_index.as_u32()));
1114 self.connections.remove(&link_id);
1115 self.links.remove(&link_id);
1116 self.addr_to_link
1117 .remove(&(transport_id, remote_addr.clone()));
1118 let _ = self.index_allocator.free(our_index);
1119 return Err(NodeError::TransportError(format!(
1120 "transport {transport_id} disappeared before first handshake send"
1121 )));
1122 }
1123 }
1124
1125 Ok(())
1126 }
1127
1128 pub(super) async fn poll_transport_discovery(&mut self) {
1134 let mut to_connect = Vec::new();
1136 let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1137 let mut connect_budget = self.discovery_connect_budget();
1138 let mut skipped_budget = 0usize;
1139
1140 for transport in self.transports.values() {
1141 if !transport.is_operational() {
1142 continue;
1143 }
1144 if !transport.auto_connect() {
1145 let _ = transport.discover();
1147 continue;
1148 }
1149 let discovered = match transport.discover() {
1150 Ok(peers) => peers,
1151 Err(_) => continue,
1152 };
1153 for peer in discovered {
1154 let discovered_transport_id = peer.transport_id;
1155 let pubkey = match peer.pubkey_hint {
1156 Some(pk) => pk,
1157 None => continue,
1158 };
1159 let identity = PeerIdentity::from_pubkey(pubkey);
1160 let node_addr = *identity.node_addr();
1161
1162 if node_addr == *self.identity.node_addr() {
1164 continue;
1165 }
1166
1167 let Some((candidate_transport_id, remote_addr, transport_name)) =
1168 self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1169 else {
1170 continue;
1171 };
1172
1173 if self.peers.contains_key(&node_addr) {
1174 let candidate = PeerAddress::new(
1175 transport_name,
1176 self.peer_address_string_for_transport_candidate(
1177 candidate_transport_id,
1178 transport_name,
1179 &remote_addr,
1180 ),
1181 );
1182 if self.active_peer_candidate_is_fresh_enough_to_skip(
1183 &node_addr,
1184 std::slice::from_ref(&candidate),
1185 ) {
1186 continue;
1187 }
1188 if self.is_connecting_to_peer_on_path(
1189 &node_addr,
1190 candidate_transport_id,
1191 &remote_addr,
1192 ) {
1193 continue;
1194 }
1195 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1196 if connect_budget == 0
1197 || self
1198 .path_candidate_attempt_budget(&node_addr)
1199 .saturating_sub(queued_for_peer)
1200 == 0
1201 {
1202 skipped_budget = skipped_budget.saturating_add(1);
1203 continue;
1204 }
1205 to_connect.push((candidate_transport_id, remote_addr, identity, true));
1206 *queued_per_peer.entry(node_addr).or_default() += 1;
1207 connect_budget = connect_budget.saturating_sub(1);
1208 continue;
1209 }
1210
1211 if self.is_connecting_to_peer_on_path(
1212 &node_addr,
1213 candidate_transport_id,
1214 &remote_addr,
1215 ) {
1216 continue;
1217 }
1218
1219 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1220 if connect_budget == 0
1221 || self
1222 .path_candidate_attempt_budget(&node_addr)
1223 .saturating_sub(queued_for_peer)
1224 == 0
1225 {
1226 skipped_budget = skipped_budget.saturating_add(1);
1227 continue;
1228 }
1229 to_connect.push((candidate_transport_id, remote_addr, identity, false));
1230 *queued_per_peer.entry(node_addr).or_default() += 1;
1231 connect_budget = connect_budget.saturating_sub(1);
1232 }
1233 }
1234
1235 if skipped_budget > 0 {
1236 debug!(
1237 skipped = skipped_budget,
1238 queued = to_connect.len(),
1239 "Transport discovery connect budget exhausted"
1240 );
1241 }
1242
1243 for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1244 info!(
1245 peer = %self.peer_display_name(identity.node_addr()),
1246 transport_id = %transport_id,
1247 remote_addr = %remote_addr,
1248 active_refresh,
1249 "Auto-connecting to discovered peer"
1250 );
1251 if let Err(e) = self
1252 .initiate_connection(transport_id, remote_addr, identity)
1253 .await
1254 {
1255 warn!(error = %e, "Failed to auto-connect to discovered peer");
1256 }
1257 }
1258 }
1259
1260 pub(super) async fn poll_nostr_discovery(&mut self) {
1261 let Some(bootstrap) = self.nostr_discovery.clone() else {
1262 return;
1263 };
1264
1265 if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
1266 debug!(error = %err, "Failed to refresh local Nostr overlay advert");
1267 }
1268
1269 for event in bootstrap.drain_events().await {
1270 match event {
1271 BootstrapEvent::Established { traversal } => {
1272 let peer_npub = traversal.peer_npub.clone();
1273 match self.adopt_established_traversal(traversal).await {
1274 Ok(_) => {
1275 info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
1276 }
1277 Err(err) => {
1278 warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
1279 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
1280 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
1281 }
1282 }
1283 }
1284 }
1285 BootstrapEvent::Failed {
1286 peer_config,
1287 reason,
1288 } => {
1289 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
1290 Ok(identity) => identity,
1291 Err(_) => continue,
1292 };
1293 let node_addr = *peer_identity.node_addr();
1294 if self.peers.contains_key(&node_addr) {
1295 debug!(
1296 npub = %peer_config.npub,
1297 error = %reason,
1298 "Ignoring failed NAT traversal for already-connected peer"
1299 );
1300 continue;
1301 }
1302 if self.is_connecting_to_peer(&node_addr) {
1303 debug!(
1304 npub = %peer_config.npub,
1305 error = %reason,
1306 "Ignoring failed NAT traversal while peer handshake is already in progress"
1307 );
1308 continue;
1309 }
1310
1311 let now_ms = Self::now_ms();
1312 let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
1313 if decision.should_warn {
1314 warn!(
1315 npub = %peer_config.npub,
1316 error = %reason,
1317 consecutive_failures = decision.consecutive_failures,
1318 cooldown_secs = decision
1319 .cooldown_until_ms
1320 .map(|t| t.saturating_sub(now_ms) / 1000),
1321 "NAT traversal failed"
1322 );
1323 } else {
1324 debug!(
1325 npub = %peer_config.npub,
1326 error = %reason,
1327 consecutive_failures = decision.consecutive_failures,
1328 "NAT traversal failed (suppressed by warn-rate-limit)"
1329 );
1330 }
1331
1332 if decision.crossed_threshold {
1336 bootstrap
1337 .request_advert_stale_check(peer_config.npub.clone())
1338 .await;
1339 }
1340
1341 if self
1342 .try_peer_addresses(&peer_config, peer_identity, false)
1343 .await
1344 .is_ok()
1345 {
1346 continue;
1347 }
1348
1349 self.schedule_retry(node_addr, now_ms);
1350 if let Some(cooldown_until_ms) = decision.cooldown_until_ms
1351 && let Some(state) = self.retry_pending.get_mut(&node_addr)
1352 {
1353 state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
1357 }
1358 }
1359 }
1360 }
1361
1362 self.maybe_run_startup_open_discovery_sweep(&bootstrap)
1363 .await;
1364 self.queue_open_discovery_retries(&bootstrap).await;
1365 }
1366
1367 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1373 if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1374 let scope = scope.trim();
1375 if !scope.is_empty() {
1376 return Some(scope.to_string());
1377 }
1378 }
1379
1380 let app = self.config.node.discovery.nostr.app.trim();
1381 if app.is_empty() {
1382 return None;
1383 }
1384 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1385 let scope = rest.trim();
1386 if scope.is_empty() {
1387 None
1388 } else {
1389 Some(scope.to_string())
1390 }
1391 } else {
1392 Some(app.to_string())
1393 }
1394 }
1395
1396 pub(super) fn start_local_instance_discovery(&mut self) {
1397 if !self.config.node.discovery.local.enabled {
1398 return;
1399 }
1400 let Some(scope) = self.lan_discovery_scope() else {
1401 debug!("local instance discovery not started: no discovery scope");
1402 return;
1403 };
1404 let now_ms = Self::now_ms();
1405 match crate::discovery::local::LocalInstanceRegistry::new(
1406 self.identity.npub(),
1407 scope,
1408 &self.config.node.discovery.local,
1409 now_ms,
1410 ) {
1411 Ok(registry) => {
1412 self.local_instance_registry = Some(registry);
1413 self.local_instance_started_at_ms = Some(now_ms);
1414 self.last_local_instance_publish_ms = None;
1415 self.last_local_instance_scan_ms = None;
1416 self.publish_local_instance_record(now_ms);
1417 info!("Same-host FIPS instance discovery enabled");
1418 }
1419 Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1420 debug!("same-host FIPS instance discovery disabled");
1421 }
1422 Err(err) => {
1423 debug!(error = %err, "same-host FIPS instance discovery not started");
1424 }
1425 }
1426 }
1427
1428 fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1429 let mut contacts = Vec::new();
1430 for handle in self.transports.values() {
1431 if !handle.is_operational() || !handle.accept_connections() {
1432 continue;
1433 }
1434 let transport = handle.transport_type().name;
1435 if transport != "udp" && transport != "tcp" {
1436 continue;
1437 }
1438 let Some(local_addr) = handle.local_addr() else {
1439 continue;
1440 };
1441 let Some(contact) =
1442 crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1443 else {
1444 continue;
1445 };
1446 if contacts
1447 .iter()
1448 .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1449 existing.transport == contact.transport && existing.addr == contact.addr
1450 })
1451 {
1452 continue;
1453 }
1454 contacts.push(contact);
1455 }
1456 contacts
1457 }
1458
1459 fn publish_local_instance_record(&mut self, now_ms: u64) {
1460 let Some(registry) = self.local_instance_registry.clone() else {
1461 return;
1462 };
1463 let contacts = self.local_instance_contacts();
1464 match registry.publish(contacts, now_ms) {
1465 Ok(()) => {
1466 self.last_local_instance_publish_ms = Some(now_ms);
1467 }
1468 Err(err) => {
1469 debug!(error = %err, "failed to publish same-host FIPS instance record");
1470 }
1471 }
1472 }
1473
1474 fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1475 if self.local_instance_registry.is_none() {
1476 return;
1477 }
1478 let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1479 let due = self
1480 .last_local_instance_publish_ms
1481 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1482 .unwrap_or(true);
1483 if due {
1484 self.publish_local_instance_record(now_ms);
1485 }
1486 }
1487
1488 fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1489 if self.local_instance_registry.is_none() {
1490 return false;
1491 }
1492 let cfg = &self.config.node.discovery.local;
1493 let interval_ms = if self
1494 .local_instance_started_at_ms
1495 .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1496 .unwrap_or(false)
1497 {
1498 cfg.startup_scan_interval_ms()
1499 } else {
1500 cfg.scan_interval_ms()
1501 };
1502 self.last_local_instance_scan_ms
1503 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1504 .unwrap_or(true)
1505 }
1506
1507 fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1508 if self.config.peers().iter().any(|peer| {
1509 PeerIdentity::from_npub(&peer.npub)
1510 .map(|configured| configured.node_addr() == identity.node_addr())
1511 .unwrap_or(false)
1512 }) {
1513 return true;
1514 }
1515 self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1516 }
1517
1518 fn local_instance_peer_addresses(
1519 &self,
1520 record: &crate::discovery::local::LocalInstanceRecord,
1521 ) -> Vec<PeerAddress> {
1522 let mut addresses = Vec::new();
1523 for contact in &record.contacts {
1524 if contact.transport != "udp" && contact.transport != "tcp" {
1525 continue;
1526 }
1527 let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1528 debug!(
1529 npub = %record.npub,
1530 transport = %contact.transport,
1531 addr = %contact.addr,
1532 "local instance discovery: skip non-socket contact"
1533 );
1534 continue;
1535 };
1536 if !socket_addr.ip().is_loopback() {
1537 debug!(
1538 npub = %record.npub,
1539 addr = %contact.addr,
1540 "local instance discovery: skip non-loopback contact"
1541 );
1542 continue;
1543 }
1544 let address =
1545 PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1546 .with_seen_at_ms(record.updated_at_ms);
1547 if addresses.iter().any(|existing: &PeerAddress| {
1548 existing.transport == address.transport && existing.addr == address.addr
1549 }) {
1550 continue;
1551 }
1552 addresses.push(address);
1553 }
1554 addresses
1555 }
1556
1557 pub(super) async fn poll_local_instance_discovery(&mut self) {
1561 let Some(registry) = self.local_instance_registry.clone() else {
1562 return;
1563 };
1564 let now_ms = Self::now_ms();
1565 self.maybe_publish_local_instance_record(now_ms);
1566 if !self.local_instance_scan_due(now_ms) {
1567 return;
1568 }
1569 self.last_local_instance_scan_ms = Some(now_ms);
1570
1571 let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
1572 {
1573 Ok(records) => records,
1574 Err(err) => {
1575 debug!(error = %err, "same-host FIPS instance scan failed");
1576 return;
1577 }
1578 };
1579 if records.is_empty() {
1580 return;
1581 }
1582
1583 let mut connect_budget = self.discovery_connect_budget();
1584 let mut skipped_budget = 0usize;
1585 for record in records {
1586 let identity = match PeerIdentity::from_npub(&record.npub) {
1587 Ok(identity) => identity,
1588 Err(err) => {
1589 debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
1590 continue;
1591 }
1592 };
1593 let peer_node_addr = *identity.node_addr();
1594 if peer_node_addr == *self.identity.node_addr() {
1595 continue;
1596 }
1597 if !self.local_instance_peer_allowed(&identity) {
1598 debug!(
1599 npub = %identity.short_npub(),
1600 "local instance discovery: skip unconfigured peer"
1601 );
1602 continue;
1603 }
1604
1605 let addresses = self.local_instance_peer_addresses(&record);
1606 if addresses.is_empty() {
1607 continue;
1608 }
1609
1610 if self.peers.contains_key(&peer_node_addr)
1611 && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
1612 {
1613 continue;
1614 }
1615
1616 for address in addresses {
1617 let Some((transport_id, remote_addr)) =
1618 self.resolve_peer_address_for_match(&address)
1619 else {
1620 continue;
1621 };
1622 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1623 continue;
1624 }
1625 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1626 skipped_budget = skipped_budget.saturating_add(1);
1627 continue;
1628 }
1629 info!(
1630 npub = %identity.short_npub(),
1631 transport = %address.transport,
1632 addr = %address.addr,
1633 "same-host FIPS instance discovery: initiating handshake"
1634 );
1635 if let Err(err) = self
1636 .initiate_connection(transport_id, remote_addr, identity)
1637 .await
1638 {
1639 debug!(
1640 npub = %record.npub,
1641 error = %err,
1642 "same-host FIPS instance discovery: failed to initiate connection"
1643 );
1644 }
1645 connect_budget = connect_budget.saturating_sub(1);
1646 }
1647 }
1648 if skipped_budget > 0 {
1649 debug!(
1650 skipped = skipped_budget,
1651 "same-host FIPS instance discovery connect budget exhausted"
1652 );
1653 }
1654 }
1655
1656 pub(super) async fn poll_lan_discovery(&mut self) {
1663 let Some(runtime) = self.lan_discovery.clone() else {
1664 return;
1665 };
1666 let events = runtime.drain_events().await;
1667 if events.is_empty() {
1668 return;
1669 }
1670 let mut connect_budget = self.discovery_connect_budget();
1671 let mut skipped_budget = 0usize;
1672 for event in events {
1673 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1674 let Some((transport_id, local_addr)) =
1675 self.find_udp_transport_for_remote_addr(peer.addr)
1676 else {
1677 debug!(
1678 addr = %peer.addr,
1679 "lan: skip discovered peer with no compatible UDP transport"
1680 );
1681 continue;
1682 };
1683 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1684 Ok(id) => id,
1685 Err(err) => {
1686 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1687 continue;
1688 }
1689 };
1690 let peer_node_addr = *identity.node_addr();
1691 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1692 if self.peers.contains_key(&peer_node_addr) {
1693 let candidate = PeerAddress::new("udp", peer.addr.to_string());
1694 if self.active_peer_candidate_is_fresh_enough_to_skip(
1695 &peer_node_addr,
1696 std::slice::from_ref(&candidate),
1697 ) {
1698 continue;
1699 }
1700 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1701 continue;
1702 }
1703 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1704 skipped_budget = skipped_budget.saturating_add(1);
1705 continue;
1706 }
1707 info!(
1708 npub = %identity.short_npub(),
1709 addr = %peer.addr,
1710 local_addr = %local_addr,
1711 "lan: initiating alternate-path handshake to active peer"
1712 );
1713 if let Err(err) = self
1714 .initiate_connection(transport_id, remote_addr, identity)
1715 .await
1716 {
1717 debug!(
1718 npub = %peer.npub,
1719 error = %err,
1720 "lan: failed to initiate active peer alternate-path handshake"
1721 );
1722 }
1723 connect_budget = connect_budget.saturating_sub(1);
1724 continue;
1725 }
1726 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1727 continue;
1728 }
1729 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1730 skipped_budget = skipped_budget.saturating_add(1);
1731 continue;
1732 }
1733 info!(
1734 npub = %identity.short_npub(),
1735 addr = %peer.addr,
1736 local_addr = %local_addr,
1737 "lan: initiating handshake to discovered peer"
1738 );
1739 if let Err(err) = self
1740 .initiate_connection(transport_id, remote_addr, identity)
1741 .await
1742 {
1743 debug!(
1744 npub = %peer.npub,
1745 error = %err,
1746 "lan: failed to initiate connection to discovered peer"
1747 );
1748 }
1749 connect_budget = connect_budget.saturating_sub(1);
1750 }
1751 if skipped_budget > 0 {
1752 debug!(
1753 skipped = skipped_budget,
1754 "lan: discovery connect budget exhausted"
1755 );
1756 }
1757 }
1758
1759 pub(super) async fn poll_pending_connects(&mut self) {
1766 if self.pending_connects.is_empty() {
1767 return;
1768 }
1769
1770 let mut completed = Vec::new();
1771
1772 for (i, pending) in self.pending_connects.iter().enumerate() {
1773 let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
1774 transport.connection_state(&pending.remote_addr)
1775 } else {
1776 crate::transport::ConnectionState::Failed("transport removed".into())
1777 };
1778
1779 match state {
1780 crate::transport::ConnectionState::Connected => {
1781 completed.push((i, true, None));
1782 }
1783 crate::transport::ConnectionState::Failed(reason) => {
1784 completed.push((i, false, Some(reason)));
1785 }
1786 crate::transport::ConnectionState::Connecting => {
1787 }
1789 crate::transport::ConnectionState::None => {
1790 completed.push((i, false, Some("no connection attempt found".into())));
1792 }
1793 }
1794 }
1795
1796 for (i, success, reason) in completed.into_iter().rev() {
1798 let pending = self.pending_connects.remove(i);
1799
1800 if success {
1801 if let Some(link) = self.links.get_mut(&pending.link_id) {
1803 link.set_connected();
1804 }
1805
1806 debug!(
1807 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
1808 transport_id = %pending.transport_id,
1809 remote_addr = %pending.remote_addr,
1810 link_id = %pending.link_id,
1811 "Transport connected, starting handshake"
1812 );
1813
1814 if let Err(e) = self
1816 .start_handshake(
1817 pending.link_id,
1818 pending.transport_id,
1819 pending.remote_addr.clone(),
1820 pending.peer_identity,
1821 )
1822 .await
1823 {
1824 warn!(
1825 link_id = %pending.link_id,
1826 error = %e,
1827 "Failed to start handshake after transport connect"
1828 );
1829 self.remove_link(&pending.link_id);
1831 }
1832 } else {
1833 let reason = reason.unwrap_or_default();
1834 warn!(
1835 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
1836 transport_id = %pending.transport_id,
1837 remote_addr = %pending.remote_addr,
1838 link_id = %pending.link_id,
1839 reason = %reason,
1840 "Transport connect failed"
1841 );
1842
1843 self.remove_link(&pending.link_id);
1845 self.links.remove(&pending.link_id);
1846 self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
1847 }
1848 }
1849 }
1850
/// Starts the node and moves it to [`NodeState::Running`].
///
/// The startup sequence, in order: create the packet channel, create and
/// start transports, spawn the optional crypto worker pools (unix only),
/// start Nostr and LAN discovery, start and poll same-host discovery,
/// initiate configured peer connections, bring up the TUN device, and start
/// the DNS responder. Failures of individual transports, discovery runtimes,
/// TUN creation and DNS binding are logged and startup continues.
///
/// # Errors
///
/// Returns [`NodeError::AlreadyStarted`] when the current state does not
/// permit starting. On macOS, returns a `NodeError::Tun` error when the TUN
/// shutdown pipe cannot be created. An error from `create_writer` is
/// propagated with `?`.
///
/// NOTE(review): the early `Err` returns inside the TUN section happen after
/// `self.state` was set to `Starting` and after transports were started;
/// nothing visible here rolls that back — confirm callers handle this.
pub async fn start(&mut self) -> Result<(), NodeError> {
    node_start_debug_log("Node::start begin");
    if !self.state.can_start() {
        return Err(NodeError::AlreadyStarted);
    }
    self.state = NodeState::Starting;
    node_start_debug_log("Node::start state set to starting");

    // Packet channel: the receiver is kept on the node; the sender is kept
    // as well and also handed to `create_transports`.
    let packet_buffer_size = self.config.node.buffers.packet_channel;
    let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
    self.packet_tx = Some(packet_tx.clone());
    self.packet_rx = Some(packet_rx);
    node_start_debug_log("Node::start packet channel created");

    node_start_debug_log("Node::start create transports begin");
    let transport_handles = self.create_transports(&packet_tx).await;
    node_start_debug_log(format!(
        "Node::start create transports complete count={}",
        transport_handles.len()
    ));

    // Start each transport; only those that start successfully are registered.
    for mut handle in transport_handles {
        let transport_id = handle.transport_id();
        let transport_type = handle.transport_type().name;
        let name = handle.name().map(|s| s.to_string());

        node_start_debug_log(format!(
            "Node::start transport start begin id={} type={} name={:?}",
            transport_id, transport_type, name
        ));
        match handle.start().await {
            Ok(()) => {
                node_start_debug_log(format!(
                    "Node::start transport start ok id={} type={}",
                    transport_id, transport_type
                ));
                self.transports.insert(transport_id, handle);
            }
            Err(e) => {
                node_start_debug_log(format!(
                    "Node::start transport start error id={} type={} error={}",
                    transport_id, transport_type, e
                ));
                if let Some(ref n) = name {
                    warn!(transport_type, name = %n, error = %e, "Transport failed to start");
                } else {
                    warn!(transport_type, error = %e, "Transport failed to start");
                }
            }
        }
    }

    if !self.transports.is_empty() {
        info!(count = self.transports.len(), "Transports initialized");
    }

    // Crypto worker pools are only compiled in on unix targets.
    #[cfg(unix)]
    {
        if self.config.node.worker_pools_enabled {
            node_start_debug_log("Node::start worker pools begin");
            // Default worker count: available parallelism, at least 1.
            let cpu_default = std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1)
                .max(1);
            // FIPS_ENCRYPT_WORKERS overrides the default; the encrypt pool
            // always has at least one worker.
            let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(cpu_default)
                .max(1);
            self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
                encrypt_worker_count,
            ));
            info!(
                workers = encrypt_worker_count,
                "Spawned FMP-encrypt worker pool"
            );

            // FIPS_DECRYPT_WORKERS overrides the default; unlike the encrypt
            // pool, an explicit 0 is honoured and no decrypt pool is spawned.
            let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(cpu_default);
            if decrypt_worker_count == 0 {
                info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
            } else {
                self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
                    decrypt_worker_count,
                ));
                info!(
                    workers = decrypt_worker_count,
                    "Spawned FMP+FSP-decrypt worker pool"
                );
            }
            node_start_debug_log("Node::start worker pools complete");
        } else {
            node_start_debug_log("Node::start worker pools disabled");
            info!("FIPS worker pools disabled; using in-line crypto/send path");
        }
    }

    // Nostr overlay discovery: a start failure is logged and startup continues.
    if self.config.node.discovery.nostr.enabled {
        node_start_debug_log("Node::start nostr discovery start begin");
        match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
            .await
        {
            Ok(runtime) => {
                node_start_debug_log("Node::start nostr discovery runtime created");
                // The runtime is kept even when the initial advert publish fails.
                if let Err(err) = self.refresh_overlay_advert(&runtime).await {
                    warn!(error = %err, "Failed to publish initial Nostr overlay advert");
                }
                node_start_debug_log("Node::start nostr overlay advert refreshed");
                self.nostr_discovery = Some(runtime);
                self.nostr_discovery_started_at_ms = Some(Self::now_ms());
                info!("Nostr overlay discovery enabled");
            }
            Err(err) => {
                node_start_debug_log(format!(
                    "Node::start nostr discovery start error error={}",
                    err
                ));
                warn!(error = %err, "Failed to start Nostr overlay discovery");
            }
        }
    }

    // LAN discovery: a start failure is logged at debug level only.
    if self.config.node.discovery.lan.enabled {
        node_start_debug_log("Node::start lan discovery start begin");
        // Advertise the port of the first operational UDP transport that
        // reports a local address; 0 when there is none.
        let advertised_udp_port = self
            .transports
            .values()
            .filter(|h| h.is_operational())
            .filter(|h| h.transport_type().name == "udp")
            .find_map(|h| h.local_addr().map(|addr| addr.port()))
            .unwrap_or(0);
        let scope = self.lan_discovery_scope();
        match crate::discovery::lan::LanDiscovery::start(
            &self.identity,
            scope,
            advertised_udp_port,
            self.config.node.discovery.lan.clone(),
        )
        .await
        {
            Ok(runtime) => {
                node_start_debug_log("Node::start lan discovery start ok");
                self.lan_discovery = Some(runtime);
                info!("LAN mDNS discovery enabled");
            }
            Err(err) => {
                node_start_debug_log(format!(
                    "Node::start lan discovery start error error={}",
                    err
                ));
                debug!(error = %err, "LAN mDNS discovery not started");
            }
        }
    }

    // Same-host discovery: start the registry, then run one poll immediately.
    self.start_local_instance_discovery();
    self.poll_local_instance_discovery().await;

    node_start_debug_log("Node::start initiate peer connections begin");
    self.initiate_peer_connections().await;
    node_start_debug_log("Node::start initiate peer connections complete");

    // TUN device: a creation failure marks the TUN state as failed and
    // startup continues without it.
    if self.config.tun.enabled {
        node_start_debug_log("Node::start tun init begin");
        let address = *self.identity.address();
        match TunDevice::create(&self.config.tun, address).await {
            Ok(device) => {
                let mtu = device.mtu();
                let name = device.name().to_string();
                let our_addr = *device.address();

                info!("TUN device active:");
                info!(" name: {}", name);
                info!(" address: {}", device.address());
                info!(" mtu: {}", mtu);

                let effective_mtu = self.effective_ipv6_mtu();
                // MSS = effective MTU minus 40 and 20 bytes — presumably the
                // IPv6 and TCP header sizes; TODO confirm.
                let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); info!("effective MTU: {} bytes", effective_mtu);
                debug!(" max TCP MSS: {} bytes", max_mss);

                // macOS only: a pipe whose read end goes to the reader thread
                // and whose write end is stored on the node for shutdown.
                // NOTE(review): if `create_writer` below fails, both pipe fds
                // appear to be left open — confirm.
                #[cfg(target_os = "macos")]
                let (shutdown_read_fd, shutdown_write_fd) = {
                    let mut fds = [0i32; 2];
                    if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
                        return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
                            "failed to create shutdown pipe".into(),
                        )));
                    }
                    (fds[0], fds[1])
                };

                let (writer, tun_tx) =
                    device.create_writer(max_mss, self.path_mtu_lookup.clone())?;

                // The TUN writer runs on its own OS thread.
                let writer_handle = thread::spawn(move || {
                    writer.run();
                });

                let reader_tun_tx = tun_tx.clone();

                let tun_channel_size = self.config.node.buffers.tun_channel;
                let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);

                let transport_mtu = self.transport_mtu();
                let path_mtu_lookup = self.path_mtu_lookup.clone();
                // The TUN reader also runs on its own OS thread and takes
                // ownership of the device; the macOS variant additionally
                // receives the shutdown pipe's read end.
                #[cfg(target_os = "macos")]
                let reader_handle = thread::spawn(move || {
                    run_tun_reader(
                        device,
                        mtu,
                        our_addr,
                        reader_tun_tx,
                        outbound_tx,
                        transport_mtu,
                        path_mtu_lookup,
                        shutdown_read_fd,
                    );
                });
                #[cfg(not(target_os = "macos"))]
                let reader_handle = thread::spawn(move || {
                    run_tun_reader(
                        device,
                        mtu,
                        our_addr,
                        reader_tun_tx,
                        outbound_tx,
                        transport_mtu,
                        path_mtu_lookup,
                    );
                });

                self.tun_state = TunState::Active;
                self.tun_name = Some(name);
                self.tun_tx = Some(tun_tx);
                self.tun_outbound_rx = Some(outbound_rx);
                self.tun_reader_handle = Some(reader_handle);
                self.tun_writer_handle = Some(writer_handle);
                #[cfg(target_os = "macos")]
                {
                    self.tun_shutdown_fd = Some(shutdown_write_fd);
                }
            }
            Err(e) => {
                self.tun_state = TunState::Failed;
                warn!(error = %e, "Failed to initialize TUN, continuing without it");
            }
        }
        node_start_debug_log("Node::start tun init complete");
    }

    // DNS responder: an invalid bind address or a bind failure is logged and
    // the node runs without a responder.
    if self.config.dns.enabled {
        node_start_debug_log("Node::start dns init begin");
        let addr_str = self.config.dns.bind_addr();
        match addr_str.parse::<std::net::IpAddr>() {
            Ok(ip) => {
                let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
                match Self::bind_dns_socket(bind) {
                    Ok(socket) => {
                        let dns_channel_size = self.config.node.buffers.dns_channel;
                        let (identity_tx, identity_rx) =
                            tokio::sync::mpsc::channel(dns_channel_size);
                        let dns_ttl = self.config.dns.ttl();
                        // The host map is seeded from the configured peers.
                        let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
                            self.config.peers(),
                        );
                        // With system files enabled the reloader is given the
                        // default hosts path; otherwise it is memory-only.
                        let reloader = if self.config.node.system_files_enabled {
                            let hosts_path = std::path::PathBuf::from(
                                crate::upper::hosts::DEFAULT_HOSTS_PATH,
                            );
                            crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
                        } else {
                            crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
                        };
                        let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
                        info!(
                            bind = %bind,
                            hosts = reloader.hosts().len(),
                            mesh_ifindex = ?mesh_ifindex,
                            "DNS responder started for .fips domain (auto-reload enabled)"
                        );
                        // The responder runs as a tokio task; its handle is
                        // stored on the node.
                        let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
                            socket,
                            identity_tx,
                            dns_ttl,
                            reloader,
                            mesh_ifindex,
                        ));
                        self.dns_identity_rx = Some(identity_rx);
                        self.dns_task = Some(handle);
                    }
                    Err(e) => {
                        warn!(bind = %bind, error = %e, "Failed to start DNS responder");
                    }
                }
            }
            Err(e) => {
                warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
            }
        }
        node_start_debug_log("Node::start dns init complete");
    }

    self.state = NodeState::Running;
    node_start_debug_log("Node::start running");
    info!("Node started:");
    info!(" state: {}", self.state);
    info!(" transports: {}", self.transports.len());
    info!(" connections: {}", self.connections.len());
    Ok(())
}
2236
2237 fn bind_dns_socket(
2250 addr: std::net::SocketAddr,
2251 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2252 use socket2::{Domain, Protocol, Socket, Type};
2253 let domain = if addr.is_ipv4() {
2254 Domain::IPV4
2255 } else {
2256 Domain::IPV6
2257 };
2258 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2259 if addr.is_ipv6() {
2260 sock.set_only_v6(false)?;
2261 #[cfg(unix)]
2262 Self::set_recv_pktinfo_v6(&sock)?;
2263 }
2264 sock.set_nonblocking(true)?;
2265 sock.bind(&addr.into())?;
2266 tokio::net::UdpSocket::from_std(sock.into())
2267 }
2268
2269 #[cfg(unix)]
2275 fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2276 use std::os::fd::AsRawFd;
2277 let enable: libc::c_int = 1;
2278 let ret = unsafe {
2279 libc::setsockopt(
2280 sock.as_raw_fd(),
2281 libc::IPPROTO_IPV6,
2282 libc::IPV6_RECVPKTINFO,
2283 &enable as *const _ as *const libc::c_void,
2284 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2285 )
2286 };
2287 if ret < 0 {
2288 return Err(std::io::Error::last_os_error());
2289 }
2290 Ok(())
2291 }
2292
2293 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2300 #[cfg(unix)]
2301 {
2302 let c_name = std::ffi::CString::new(name).ok()?;
2303 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2304 if idx == 0 { None } else { Some(idx) }
2305 }
2306 #[cfg(not(unix))]
2307 {
2308 let _ = name;
2309 None
2310 }
2311 }
2312
2313 pub async fn stop(&mut self) -> Result<(), NodeError> {
2318 if !self.state.can_stop() {
2319 return Err(NodeError::NotStarted);
2320 }
2321 self.state = NodeState::Stopping;
2322 info!(state = %self.state, "Node stopping");
2323
2324 if let Some(handle) = self.dns_task.take() {
2326 handle.abort();
2327 debug!("DNS responder stopped");
2328 }
2329
2330 self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
2332 .await;
2333
2334 if let Some(bootstrap) = self.nostr_discovery.take()
2336 && let Err(e) = bootstrap.shutdown().await
2337 {
2338 warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
2339 }
2340
2341 if let Some(lan) = self.lan_discovery.take() {
2345 lan.shutdown().await;
2346 }
2347
2348 if let Some(registry) = self.local_instance_registry.take()
2349 && let Err(err) = registry.remove()
2350 {
2351 debug!(error = %err, "failed to remove same-host FIPS instance record");
2352 }
2353
2354 let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
2356 for transport_id in transport_ids {
2357 if let Some(mut handle) = self.transports.remove(&transport_id) {
2358 let transport_type = handle.transport_type().name;
2359 match handle.stop().await {
2360 Ok(()) => {
2361 info!(transport_id = %transport_id, transport_type, "Transport stopped");
2362 }
2363 Err(e) => {
2364 warn!(
2365 transport_id = %transport_id,
2366 transport_type,
2367 error = %e,
2368 "Transport stop failed"
2369 );
2370 }
2371 }
2372 }
2373 }
2374
2375 self.packet_tx.take();
2377 self.packet_rx.take();
2378
2379 if let Some(name) = self.tun_name.take() {
2381 info!(name = %name, "Shutting down TUN interface");
2382
2383 self.tun_tx.take();
2385
2386 if let Err(e) = shutdown_tun_interface(&name).await {
2388 warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
2389 }
2390
2391 #[cfg(target_os = "macos")]
2394 if let Some(fd) = self.tun_shutdown_fd.take() {
2395 unsafe {
2396 libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
2397 libc::close(fd);
2398 }
2399 }
2400
2401 if let Some(handle) = self.tun_reader_handle.take() {
2403 let _ = handle.join();
2404 }
2405 if let Some(handle) = self.tun_writer_handle.take() {
2406 let _ = handle.join();
2407 }
2408
2409 self.tun_state = TunState::Disabled;
2410 }
2411
2412 self.state = NodeState::Stopped;
2413 info!(state = %self.state, "Node stopped");
2414 Ok(())
2415 }
2416
2417 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2422 let disconnect = Disconnect::new(reason);
2423 let plaintext = disconnect.encode();
2424
2425 let peer_addrs: Vec<NodeAddr> = self
2427 .peers
2428 .iter()
2429 .filter(|(_, peer)| peer.can_send() && peer.has_session())
2430 .map(|(addr, _)| *addr)
2431 .collect();
2432
2433 if peer_addrs.is_empty() {
2434 debug!(
2435 total_peers = self.peers.len(),
2436 "No sendable peers for disconnect notification"
2437 );
2438 return;
2439 }
2440
2441 let mut sent = 0usize;
2442 for node_addr in &peer_addrs {
2443 match self
2444 .send_encrypted_link_message(node_addr, &plaintext)
2445 .await
2446 {
2447 Ok(()) => sent += 1,
2448 Err(e) => {
2449 debug!(
2450 peer = %self.peer_display_name(node_addr),
2451 error = %e,
2452 "Failed to send disconnect (transport may be down)"
2453 );
2454 }
2455 }
2456 }
2457
2458 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2459 }
2460
2461 fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2462 peer_config
2463 .addresses_by_priority()
2464 .into_iter()
2465 .cloned()
2466 .collect()
2467 }
2468
2469 async fn nostr_peer_fallback_addresses(
2470 &self,
2471 peer_config: &PeerConfig,
2472 existing: &[PeerAddress],
2473 ) -> Vec<PeerAddress> {
2474 if !self.config.node.discovery.nostr.enabled
2475 || self.config.node.discovery.nostr.policy
2476 == crate::config::NostrDiscoveryPolicy::Disabled
2477 {
2478 return Vec::new();
2479 }
2480
2481 let Some(bootstrap) = self.nostr_discovery.clone() else {
2482 return Vec::new();
2483 };
2484 let endpoints = match bootstrap
2485 .cached_advert_endpoints_for_peer(&peer_config.npub)
2486 .await
2487 {
2488 Some(endpoints) => endpoints,
2489 None => {
2490 debug!(
2491 npub = %peer_config.npub,
2492 "No cached Nostr advert endpoints for configured peer"
2493 );
2494 return Vec::new();
2495 }
2496 };
2497
2498 let mut fallback = Vec::new();
2499 let mut next_priority = existing
2500 .iter()
2501 .map(|addr| addr.priority)
2502 .max()
2503 .unwrap_or(100)
2504 .saturating_add(1);
2505 let seen_at_ms = Self::now_ms();
2510 for endpoint in endpoints {
2511 let Some(candidate) =
2512 Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2513 else {
2514 continue;
2515 };
2516 if existing
2517 .iter()
2518 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2519 || fallback.iter().any(|addr: &PeerAddress| {
2520 addr.transport == candidate.transport && addr.addr == candidate.addr
2521 })
2522 {
2523 continue;
2524 }
2525 fallback.push(candidate);
2526 next_priority = next_priority.saturating_add(1);
2527 }
2528 fallback
2529 }
2530
2531 async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2532 if !self.config.node.discovery.nostr.enabled
2533 || self.config.node.discovery.nostr.policy
2534 == crate::config::NostrDiscoveryPolicy::Disabled
2535 {
2536 return false;
2537 }
2538 let Some(bootstrap) = self.nostr_discovery.clone() else {
2539 return false;
2540 };
2541 bootstrap.request_connect(peer_config.clone()).await;
2542 info!(npub = %peer_config.npub, "Started background Nostr UDP NAT traversal attempt");
2543 true
2544 }
2545
2546 fn overlay_endpoint_to_peer_address(
2547 endpoint: &OverlayEndpointAdvert,
2548 priority: u8,
2549 seen_at_ms: u64,
2550 ) -> Option<PeerAddress> {
2551 let transport = match endpoint.transport {
2552 OverlayTransportKind::Udp => "udp",
2553 OverlayTransportKind::Tcp => "tcp",
2554 OverlayTransportKind::Tor => "tor",
2555 OverlayTransportKind::WebRtc => "webrtc",
2556 };
2557 Some(
2558 PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2559 .with_seen_at_ms(seen_at_ms),
2560 )
2561 }
2562
    /// Walks `addresses` in the given order and starts connection attempts to
    /// `peer_identity`.
    ///
    /// Returns `Ok(())` as soon as the loop finishes with at least one of:
    /// a connection attempt started, a Nostr bootstrap requested, or a
    /// matching attempt already in flight on the same path. A
    /// `NodeError::AccessDenied` from `initiate_connection` aborts the walk
    /// and is returned as-is; other per-address failures are logged and the
    /// next address is tried. If nothing was attempted the result is
    /// `NodeError::NoTransportForType`.
    ///
    /// `allow_bootstrap_nat` gates whether a `udp:nat` placeholder address may
    /// trigger a Nostr bootstrap request.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        let mut attempted = false;
        let peer_node_addr = *peer_identity.node_addr();
        // Number of new concrete attempts this call may still start for the peer.
        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

        for addr in addresses {
            // Addresses carrying `seen_at_ms` are the ones built from overlay
            // adverts (see `overlay_endpoint_to_peer_address`); they are only
            // used when nothing else is already in flight.
            if attempted && addr.seen_at_ms.is_some() {
                debug!(
                    npub = %peer_config.npub,
                    transport = %addr.transport,
                    addr = %addr.addr,
                    "Skipping overlay fallback because a configured direct candidate is already in flight"
                );
                continue;
            }

            // `udp:nat` is a placeholder, not a dialable address: it asks for
            // a Nostr bootstrap request instead of a direct connection.
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if attempted {
                    debug!(
                        npub = %peer_config.npub,
                        "Skipping Nostr NAT fallback because a configured direct candidate is already in flight"
                    );
                    continue;
                }
                if !allow_bootstrap_nat {
                    continue;
                }
                if self.request_nostr_bootstrap(peer_config).await {
                    attempted = true;
                    continue;
                }
                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                continue;
            }

            // Resolve the address to a (transport id, transport address) pair.
            // Ethernet and BLE have dedicated resolvers; everything else picks
            // an operational transport of the matching type.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                // For UDP addresses that parse as a socket address, pick a UDP
                // transport selected for that remote address; otherwise fall
                // back to a lookup by transport type name.
                let tid = if addr.transport == "udp"
                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
                {
                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                        Some((id, _)) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No compatible operational UDP transport for address"
                            );
                            continue;
                        }
                    }
                } else {
                    match self.find_transport_for_type(&addr.transport) {
                        Some(id) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No operational transport for address type"
                            );
                            continue;
                        }
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            // An identical path already being dialed counts as an attempt but
            // does not consume budget.
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                attempted = true;
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    remote_addr = %remote_addr,
                    "Skipping duplicate in-flight candidate path"
                );
                continue;
            }

            // Budget exhaustion ends the walk entirely (remaining addresses,
            // including any placeholders, are not examined).
            if concrete_budget == 0 {
                debug!(
                    npub = %peer_config.npub,
                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                    "Path candidate race budget exhausted"
                );
                break;
            }

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => {
                    attempted = true;
                    concrete_budget = concrete_budget.saturating_sub(1);
                }
                // An access-control rejection is surfaced to the caller
                // immediately instead of trying further addresses.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        if attempted {
            return Ok(());
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
2718
2719 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
2720 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
2721 .await;
2722 }
2723
    /// Scans cached overlay adverts and queues retry entries for peers that
    /// are not configured locally.
    ///
    /// Does nothing unless Nostr discovery is enabled with the `Open` policy
    /// and `open_discovery_enqueue_budget` is non-zero. Each cached candidate
    /// is checked against a sequence of skip conditions (advert age,
    /// configured peer, invalid npub, self, already a peer, retry already
    /// pending, cooldown, connection in progress, no usable endpoints); each
    /// skip is tallied for the summary log. Candidates that pass get an alias
    /// and identity registered and a `RetryState` inserted into
    /// `retry_pending`, due immediately and with an expiry.
    ///
    /// * `max_age_secs` - when set, adverts older than this many seconds are
    ///   skipped.
    /// * `caller` - label included in log output; the summary is logged when
    ///   it equals `"startup"` or when at least one retry was queued.
    pub(in crate::node) async fn run_open_discovery_sweep(
        &mut self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
        max_age_secs: Option<u64>,
        caller: &'static str,
    ) {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
        {
            return;
        }

        let configured_npubs = self
            .config
            .peers()
            .iter()
            .map(|peer| peer.npub.clone())
            .collect::<HashSet<_>>();
        let now_ms = Self::now_ms();
        let now_secs = now_ms / 1000;
        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
        if enqueue_budget == 0 {
            debug!(
                caller = %caller,
                "open-discovery sweep: enqueue budget is 0, skipping"
            );
            return;
        }

        // NOTE(review): 64 is presumably the maximum number of cached
        // candidates to fetch — confirm against `cached_open_discovery_candidates`.
        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
        let cached_count = candidates.len();
        let mut enqueued = 0usize;
        let mut skipped_age = 0usize;
        let mut skipped_configured = 0usize;
        let mut skipped_self = 0usize;
        let mut skipped_connected = 0usize;
        let mut skipped_retry_pending = 0usize;
        let mut skipped_connecting = 0usize;
        let mut skipped_no_endpoints = 0usize;
        let mut skipped_invalid_npub = 0usize;
        let mut skipped_cooldown = 0usize;

        for (npub, endpoints, created_at_secs) in candidates {
            if enqueue_budget == 0 {
                break;
            }

            if let Some(max_age) = max_age_secs
                && now_secs.saturating_sub(created_at_secs) > max_age
            {
                skipped_age = skipped_age.saturating_add(1);
                continue;
            }

            // Configured peers are never enqueued here, but a cached advert
            // for one pulls an existing, not-yet-due retry forward to "now".
            if configured_npubs.contains(&npub) {
                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
                    let configured_addr = *identity.node_addr();
                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
                        && state.retry_after_ms > now_ms
                    {
                        state.retry_after_ms = now_ms;
                        debug!(
                            caller = %caller,
                            peer = %self.peer_display_name(&configured_addr),
                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
                            "Expediting configured-peer retry after fresh overlay advert"
                        );
                    }
                }
                skipped_configured = skipped_configured.saturating_add(1);
                continue;
            }

            let peer_identity = match PeerIdentity::from_npub(&npub) {
                Ok(identity) => identity,
                Err(_) => {
                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                    continue;
                }
            };
            let node_addr = *peer_identity.node_addr();
            if node_addr == *self.identity.node_addr() {
                skipped_self = skipped_self.saturating_add(1);
                continue;
            }
            if self.peers.contains_key(&node_addr) {
                skipped_connected = skipped_connected.saturating_add(1);
                continue;
            }
            if self.retry_pending.contains_key(&node_addr) {
                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
                continue;
            }
            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                skipped_cooldown = skipped_cooldown.saturating_add(1);
                continue;
            }
            // Any connection whose expected identity is this node address
            // counts as "already connecting".
            let connecting = self.connections.values().any(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == &node_addr)
                    .unwrap_or(false)
            });
            if connecting {
                skipped_connecting = skipped_connecting.saturating_add(1);
                continue;
            }

            // Convert advertised endpoints to addresses, dropping duplicates.
            // Priorities start at 120 and increase per accepted endpoint.
            let mut addresses = Vec::new();
            let mut priority = 120u8;
            let seen_at_ms = Self::now_ms();
            for endpoint in endpoints {
                let Some(candidate) =
                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
                else {
                    continue;
                };
                if addresses.iter().any(|existing: &PeerAddress| {
                    existing.transport == candidate.transport && existing.addr == candidate.addr
                }) {
                    continue;
                }
                addresses.push(candidate);
                priority = priority.saturating_add(1);
            }
            if addresses.is_empty() {
                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
                continue;
            }

            // Keep an existing alias if one is already registered.
            self.peer_aliases
                .entry(node_addr)
                .or_insert_with(|| peer_identity.short_npub());
            self.register_identity(node_addr, peer_identity.pubkey_full());

            let mut state = super::retry::RetryState::new(PeerConfig {
                npub: npub.clone(),
                alias: None,
                addresses,
                connect_policy: ConnectPolicy::AutoConnect,
                auto_reconnect: true,
                discovery_fallback_transit: false,
            });
            state.reconnect = false;
            // Due immediately; the entry expires at the computed deadline.
            state.retry_after_ms = now_ms;
            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
            self.retry_pending.insert(node_addr, state);
            info!(
                caller = %caller,
                peer = %peer_identity.short_npub(),
                advert_age_secs = now_secs.saturating_sub(created_at_secs),
                "open-discovery sweep: queued retry for cached advert"
            );
            enqueue_budget = enqueue_budget.saturating_sub(1);
            enqueued = enqueued.saturating_add(1);
        }

        let total_skipped = skipped_age
            + skipped_configured
            + skipped_self
            + skipped_connected
            + skipped_retry_pending
            + skipped_connecting
            + skipped_no_endpoints
            + skipped_invalid_npub
            + skipped_cooldown;
        // Summarize only for the startup sweep or when something was queued.
        let should_summarize = caller == "startup" || enqueued > 0;
        if should_summarize {
            info!(
                caller = %caller,
                cached = cached_count,
                queued = enqueued,
                skipped_age = skipped_age,
                skipped_configured = skipped_configured,
                skipped_self = skipped_self,
                skipped_connected = skipped_connected,
                skipped_retry_pending = skipped_retry_pending,
                skipped_connecting = skipped_connecting,
                skipped_no_endpoints = skipped_no_endpoints,
                skipped_invalid_npub = skipped_invalid_npub,
                skipped_cooldown = skipped_cooldown,
                skipped_total = total_skipped,
                "open-discovery sweep complete"
            );
        }
    }
2941
2942 async fn maybe_run_startup_open_discovery_sweep(
2950 &mut self,
2951 bootstrap: &std::sync::Arc<NostrDiscovery>,
2952 ) {
2953 if self.startup_open_discovery_sweep_done {
2954 return;
2955 }
2956 if !self.config.node.discovery.nostr.enabled
2957 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2958 {
2959 self.startup_open_discovery_sweep_done = true;
2961 return;
2962 }
2963 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
2964 return;
2965 };
2966 let now_ms = Self::now_ms();
2967 let delay_ms = self
2968 .config
2969 .node
2970 .discovery
2971 .nostr
2972 .startup_sweep_delay_secs
2973 .saturating_mul(1000);
2974 if now_ms < started_at_ms.saturating_add(delay_ms) {
2975 return;
2976 }
2977
2978 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
2979 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
2980 .await;
2981 self.startup_open_discovery_sweep_done = true;
2982 }
2983
2984 fn available_outbound_slots(&self) -> usize {
2985 let connection_used = self
2986 .connections
2987 .len()
2988 .saturating_add(self.pending_connects.len());
2989 let connection_slots = if self.max_connections == 0 {
2990 usize::MAX
2991 } else {
2992 self.max_connections.saturating_sub(connection_used)
2993 };
2994
2995 let peer_slots = if self.max_peers == 0 {
2996 usize::MAX
2997 } else {
2998 self.max_peers.saturating_sub(self.peers.len())
2999 };
3000
3001 connection_slots.min(peer_slots)
3002 }
3003
3004 fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
3005 let current_open_discovery_pending = self
3006 .retry_pending
3007 .values()
3008 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3009 .count();
3010
3011 let cap_remaining = self
3012 .config
3013 .node
3014 .discovery
3015 .nostr
3016 .open_discovery_max_pending
3017 .saturating_sub(current_open_discovery_pending);
3018
3019 cap_remaining.min(self.available_outbound_slots())
3020 }
3021
3022 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3023 now_ms.saturating_add(
3024 self.config
3025 .node
3026 .discovery
3027 .nostr
3028 .advert_ttl_secs
3029 .saturating_mul(1000)
3030 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3031 )
3032 }
3033
    /// Builds this node's overlay advert from its operational transports.
    ///
    /// Returns `None` when Nostr discovery is disabled or when no transport
    /// contributes an endpoint. Only transports that are operational, have a
    /// matching config entry, and have `advertise_on_nostr()` set are
    /// considered. Signal relays and STUN servers are included only when the
    /// advert contains a `udp:nat` or WebRTC endpoint.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        let mut has_udp_nat = false;
        let mut has_webrtc = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // Public UDP: prefer the explicitly configured external
                        // address, then a routable bind address, then an
                        // address learned via `learn_public_udp_addr`.
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                // Bind address is wildcard or unroutable: ask
                                // the discovery runtime for the public address
                                // of this transport's port.
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                             or private and STUN observation failed; \
                                             advertising no UDP endpoint. Either set \
                                             transports.udp.external_addr, bind to a \
                                             specific *public* IP, or ensure \
                                             node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                None => {}
                            }
                        }
                    } else {
                        // Non-public UDP is advertised as the `nat`
                        // placeholder rather than a concrete address.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "webrtc" => {
                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // The WebRTC endpoint address is this node's hex-encoded
                    // serialized public key.
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::WebRtc,
                        addr: hex::encode(self.identity.pubkey_full().serialize()),
                    });
                    has_webrtc = true;
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // TCP: explicit external address, else a routable bind
                    // address; otherwise nothing is advertised.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                     or private IP and no transports.tcp.external_addr set; \
                                     advertising no TCP endpoint. Either set external_addr \
                                     to the public IP (recommended for cloud 1:1-NAT setups) \
                                     or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Advertised only when the transport reports an onion address.
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                // Other transport types are not advertised.
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
3212
3213 async fn refresh_overlay_advert(
3214 &self,
3215 bootstrap: &std::sync::Arc<NostrDiscovery>,
3216 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3217 let advert = self.build_overlay_advert(bootstrap).await;
3218 bootstrap.update_local_advert(advert).await
3219 }
3220
3221 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3222 match (&self.config.transports.udp, transport_name) {
3223 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3224 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3225 _ => None,
3226 }
3227 }
3228
3229 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3230 match (&self.config.transports.tcp, transport_name) {
3231 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3232 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3233 _ => None,
3234 }
3235 }
3236
3237 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3238 match (&self.config.transports.tor, transport_name) {
3239 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3240 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3241 _ => None,
3242 }
3243 }
3244
3245 fn lookup_webrtc_config(
3246 &self,
3247 transport_name: Option<&str>,
3248 ) -> Option<&crate::config::WebRtcConfig> {
3249 match (&self.config.transports.webrtc, transport_name) {
3250 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3251 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3252 _ => None,
3253 }
3254 }
3255
3256 pub(in crate::node) async fn try_peer_addresses(
3257 &mut self,
3258 peer_config: &PeerConfig,
3259 peer_identity: PeerIdentity,
3260 allow_bootstrap_nat: bool,
3261 ) -> Result<(), NodeError> {
3262 let peer_node_addr = *peer_identity.node_addr();
3263 if self.peers.contains_key(&peer_node_addr) {
3264 debug!(
3265 npub = %peer_config.npub,
3266 "Peer already exists, skipping address attempts"
3267 );
3268 return Ok(());
3269 }
3270
3271 let candidates = self.peer_address_candidates(peer_config).await;
3272
3273 if candidates.is_empty() {
3274 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3275 return Ok(());
3276 }
3277 return Err(NodeError::NoTransportForType(format!(
3278 "no addresses known for {}",
3279 peer_config.npub
3280 )));
3281 }
3282
3283 if self
3284 .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3285 .await
3286 .is_ok()
3287 {
3288 return Ok(());
3289 }
3290
3291 Err(NodeError::NoTransportForType(format!(
3292 "no operational transport for any of {}'s addresses",
3293 peer_config.npub
3294 )))
3295 }
3296
3297 async fn try_active_peer_alternative_addresses(
3298 &mut self,
3299 peer_config: &PeerConfig,
3300 peer_identity: PeerIdentity,
3301 ) -> Result<bool, NodeError> {
3302 let peer_node_addr = *peer_identity.node_addr();
3303 let candidates = self.peer_address_candidates(peer_config).await;
3304
3305 if candidates.is_empty() {
3306 return Err(NodeError::NoTransportForType(format!(
3307 "no addresses known for {}",
3308 peer_config.npub
3309 )));
3310 }
3311
3312 let alternatives: Vec<_> = candidates
3313 .into_iter()
3314 .filter(|addr| !(addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat")))
3315 .filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
3316 .collect();
3317
3318 if alternatives.is_empty() {
3319 return Err(NodeError::NoTransportForType(format!(
3320 "no concrete alternate addresses known for {}",
3321 peer_config.npub
3322 )));
3323 }
3324
3325 self.attempt_peer_address_list(peer_config, peer_identity, false, &alternatives)
3326 .await?;
3327 Ok(true)
3328 }
3329
3330 async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3331 let static_addresses = self.static_peer_addresses(peer_config);
3338 let overlay_addresses = self
3339 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3340 .await;
3341
3342 let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3343 for addr in overlay_addresses.into_iter().chain(static_addresses) {
3344 if !candidates.iter().any(|existing: &PeerAddress| {
3345 existing.transport == addr.transport && existing.addr == addr.addr
3346 }) {
3347 candidates.push(addr);
3348 }
3349 }
3350
3351 candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
3356 _ if a.priority != b.priority => a.priority.cmp(&b.priority),
3357 (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3358 (Some(_), None) => std::cmp::Ordering::Less,
3359 (None, Some(_)) => std::cmp::Ordering::Greater,
3360 (None, None) => std::cmp::Ordering::Equal,
3361 });
3362
3363 candidates
3364 }
3365
3366 fn active_peer_matches_any_candidate(
3367 &self,
3368 peer_node_addr: &NodeAddr,
3369 candidates: &[PeerAddress],
3370 ) -> bool {
3371 candidates
3372 .iter()
3373 .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3374 }
3375
3376 pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3377 &self,
3378 peer_node_addr: &NodeAddr,
3379 candidates: &[PeerAddress],
3380 ) -> bool {
3381 if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3382 return false;
3383 }
3384 !self.active_peer_needs_same_path_refresh(peer_node_addr)
3385 }
3386
3387 fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3388 let Some(peer) = self.peers.get(peer_node_addr) else {
3389 return false;
3390 };
3391 let stale_after_ms = self
3392 .config
3393 .node
3394 .heartbeat_interval_secs
3395 .saturating_mul(1000)
3396 .max(1000);
3397 peer.idle_time(Self::now_ms()) > stale_after_ms
3398 }
3399
3400 fn active_peer_matches_candidate(
3401 &self,
3402 peer_node_addr: &NodeAddr,
3403 candidate: &PeerAddress,
3404 ) -> bool {
3405 let Some(peer) = self.peers.get(peer_node_addr) else {
3406 return false;
3407 };
3408 let Some(current_addr) = peer.current_addr() else {
3409 return false;
3410 };
3411 if let Some(peer_transport_id) = peer.transport_id()
3412 && let Some((candidate_transport_id, candidate_addr)) =
3413 self.resolve_peer_address_for_match(candidate)
3414 {
3415 return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3416 }
3417 if peer
3418 .transport_id()
3419 .map(|id| self.bootstrap_transports.contains(&id))
3420 .unwrap_or(false)
3421 {
3422 return false;
3423 }
3424 let current_addr = current_addr.to_string();
3425 let current_transport = peer
3426 .transport_id()
3427 .and_then(|id| self.transports.get(&id))
3428 .map(|transport| transport.transport_type().name);
3429
3430 candidate.addr == current_addr
3431 && current_transport
3432 .map(|transport| transport == candidate.transport)
3433 .unwrap_or(true)
3434 }
3435
3436 pub(crate) async fn api_connect(
3444 &mut self,
3445 npub: &str,
3446 address: &str,
3447 transport: &str,
3448 ) -> Result<serde_json::Value, String> {
3449 let peer_config = PeerConfig {
3450 npub: npub.to_string(),
3451 alias: None,
3452 addresses: vec![PeerAddress::new(transport, address)],
3453 connect_policy: ConnectPolicy::Manual,
3454 auto_reconnect: false,
3455 discovery_fallback_transit: true,
3456 };
3457
3458 if let Ok(identity) = PeerIdentity::from_npub(npub) {
3460 self.peer_aliases
3461 .insert(*identity.node_addr(), identity.short_npub());
3462 self.register_identity(*identity.node_addr(), identity.pubkey_full());
3463 }
3464
3465 self.initiate_peer_connection(&peer_config)
3466 .await
3467 .map(|()| {
3468 info!(
3469 npub = %npub,
3470 address = %address,
3471 transport = %transport,
3472 "API connect initiated"
3473 );
3474 serde_json::json!({
3475 "npub": npub,
3476 "address": address,
3477 "transport": transport,
3478 })
3479 })
3480 .map_err(|e| e.to_string())
3481 }
3482
3483 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
3487 let peer_identity =
3488 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
3489 let node_addr = *peer_identity.node_addr();
3490
3491 if !self.peers.contains_key(&node_addr) {
3492 return Err(format!("peer not found: {npub}"));
3493 }
3494
3495 self.remove_active_peer(&node_addr);
3497
3498 self.retry_pending.remove(&node_addr);
3500
3501 info!(npub = %npub, "API disconnect completed");
3502
3503 Ok(serde_json::json!({
3504 "npub": npub,
3505 "disconnected": true,
3506 }))
3507 }
3508
3509 pub async fn adopt_established_traversal(
3516 &mut self,
3517 traversal: EstablishedTraversal,
3518 ) -> Result<BootstrapHandoffResult, NodeError> {
3519 debug!(
3520 peer_npub = %traversal.peer_npub,
3521 session_id = %traversal.session_id,
3522 remote_addr = %traversal.remote_addr,
3523 "adopting established traversal socket"
3524 );
3525
3526 if !self.state.is_operational() {
3527 return Err(NodeError::NotStarted);
3528 }
3529
3530 let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
3531 let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
3532 NodeError::InvalidPeerNpub {
3533 npub: traversal.peer_npub.clone(),
3534 reason: e.to_string(),
3535 }
3536 })?;
3537 let peer_node_addr = *peer_identity.node_addr();
3538 if self.peers.contains_key(&peer_node_addr) {
3539 debug!(
3540 peer_npub = %traversal.peer_npub,
3541 "Adopting NAT traversal handoff as alternate path for already-connected peer"
3542 );
3543 }
3544
3545 self.peer_aliases
3546 .insert(peer_node_addr, peer_identity.short_npub());
3547 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
3548
3549 let transport_id = self.allocate_transport_id();
3550 let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
3570 let mut cfg = self
3571 .lookup_udp_config(traversal.transport_name.as_deref())
3572 .or_else(|| self.lookup_udp_config(None))
3573 .cloned()
3574 .unwrap_or_default();
3575 cfg.bind_addr = None;
3576 cfg.external_addr = None;
3577 cfg
3578 });
3579 let mut transport = crate::transport::udp::UdpTransport::new(
3580 transport_id,
3581 traversal.transport_name.clone(),
3582 inherited_config,
3583 packet_tx,
3584 );
3585
3586 transport
3587 .adopt_socket_async(traversal.socket)
3588 .await
3589 .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
3590
3591 let local_addr = transport.local_addr().ok_or_else(|| {
3592 NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
3593 })?;
3594
3595 self.transports.insert(
3596 transport_id,
3597 crate::transport::TransportHandle::Udp(transport),
3598 );
3599 self.bootstrap_transports.insert(transport_id);
3600 self.bootstrap_transport_npubs
3601 .insert(transport_id, traversal.peer_npub.clone());
3602
3603 let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
3604 if let Err(err) = self
3605 .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
3606 .await
3607 {
3608 self.bootstrap_transports.remove(&transport_id);
3609 self.bootstrap_transport_npubs.remove(&transport_id);
3610 if let Some(mut handle) = self.transports.remove(&transport_id) {
3611 let _ = handle.stop().await;
3612 }
3613 return Err(err);
3614 }
3615
3616 info!(
3617 peer = %self.peer_display_name(&peer_node_addr),
3618 transport_id = %transport_id,
3619 local_addr = %local_addr,
3620 remote_addr = %traversal.remote_addr,
3621 session_id = %traversal.session_id,
3622 "adopted NAT traversal socket; handshake initiated"
3623 );
3624
3625 Ok(BootstrapHandoffResult {
3626 transport_id,
3627 local_addr,
3628 remote_addr: traversal.remote_addr,
3629 peer_node_addr,
3630 session_id: traversal.session_id,
3631 })
3632 }
3633}