1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
7 OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use std::collections::{HashMap, HashSet};
18use std::net::{IpAddr, SocketAddr};
19use std::thread;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
/// Debug builds only: append one timestamped line containing `message` to
/// `nvpn-fips-endpoint-debug.log` in the system temp directory.
///
/// Failures to open or write the file are deliberately ignored.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let opened = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_path);
    let Ok(mut log_file) = opened else {
        return;
    };
    let stamp = std::time::SystemTime::now();
    let _ = writeln!(log_file, "{:?} {}", stamp, message.as_ref());
}

/// Release builds: debug logging is compiled out and the message is dropped.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
43
/// Returns `true` when `ip` is not reachable across the public internet and
/// therefore must not be treated as a usable overlay advert endpoint.
///
/// IPv4: private, loopback, link-local, unspecified and `0.0.0.0/8`,
/// multicast, broadcast, documentation, and the `100.64.0.0/10` shared
/// (carrier-grade NAT) range.
///
/// IPv6: loopback, unspecified, unique-local (`fc00::/7`), multicast,
/// link-local (`fe80::/10`), and documentation (`2001:db8::/32`).
/// IPv4-mapped addresses (`::ffff:a.b.c.d`) are judged by their embedded
/// IPv4 address, so a mapped private address is not mistaken for a public one.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [first, second, ..] = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 0.0.0.0/8 "this network" (RFC 1122): never a valid destination.
                || first == 0
                // 100.64.0.0/10 shared address space (RFC 6598): top two bits
                // of the second octet are 01.
                || (first == 100 && (second & 0xc0) == 64)
        }
        IpAddr::V6(v6) => {
            // ::ffff:a.b.c.d reaches the embedded IPv4 host; without this, a
            // mapped RFC 1918 address would pass every IPv6 check below.
            if let Some(v4) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(v4));
            }
            let segments = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // fe80::/10 link-local unicast.
                || (segments[0] & 0xffc0) == 0xfe80
                // 2001:db8::/32 documentation prefix (RFC 3849).
                || (segments[0] == 0x2001 && segments[1] == 0x0db8)
        }
    }
}
75
/// Returns `true` when `local` and `remote` belong to the same IP family
/// (both IPv4 or both IPv6), i.e. a socket bound to `local` can address `remote`.
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    local.is_ipv4() == remote.is_ipv4()
}
82
// NOTE(review): not referenced in the visible part of this file; from the name
// it presumably scales an open-discovery retry lifetime. Confirm at its use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
/// Maximum number of simultaneous in-flight connection attempts (handshaking
/// connections plus pending transport connects) toward one peer; enforced by
/// `Node::path_candidate_attempt_budget`.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
/// Cap on how many auto-connect sessions/lookups one
/// `Node::warm_auto_connect_graph_sessions` pass may start (see
/// `Node::graph_session_warmup_budget`).
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
/// Cap on how many connection attempts one `Node::poll_transport_discovery`
/// pass may queue (see `Node::discovery_connect_budget`).
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
87
88impl Node {
89 pub(super) async fn update_peers(
101 &mut self,
102 new_peers: Vec<crate::config::PeerConfig>,
103 ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
104 use std::collections::{HashMap, HashSet};
105
106 let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
107 HashMap::with_capacity(new_peers.len());
108 let mut new_order = Vec::with_capacity(new_peers.len());
109 for peer in new_peers {
110 let identity = match PeerIdentity::from_npub(&peer.npub) {
111 Ok(id) => id,
112 Err(e) => {
113 return Err(crate::node::NodeError::InvalidPeerNpub {
114 npub: peer.npub.clone(),
115 reason: e.to_string(),
116 });
117 }
118 };
119 let node_addr = *identity.node_addr();
123 if !new_by_addr.contains_key(&node_addr) {
124 new_order.push(node_addr);
125 }
126 new_by_addr.insert(node_addr, peer);
127 }
128
129 let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
130 .config
131 .peers()
132 .iter()
133 .filter_map(|pc| {
134 PeerIdentity::from_npub(&pc.npub)
135 .ok()
136 .map(|id| (*id.node_addr(), pc.clone()))
137 })
138 .collect();
139
140 let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
141 let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
142
143 let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
144 let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
145 let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
146
147 let mut outcome = crate::node::UpdatePeersOutcome::default();
148
149 for node_addr in &removed {
150 if self.retry_pending.remove(node_addr).is_some() {
151 debug!(
152 peer = %self.peer_display_name(node_addr),
153 "Dropping retry entry for peer removed from runtime peer list"
154 );
155 }
156 self.peer_aliases.remove(node_addr);
157 self.set_discovery_fallback_transit_allowed(*node_addr, false);
158 outcome.removed += 1;
159 }
160
161 let mut auto_connect_refresh_configs = Vec::new();
162 for node_addr in &kept {
163 let new_pc = &new_by_addr[node_addr];
164 let current_pc = ¤t_by_addr[node_addr];
165 if new_pc.addresses != current_pc.addresses
166 || new_pc.alias != current_pc.alias
167 || new_pc.connect_policy != current_pc.connect_policy
168 || new_pc.auto_reconnect != current_pc.auto_reconnect
169 || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
170 {
171 outcome.updated += 1;
172 self.set_discovery_fallback_transit_allowed(
173 *node_addr,
174 new_pc.discovery_fallback_transit,
175 );
176 if let Some(state) = self.retry_pending.get_mut(node_addr) {
177 state.peer_config = new_pc.clone();
178 state.retry_after_ms = Self::now_ms();
179 }
180 if let Some(alias) = new_pc.alias.clone() {
181 self.peer_aliases.insert(*node_addr, alias);
182 }
183 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
184 auto_connect_refresh_configs.push(new_pc.clone());
185 }
186 } else {
187 outcome.unchanged += 1;
188 self.set_discovery_fallback_transit_allowed(
189 *node_addr,
190 new_pc.discovery_fallback_transit,
191 );
192 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
193 auto_connect_refresh_configs.push(new_pc.clone());
194 }
195 }
196 }
197
198 let added_configs: Vec<crate::config::PeerConfig> = new_order
199 .iter()
200 .filter(|addr| added.contains(addr))
201 .map(|addr| new_by_addr[addr].clone())
202 .collect();
203
204 self.config.peers = new_order
208 .iter()
209 .filter_map(|addr| new_by_addr.get(addr).cloned())
210 .collect();
211
212 for peer_config in added_configs {
213 outcome.added += 1;
214 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
215 continue;
216 };
217 let name = peer_config
218 .alias
219 .clone()
220 .unwrap_or_else(|| identity.short_npub());
221 self.peer_aliases.insert(*identity.node_addr(), name);
222 self.set_discovery_fallback_transit_allowed(
223 *identity.node_addr(),
224 peer_config.discovery_fallback_transit,
225 );
226 self.register_identity(*identity.node_addr(), identity.pubkey_full());
227
228 if !peer_config.is_auto_connect() {
229 continue;
230 }
231
232 match self
233 .try_auto_connect_graph_session(&peer_config, identity)
234 .await
235 {
236 Ok(true) => continue,
237 Ok(false) => {}
238 Err(err) => {
239 debug!(
240 npub = %peer_config.npub,
241 error = %err,
242 "Existing FIPS graph did not warm newly added peer"
243 );
244 }
245 }
246
247 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
248 warn!(
249 npub = %peer_config.npub,
250 error = %e,
251 "Failed to initiate connection for newly added peer"
252 );
253 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
254 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
255 }
256 if matches!(e, crate::node::NodeError::NoTransportForType(_))
257 && let Some(bootstrap) = self.nostr_discovery.clone()
258 {
259 bootstrap
260 .request_advert_stale_check(peer_config.npub.clone())
261 .await;
262 }
263 }
264 }
265
266 for peer_config in auto_connect_refresh_configs {
267 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
268 continue;
269 };
270 let node_addr = *peer_identity.node_addr();
271
272 if self.peers.contains_key(&node_addr) {
273 match self
274 .initiate_active_peer_alternative_connection(&peer_config)
275 .await
276 {
277 Ok(attempted) => {
278 if attempted {
279 debug!(
280 peer = %self.peer_display_name(&node_addr),
281 "Started non-disruptive alternate-path handshake for active peer"
282 );
283 }
284 }
285 Err(e) => {
286 debug!(
287 npub = %peer_config.npub,
288 error = %e,
289 "Active peer alternate-path refresh did not start"
290 );
291 }
292 }
293 continue;
294 }
295
296 match self
297 .try_auto_connect_graph_session(&peer_config, peer_identity)
298 .await
299 {
300 Ok(true) => continue,
301 Ok(false) => {}
302 Err(err) => {
303 debug!(
304 npub = %peer_config.npub,
305 error = %err,
306 "Existing FIPS graph did not warm refreshed peer"
307 );
308 }
309 }
310
311 match self.initiate_peer_connection(&peer_config).await {
312 Ok(()) => {
313 let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
314 if let Some(state) = self.retry_pending.get_mut(&node_addr) {
315 state.peer_config = peer_config;
316 state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
317 }
318 }
319 Err(e) => {
320 debug!(
321 npub = %peer_config.npub,
322 error = %e,
323 "Refreshed peer addresses did not initiate a direct connection"
324 );
325 self.schedule_retry(node_addr, Self::now_ms());
326 }
327 }
328 }
329
330 self.warm_auto_connect_graph_sessions().await;
331
332 Ok(outcome)
333 }
334
335 pub(super) async fn initiate_peer_connections(&mut self) {
336 let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
342 .config
343 .peers()
344 .iter()
345 .filter_map(|pc| {
346 PeerIdentity::from_npub(&pc.npub)
347 .ok()
348 .map(|id| (id, pc.alias.clone()))
349 })
350 .collect();
351
352 for (identity, alias) in peer_identities {
353 let name = alias.unwrap_or_else(|| identity.short_npub());
354 self.peer_aliases.insert(*identity.node_addr(), name);
355 self.register_identity(*identity.node_addr(), identity.pubkey_full());
359 }
360
361 let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
363
364 if peer_configs.is_empty() {
365 debug!("No static peers configured");
366 return;
367 }
368
369 debug!(
370 count = peer_configs.len(),
371 "Initiating static peer connections"
372 );
373
374 for peer_config in peer_configs {
375 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
376 Ok(identity) => identity,
377 Err(_) => continue,
378 };
379 match self
380 .try_auto_connect_graph_session(&peer_config, peer_identity)
381 .await
382 {
383 Ok(true) => continue,
384 Ok(false) => {}
385 Err(err) => {
386 debug!(
387 npub = %peer_config.npub,
388 error = %err,
389 "Existing FIPS graph did not warm auto-connect peer"
390 );
391 }
392 }
393 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
394 warn!(
395 npub = %peer_config.npub,
396 alias = ?peer_config.alias,
397 error = %e,
398 "Failed to initiate peer connection"
399 );
400 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
404 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
405 }
406 if matches!(e, crate::node::NodeError::NoTransportForType(_))
412 && let Some(bootstrap) = self.nostr_discovery.clone()
413 {
414 bootstrap
415 .request_advert_stale_check(peer_config.npub.clone())
416 .await;
417 }
418 }
419 }
420
421 self.warm_auto_connect_graph_sessions().await;
422 }
423
424 pub(in crate::node) async fn try_auto_connect_graph_session(
425 &mut self,
426 peer_config: &PeerConfig,
427 peer_identity: PeerIdentity,
428 ) -> Result<bool, NodeError> {
429 if !peer_config.is_auto_connect() {
430 return Ok(false);
431 }
432
433 let peer_node_addr = *peer_identity.node_addr();
434 if self.peers.contains_key(&peer_node_addr) {
435 return Ok(false);
436 }
437 if self
438 .sessions
439 .get(&peer_node_addr)
440 .is_some_and(|entry| entry.is_established() || entry.is_initiating())
441 {
442 return Ok(true);
443 }
444 if self.find_next_hop(&peer_node_addr).is_none() {
445 return Ok(false);
446 }
447
448 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
449 match self
450 .initiate_session(peer_node_addr, peer_identity.pubkey_full())
451 .await
452 {
453 Ok(()) => {
454 debug!(
455 peer = %self.peer_display_name(&peer_node_addr),
456 "Warmed auto-connect peer session over existing FIPS graph"
457 );
458 Ok(true)
459 }
460 Err(NodeError::SendFailed { node_addr, reason })
461 if node_addr == peer_node_addr && reason == "no route to destination" =>
462 {
463 self.maybe_initiate_lookup(&peer_node_addr).await;
464 Ok(false)
465 }
466 Err(err) => Err(err),
467 }
468 }
469
470 pub(super) async fn initiate_peer_connection(
474 &mut self,
475 peer_config: &crate::config::PeerConfig,
476 ) -> Result<(), NodeError> {
477 self.initiate_peer_connection_inner(peer_config).await
478 }
479
480 pub(super) async fn initiate_peer_retry_connection(
486 &mut self,
487 peer_config: &crate::config::PeerConfig,
488 ) -> Result<(), NodeError> {
489 self.initiate_peer_connection_inner(peer_config).await
490 }
491
492 async fn initiate_active_peer_alternative_connection(
493 &mut self,
494 peer_config: &crate::config::PeerConfig,
495 ) -> Result<bool, NodeError> {
496 let peer_identity =
497 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
498 npub: peer_config.npub.clone(),
499 reason: e.to_string(),
500 })?;
501 let peer_node_addr = *peer_identity.node_addr();
502
503 if !self.peers.contains_key(&peer_node_addr) {
504 self.initiate_peer_connection(peer_config).await?;
505 return Ok(true);
506 }
507
508 self.try_active_peer_alternative_addresses(peer_config, peer_identity)
514 .await
515 }
516
517 async fn initiate_peer_connection_inner(
518 &mut self,
519 peer_config: &crate::config::PeerConfig,
520 ) -> Result<(), NodeError> {
521 let peer_identity =
523 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
524 npub: peer_config.npub.clone(),
525 reason: e.to_string(),
526 })?;
527
528 let peer_node_addr = *peer_identity.node_addr();
529
530 if self.peers.contains_key(&peer_node_addr) {
532 debug!(
533 npub = %peer_config.npub,
534 "Peer already exists, skipping"
535 );
536 return Ok(());
537 }
538
539 self.try_peer_addresses(peer_config, peer_identity, true)
540 .await
541 }
542
543 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
544 self.connections.values().any(|conn| {
545 conn.expected_identity()
546 .map(|id| id.node_addr() == peer_node_addr)
547 .unwrap_or(false)
548 })
549 }
550
551 fn is_connecting_to_peer_on_path(
552 &self,
553 peer_node_addr: &NodeAddr,
554 transport_id: TransportId,
555 remote_addr: &TransportAddr,
556 ) -> bool {
557 self.connections.values().any(|conn| {
558 conn.expected_identity()
559 .map(|id| id.node_addr() == peer_node_addr)
560 .unwrap_or(false)
561 && conn.transport_id() == Some(transport_id)
562 && conn.source_addr() == Some(remote_addr)
563 }) || self.pending_connects.iter().any(|pending| {
564 pending.peer_identity.node_addr() == peer_node_addr
565 && pending.transport_id == transport_id
566 && &pending.remote_addr == remote_addr
567 })
568 }
569
570 pub(in crate::node) fn should_warm_auto_connect_session(
571 &self,
572 peer_node_addr: &NodeAddr,
573 ) -> bool {
574 if self.peers.contains_key(peer_node_addr)
575 || self
576 .sessions
577 .get(peer_node_addr)
578 .is_some_and(|entry| entry.is_established())
579 {
580 return false;
581 }
582
583 self.config.peers().iter().any(|peer| {
584 peer.is_auto_connect()
585 && PeerIdentity::from_npub(&peer.npub)
586 .map(|identity| identity.node_addr() == peer_node_addr)
587 .unwrap_or(false)
588 })
589 }
590
591 pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
592 if !self.peers.values().any(|peer| peer.can_send()) {
593 return 0;
594 }
595
596 let mut budget = self.graph_session_warmup_budget();
597 if budget == 0 {
598 return 0;
599 }
600
601 let peer_identities: Vec<_> = self
602 .config
603 .auto_connect_peers()
604 .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
605 .collect();
606
607 let mut warmed = 0;
608 for identity in peer_identities {
609 if budget == 0 {
610 break;
611 }
612
613 let peer_node_addr = *identity.node_addr();
614 if peer_node_addr == *self.identity.node_addr()
615 || !self.should_warm_auto_connect_session(&peer_node_addr)
616 || self
617 .sessions
618 .get(&peer_node_addr)
619 .is_some_and(|entry| entry.is_initiating())
620 {
621 continue;
622 }
623
624 self.register_identity(peer_node_addr, identity.pubkey_full());
625
626 if self.find_next_hop(&peer_node_addr).is_some() {
627 match self
628 .initiate_session(peer_node_addr, identity.pubkey_full())
629 .await
630 {
631 Ok(()) => {
632 warmed += 1;
633 budget = budget.saturating_sub(1);
634 debug!(
635 peer = %self.peer_display_name(&peer_node_addr),
636 "Warmed auto-connect peer session over existing FIPS graph"
637 );
638 }
639 Err(NodeError::SendFailed { node_addr, reason })
640 if node_addr == peer_node_addr && reason == "no route to destination" =>
641 {
642 self.maybe_initiate_lookup(&peer_node_addr).await;
643 warmed += 1;
644 budget = budget.saturating_sub(1);
645 }
646 Err(err) => {
647 debug!(
648 peer = %self.peer_display_name(&peer_node_addr),
649 error = %err,
650 "Failed to warm auto-connect peer session"
651 );
652 }
653 }
654 } else {
655 self.maybe_initiate_lookup(&peer_node_addr).await;
656 warmed += 1;
657 budget = budget.saturating_sub(1);
658 }
659 }
660
661 warmed
662 }
663
664 pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
665 let max_destinations = self.config.node.session.pending_max_destinations;
666 if max_destinations == 0 {
667 return 0;
668 }
669
670 let pending_sessions = self
671 .sessions
672 .values()
673 .filter(|entry| !entry.is_established())
674 .count();
675 let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
676 max_destinations
677 .saturating_sub(pending_total)
678 .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
679 }
680
681 fn outbound_handshake_slots(&self) -> usize {
682 let used = self
683 .connections
684 .len()
685 .saturating_add(self.pending_connects.len());
686 if self.max_connections == 0 {
687 usize::MAX
688 } else {
689 self.max_connections.saturating_sub(used)
690 }
691 }
692
693 fn outbound_link_slots(&self) -> usize {
694 if self.max_links == 0 {
695 usize::MAX
696 } else {
697 self.max_links.saturating_sub(self.links.len())
698 }
699 }
700
701 fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
702 if !self.peers.contains_key(peer_node_addr)
703 && self.max_peers > 0
704 && self.peers.len() >= self.max_peers
705 {
706 return 0;
707 }
708
709 let in_flight_for_peer = self
710 .connections
711 .values()
712 .filter(|conn| {
713 conn.expected_identity()
714 .map(|id| id.node_addr() == peer_node_addr)
715 .unwrap_or(false)
716 })
717 .count()
718 .saturating_add(
719 self.pending_connects
720 .iter()
721 .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
722 .count(),
723 );
724
725 self.outbound_handshake_slots()
726 .min(self.outbound_link_slots())
727 .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
728 }
729
730 fn discovery_connect_budget(&self) -> usize {
731 self.outbound_handshake_slots()
732 .min(self.outbound_link_slots())
733 .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
734 }
735
736 fn find_udp_transport_for_remote_addr(
743 &self,
744 remote_addr: SocketAddr,
745 ) -> Option<(TransportId, SocketAddr)> {
746 self.transports
747 .iter()
748 .filter(|(id, handle)| {
749 handle.transport_type().name == "udp"
750 && handle.is_operational()
751 && !self.bootstrap_transports.contains(id)
752 })
753 .filter_map(|(id, handle)| {
754 let local_addr = handle.local_addr()?;
755 socket_addr_families_compatible(local_addr, remote_addr)
756 .then_some((*id, local_addr))
757 })
758 .min_by_key(|(id, _)| id.as_u32())
759 }
760
761 pub(super) fn transport_discovery_candidate(
762 &self,
763 discovered_transport_id: TransportId,
764 discovered_addr: TransportAddr,
765 ) -> Option<(TransportId, TransportAddr, &'static str)> {
766 let transport = self.transports.get(&discovered_transport_id)?;
767 let transport_name = transport.transport_type().name;
768
769 if transport_name != "udp" {
770 return Some((discovered_transport_id, discovered_addr, transport_name));
771 }
772
773 let Some(remote_socket_addr) = discovered_addr
774 .as_str()
775 .and_then(|addr| addr.parse::<SocketAddr>().ok())
776 else {
777 if self.bootstrap_transports.contains(&discovered_transport_id) {
778 debug!(
779 transport_id = %discovered_transport_id,
780 remote_addr = %discovered_addr,
781 "transport discovery: skip non-numeric UDP address from bootstrap transport"
782 );
783 return None;
784 }
785 return Some((discovered_transport_id, discovered_addr, transport_name));
786 };
787
788 let Some((transport_id, local_addr)) =
789 self.find_udp_transport_for_remote_addr(remote_socket_addr)
790 else {
791 debug!(
792 transport_id = %discovered_transport_id,
793 remote_addr = %discovered_addr,
794 "transport discovery: skip UDP peer with no compatible local socket"
795 );
796 return None;
797 };
798
799 if transport_id != discovered_transport_id {
800 debug!(
801 discovered_transport_id = %discovered_transport_id,
802 selected_transport_id = %transport_id,
803 local_addr = %local_addr,
804 remote_addr = %remote_socket_addr,
805 "transport discovery: selected compatible UDP transport"
806 );
807 }
808
809 Some((
810 transport_id,
811 TransportAddr::from_socket_addr(remote_socket_addr),
812 transport_name,
813 ))
814 }
815
    /// Render a discovered candidate address in the textual form used for a
    /// `PeerAddress`.
    ///
    /// On Linux/macOS, an ethernet candidate is rendered as `"<interface>/<mac>"`
    /// when its 6-byte address is paired with a transport that reports an
    /// interface name; the MAC is formatted by
    /// `crate::transport::ethernet::format_mac`. Every other case falls back to
    /// `remote_addr.to_string()`.
    fn peer_address_string_for_transport_candidate(
        &self,
        transport_id: TransportId,
        transport_name: &str,
        remote_addr: &TransportAddr,
    ) -> String {
        // Other platforms have no ethernet rendering; consume the otherwise
        // unused parameters to avoid warnings.
        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
        let _ = (transport_id, transport_name);

        // Only a 6-byte (MAC-sized) address on a transport that knows its
        // interface name gets the `<interface>/<mac>` form.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        if transport_name == "ethernet"
            && remote_addr.as_bytes().len() == 6
            && let Some(interface) = self
                .transports
                .get(&transport_id)
                .and_then(|transport| transport.interface_name())
        {
            let mut mac = [0u8; 6];
            mac.copy_from_slice(remote_addr.as_bytes());
            return format!(
                "{interface}/{}",
                crate::transport::ethernet::format_mac(&mac)
            );
        }

        remote_addr.to_string()
    }
843
    /// Resolve a configured `PeerAddress` to the `(TransportId, TransportAddr)`
    /// pair it maps to locally, for matching against candidate paths.
    ///
    /// Returns `None` in these cases:
    /// * the UDP address is the literal `nat` (any case);
    /// * an ethernet or BLE address fails to resolve;
    /// * the address is BLE and the build lacks `bluer_available`;
    /// * a numeric UDP address has no family-compatible, non-bootstrap UDP
    ///   transport;
    /// * no transport of the requested type exists.
    fn resolve_peer_address_for_match(
        &self,
        candidate: &PeerAddress,
    ) -> Option<(TransportId, TransportAddr)> {
        // `nat` is not a concrete UDP address, so there is nothing to match.
        // NOTE(review): presumably it marks NAT-traversal peers; confirm.
        if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
            return None;
        }

        if candidate.transport == "ethernet" {
            return self.resolve_ethernet_addr(&candidate.addr).ok();
        }

        if candidate.transport == "ble" {
            #[cfg(bluer_available)]
            {
                return self.resolve_ble_addr(&candidate.addr).ok();
            }
            // Without BlueZ support, BLE addresses can never be resolved.
            #[cfg(not(bluer_available))]
            {
                return None;
            }
        }

        // A numeric UDP address must use a UDP socket of the same IP family.
        // Any other address (including non-numeric UDP) uses whichever
        // transport `find_transport_for_type` returns for its type.
        let transport_id = if candidate.transport == "udp"
            && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
        {
            self.find_udp_transport_for_remote_addr(remote_socket_addr)
                .map(|(id, _)| id)?
        } else {
            self.find_transport_for_type(&candidate.transport)?
        };

        Some((transport_id, TransportAddr::from_string(&candidate.addr)))
    }
878
879 pub(super) async fn initiate_connection(
890 &mut self,
891 transport_id: TransportId,
892 remote_addr: TransportAddr,
893 peer_identity: PeerIdentity,
894 ) -> Result<(), NodeError> {
895 let peer_node_addr = *peer_identity.node_addr();
896
897 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
898 debug!(
899 peer = %self.peer_display_name(&peer_node_addr),
900 transport_id = %transport_id,
901 remote_addr = %remote_addr,
902 "Connection already in progress for candidate path"
903 );
904 return Ok(());
905 }
906
907 if self.outbound_handshake_slots() == 0 {
908 return Err(NodeError::MaxConnectionsExceeded {
909 max: self.max_connections,
910 });
911 }
912
913 if self.outbound_link_slots() == 0 {
914 return Err(NodeError::MaxLinksExceeded {
915 max: self.max_links,
916 });
917 }
918
919 if !self.peers.contains_key(&peer_node_addr)
920 && self.max_peers > 0
921 && self.peers.len() >= self.max_peers
922 {
923 return Err(NodeError::MaxPeersExceeded {
924 max: self.max_peers,
925 });
926 }
927
928 self.authorize_peer(
929 &peer_identity,
930 PeerAclContext::OutboundConnect,
931 transport_id,
932 &remote_addr,
933 )?;
934
935 let is_connection_oriented = self
936 .transports
937 .get(&transport_id)
938 .map(|t| t.transport_type().connection_oriented)
939 .unwrap_or(false);
940
941 let link_id = self.allocate_link_id();
943
944 let link = if is_connection_oriented {
945 Link::new(
946 link_id,
947 transport_id,
948 remote_addr.clone(),
949 LinkDirection::Outbound,
950 Duration::from_millis(self.config.node.base_rtt_ms),
951 )
952 } else {
953 Link::connectionless(
954 link_id,
955 transport_id,
956 remote_addr.clone(),
957 LinkDirection::Outbound,
958 Duration::from_millis(self.config.node.base_rtt_ms),
959 )
960 };
961
962 self.links.insert(link_id, link);
963
964 self.addr_to_link
966 .insert((transport_id, remote_addr.clone()), link_id);
967
968 if is_connection_oriented {
969 if let Some(transport) = self.transports.get(&transport_id) {
971 match transport.connect(&remote_addr).await {
972 Ok(()) => {
973 debug!(
974 peer = %self.peer_display_name(&peer_node_addr),
975 transport_id = %transport_id,
976 remote_addr = %remote_addr,
977 link_id = %link_id,
978 "Transport connect initiated (non-blocking)"
979 );
980 self.pending_connects.push(super::PendingConnect {
981 link_id,
982 transport_id,
983 remote_addr,
984 peer_identity,
985 });
986 }
987 Err(e) => {
988 self.links.remove(&link_id);
990 self.addr_to_link.remove(&(transport_id, remote_addr));
991 return Err(NodeError::TransportError(e.to_string()));
992 }
993 }
994 }
995 Ok(())
996 } else {
997 self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
999 .await
1000 }
1001 }
1002
1003 pub(super) async fn start_handshake(
1008 &mut self,
1009 link_id: LinkId,
1010 transport_id: TransportId,
1011 remote_addr: TransportAddr,
1012 peer_identity: PeerIdentity,
1013 ) -> Result<(), NodeError> {
1014 let peer_node_addr = *peer_identity.node_addr();
1015
1016 let current_time_ms = Self::now_ms();
1018 let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);
1019
1020 let our_index = match self.index_allocator.allocate() {
1022 Ok(idx) => idx,
1023 Err(e) => {
1024 self.links.remove(&link_id);
1026 self.addr_to_link.remove(&(transport_id, remote_addr));
1027 return Err(NodeError::IndexAllocationFailed(e.to_string()));
1028 }
1029 };
1030
1031 let our_keypair = self.identity.keypair();
1033 let noise_msg1 =
1034 match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
1035 Ok(msg) => msg,
1036 Err(e) => {
1037 let _ = self.index_allocator.free(our_index);
1039 self.links.remove(&link_id);
1040 self.addr_to_link.remove(&(transport_id, remote_addr));
1041 return Err(NodeError::HandshakeFailed(e.to_string()));
1042 }
1043 };
1044
1045 connection.set_our_index(our_index);
1047 connection.set_transport_id(transport_id);
1048 connection.set_source_addr(remote_addr.clone());
1049
1050 let wire_msg1 = build_msg1(our_index, &noise_msg1);
1052
1053 debug!(
1054 peer = %self.peer_display_name(&peer_node_addr),
1055 transport_id = %transport_id,
1056 remote_addr = %remote_addr,
1057 link_id = %link_id,
1058 our_index = %our_index,
1059 "Connection initiated"
1060 );
1061
1062 let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
1064 connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);
1065
1066 self.pending_outbound
1068 .insert((transport_id, our_index.as_u32()), link_id);
1069 self.connections.insert(link_id, connection);
1070
1071 let send_result = match self.transports.get(&transport_id) {
1076 Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
1077 None => None,
1078 };
1079 match send_result {
1080 Some(send_result) => {
1081 self.note_local_send_outcome(&send_result);
1082 match send_result {
1083 Ok(bytes) => {
1084 debug!(
1085 link_id = %link_id,
1086 our_index = %our_index,
1087 bytes,
1088 "Sent Noise handshake message 1 (wire format)"
1089 );
1090 }
1091 Err(e) => {
1092 warn!(
1093 link_id = %link_id,
1094 transport_id = %transport_id,
1095 remote_addr = %remote_addr,
1096 our_index = %our_index,
1097 error = %e,
1098 "Failed to send handshake message"
1099 );
1100 self.pending_outbound
1101 .remove(&(transport_id, our_index.as_u32()));
1102 self.connections.remove(&link_id);
1103 self.links.remove(&link_id);
1104 self.addr_to_link
1105 .remove(&(transport_id, remote_addr.clone()));
1106 let _ = self.index_allocator.free(our_index);
1107 return Err(NodeError::TransportError(e.to_string()));
1108 }
1109 }
1110 }
1111 None => {
1112 self.pending_outbound
1113 .remove(&(transport_id, our_index.as_u32()));
1114 self.connections.remove(&link_id);
1115 self.links.remove(&link_id);
1116 self.addr_to_link
1117 .remove(&(transport_id, remote_addr.clone()));
1118 let _ = self.index_allocator.free(our_index);
1119 return Err(NodeError::TransportError(format!(
1120 "transport {transport_id} disappeared before first handshake send"
1121 )));
1122 }
1123 }
1124
1125 Ok(())
1126 }
1127
1128 pub(super) async fn poll_transport_discovery(&mut self) {
1134 let mut to_connect = Vec::new();
1136 let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1137 let mut connect_budget = self.discovery_connect_budget();
1138 let mut skipped_budget = 0usize;
1139
1140 for transport in self.transports.values() {
1141 if !transport.is_operational() {
1142 continue;
1143 }
1144 if !transport.auto_connect() {
1145 let _ = transport.discover();
1147 continue;
1148 }
1149 let discovered = match transport.discover() {
1150 Ok(peers) => peers,
1151 Err(_) => continue,
1152 };
1153 for peer in discovered {
1154 let discovered_transport_id = peer.transport_id;
1155 let pubkey = match peer.pubkey_hint {
1156 Some(pk) => pk,
1157 None => continue,
1158 };
1159 let identity = PeerIdentity::from_pubkey(pubkey);
1160 let node_addr = *identity.node_addr();
1161
1162 if node_addr == *self.identity.node_addr() {
1164 continue;
1165 }
1166
1167 let Some((candidate_transport_id, remote_addr, transport_name)) =
1168 self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1169 else {
1170 continue;
1171 };
1172
1173 if self.peers.contains_key(&node_addr) {
1174 let candidate = PeerAddress::new(
1175 transport_name,
1176 self.peer_address_string_for_transport_candidate(
1177 candidate_transport_id,
1178 transport_name,
1179 &remote_addr,
1180 ),
1181 );
1182 if self.active_peer_candidate_is_fresh_enough_to_skip(
1183 &node_addr,
1184 std::slice::from_ref(&candidate),
1185 ) {
1186 continue;
1187 }
1188 if self.is_connecting_to_peer_on_path(
1189 &node_addr,
1190 candidate_transport_id,
1191 &remote_addr,
1192 ) {
1193 continue;
1194 }
1195 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1196 if connect_budget == 0
1197 || self
1198 .path_candidate_attempt_budget(&node_addr)
1199 .saturating_sub(queued_for_peer)
1200 == 0
1201 {
1202 skipped_budget = skipped_budget.saturating_add(1);
1203 continue;
1204 }
1205 to_connect.push((candidate_transport_id, remote_addr, identity, true));
1206 *queued_per_peer.entry(node_addr).or_default() += 1;
1207 connect_budget = connect_budget.saturating_sub(1);
1208 continue;
1209 }
1210
1211 if self.is_connecting_to_peer_on_path(
1212 &node_addr,
1213 candidate_transport_id,
1214 &remote_addr,
1215 ) {
1216 continue;
1217 }
1218
1219 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1220 if connect_budget == 0
1221 || self
1222 .path_candidate_attempt_budget(&node_addr)
1223 .saturating_sub(queued_for_peer)
1224 == 0
1225 {
1226 skipped_budget = skipped_budget.saturating_add(1);
1227 continue;
1228 }
1229 to_connect.push((candidate_transport_id, remote_addr, identity, false));
1230 *queued_per_peer.entry(node_addr).or_default() += 1;
1231 connect_budget = connect_budget.saturating_sub(1);
1232 }
1233 }
1234
1235 if skipped_budget > 0 {
1236 debug!(
1237 skipped = skipped_budget,
1238 queued = to_connect.len(),
1239 "Transport discovery connect budget exhausted"
1240 );
1241 }
1242
1243 for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1244 info!(
1245 peer = %self.peer_display_name(identity.node_addr()),
1246 transport_id = %transport_id,
1247 remote_addr = %remote_addr,
1248 active_refresh,
1249 "Auto-connecting to discovered peer"
1250 );
1251 if let Err(e) = self
1252 .initiate_connection(transport_id, remote_addr, identity)
1253 .await
1254 {
1255 warn!(error = %e, "Failed to auto-connect to discovered peer");
1256 }
1257 }
1258 }
1259
1260 pub(super) async fn poll_nostr_discovery(&mut self) {
1261 let Some(bootstrap) = self.nostr_discovery.clone() else {
1262 return;
1263 };
1264
1265 bootstrap.set_outbound_admission(self.outbound_admission_check());
1266
1267 if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
1268 debug!(error = %err, "Failed to refresh local Nostr overlay advert");
1269 }
1270
1271 for event in bootstrap.drain_events().await {
1272 match event {
1273 BootstrapEvent::Established { traversal } => {
1274 if !self.outbound_admission_check() {
1275 debug!(
1276 peer_npub = %traversal.peer_npub,
1277 peers = self.peers.len(),
1278 max_peers = self.max_peers,
1279 "Dropping established NAT traversal: at capacity"
1280 );
1281 continue;
1282 }
1283 let peer_npub = traversal.peer_npub.clone();
1284 match self.adopt_established_traversal(traversal).await {
1285 Ok(_) => {
1286 info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
1287 }
1288 Err(err) => {
1289 warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
1290 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
1291 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
1292 }
1293 }
1294 }
1295 }
1296 BootstrapEvent::Failed {
1297 peer_config,
1298 reason,
1299 } => {
1300 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
1301 Ok(identity) => identity,
1302 Err(_) => continue,
1303 };
1304 let node_addr = *peer_identity.node_addr();
1305 if self.peers.contains_key(&node_addr) {
1306 debug!(
1307 npub = %peer_config.npub,
1308 error = %reason,
1309 "Ignoring failed NAT traversal for already-connected peer"
1310 );
1311 continue;
1312 }
1313 if self.is_connecting_to_peer(&node_addr) {
1314 debug!(
1315 npub = %peer_config.npub,
1316 error = %reason,
1317 "Ignoring failed NAT traversal while peer handshake is already in progress"
1318 );
1319 continue;
1320 }
1321
1322 let now_ms = Self::now_ms();
1323 let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
1324 if decision.should_warn {
1325 warn!(
1326 npub = %peer_config.npub,
1327 error = %reason,
1328 consecutive_failures = decision.consecutive_failures,
1329 cooldown_secs = decision
1330 .cooldown_until_ms
1331 .map(|t| t.saturating_sub(now_ms) / 1000),
1332 "NAT traversal failed"
1333 );
1334 } else {
1335 debug!(
1336 npub = %peer_config.npub,
1337 error = %reason,
1338 consecutive_failures = decision.consecutive_failures,
1339 "NAT traversal failed (suppressed by warn-rate-limit)"
1340 );
1341 }
1342
1343 if decision.crossed_threshold {
1347 bootstrap
1348 .request_advert_stale_check(peer_config.npub.clone())
1349 .await;
1350 }
1351
1352 if self
1353 .try_peer_addresses(&peer_config, peer_identity, false)
1354 .await
1355 .is_ok()
1356 {
1357 continue;
1358 }
1359
1360 self.schedule_retry(node_addr, now_ms);
1361 if let Some(cooldown_until_ms) = decision.cooldown_until_ms
1362 && let Some(state) = self.retry_pending.get_mut(&node_addr)
1363 {
1364 state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
1368 }
1369 }
1370 }
1371 }
1372
1373 self.maybe_run_startup_open_discovery_sweep(&bootstrap)
1374 .await;
1375 self.queue_open_discovery_retries(&bootstrap).await;
1376 }
1377
1378 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1384 if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1385 let scope = scope.trim();
1386 if !scope.is_empty() {
1387 return Some(scope.to_string());
1388 }
1389 }
1390
1391 let app = self.config.node.discovery.nostr.app.trim();
1392 if app.is_empty() {
1393 return None;
1394 }
1395 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1396 let scope = rest.trim();
1397 if scope.is_empty() {
1398 None
1399 } else {
1400 Some(scope.to_string())
1401 }
1402 } else {
1403 Some(app.to_string())
1404 }
1405 }
1406
1407 pub(super) fn start_local_instance_discovery(&mut self) {
1408 if !self.config.node.discovery.local.enabled {
1409 return;
1410 }
1411 let Some(scope) = self.lan_discovery_scope() else {
1412 debug!("local instance discovery not started: no discovery scope");
1413 return;
1414 };
1415 let now_ms = Self::now_ms();
1416 match crate::discovery::local::LocalInstanceRegistry::new(
1417 self.identity.npub(),
1418 scope,
1419 &self.config.node.discovery.local,
1420 now_ms,
1421 ) {
1422 Ok(registry) => {
1423 self.local_instance_registry = Some(registry);
1424 self.local_instance_started_at_ms = Some(now_ms);
1425 self.last_local_instance_publish_ms = None;
1426 self.last_local_instance_scan_ms = None;
1427 self.publish_local_instance_record(now_ms);
1428 info!("Same-host FIPS instance discovery enabled");
1429 }
1430 Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1431 debug!("same-host FIPS instance discovery disabled");
1432 }
1433 Err(err) => {
1434 debug!(error = %err, "same-host FIPS instance discovery not started");
1435 }
1436 }
1437 }
1438
1439 fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1440 let mut contacts = Vec::new();
1441 for handle in self.transports.values() {
1442 if !handle.is_operational() || !handle.accept_connections() {
1443 continue;
1444 }
1445 let transport = handle.transport_type().name;
1446 if transport != "udp" && transport != "tcp" {
1447 continue;
1448 }
1449 let Some(local_addr) = handle.local_addr() else {
1450 continue;
1451 };
1452 let Some(contact) =
1453 crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1454 else {
1455 continue;
1456 };
1457 if contacts
1458 .iter()
1459 .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1460 existing.transport == contact.transport && existing.addr == contact.addr
1461 })
1462 {
1463 continue;
1464 }
1465 contacts.push(contact);
1466 }
1467 contacts
1468 }
1469
1470 fn publish_local_instance_record(&mut self, now_ms: u64) {
1471 let Some(registry) = self.local_instance_registry.clone() else {
1472 return;
1473 };
1474 let contacts = self.local_instance_contacts();
1475 match registry.publish(contacts, now_ms) {
1476 Ok(()) => {
1477 self.last_local_instance_publish_ms = Some(now_ms);
1478 }
1479 Err(err) => {
1480 debug!(error = %err, "failed to publish same-host FIPS instance record");
1481 }
1482 }
1483 }
1484
1485 fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1486 if self.local_instance_registry.is_none() {
1487 return;
1488 }
1489 let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1490 let due = self
1491 .last_local_instance_publish_ms
1492 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1493 .unwrap_or(true);
1494 if due {
1495 self.publish_local_instance_record(now_ms);
1496 }
1497 }
1498
1499 fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1500 if self.local_instance_registry.is_none() {
1501 return false;
1502 }
1503 let cfg = &self.config.node.discovery.local;
1504 let interval_ms = if self
1505 .local_instance_started_at_ms
1506 .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1507 .unwrap_or(false)
1508 {
1509 cfg.startup_scan_interval_ms()
1510 } else {
1511 cfg.scan_interval_ms()
1512 };
1513 self.last_local_instance_scan_ms
1514 .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1515 .unwrap_or(true)
1516 }
1517
1518 fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1519 if self.config.peers().iter().any(|peer| {
1520 PeerIdentity::from_npub(&peer.npub)
1521 .map(|configured| configured.node_addr() == identity.node_addr())
1522 .unwrap_or(false)
1523 }) {
1524 return true;
1525 }
1526 self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1527 }
1528
1529 fn local_instance_peer_addresses(
1530 &self,
1531 record: &crate::discovery::local::LocalInstanceRecord,
1532 ) -> Vec<PeerAddress> {
1533 let mut addresses = Vec::new();
1534 for contact in &record.contacts {
1535 if contact.transport != "udp" && contact.transport != "tcp" {
1536 continue;
1537 }
1538 let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1539 debug!(
1540 npub = %record.npub,
1541 transport = %contact.transport,
1542 addr = %contact.addr,
1543 "local instance discovery: skip non-socket contact"
1544 );
1545 continue;
1546 };
1547 if !socket_addr.ip().is_loopback() {
1548 debug!(
1549 npub = %record.npub,
1550 addr = %contact.addr,
1551 "local instance discovery: skip non-loopback contact"
1552 );
1553 continue;
1554 }
1555 let address =
1556 PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1557 .with_seen_at_ms(record.updated_at_ms);
1558 if addresses.iter().any(|existing: &PeerAddress| {
1559 existing.transport == address.transport && existing.addr == address.addr
1560 }) {
1561 continue;
1562 }
1563 addresses.push(address);
1564 }
1565 addresses
1566 }
1567
1568 pub(super) async fn poll_local_instance_discovery(&mut self) {
1572 let Some(registry) = self.local_instance_registry.clone() else {
1573 return;
1574 };
1575 let now_ms = Self::now_ms();
1576 self.maybe_publish_local_instance_record(now_ms);
1577 if !self.local_instance_scan_due(now_ms) {
1578 return;
1579 }
1580 self.last_local_instance_scan_ms = Some(now_ms);
1581
1582 let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
1583 {
1584 Ok(records) => records,
1585 Err(err) => {
1586 debug!(error = %err, "same-host FIPS instance scan failed");
1587 return;
1588 }
1589 };
1590 if records.is_empty() {
1591 return;
1592 }
1593
1594 let mut connect_budget = self.discovery_connect_budget();
1595 let mut skipped_budget = 0usize;
1596 for record in records {
1597 let identity = match PeerIdentity::from_npub(&record.npub) {
1598 Ok(identity) => identity,
1599 Err(err) => {
1600 debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
1601 continue;
1602 }
1603 };
1604 let peer_node_addr = *identity.node_addr();
1605 if peer_node_addr == *self.identity.node_addr() {
1606 continue;
1607 }
1608 if !self.local_instance_peer_allowed(&identity) {
1609 debug!(
1610 npub = %identity.short_npub(),
1611 "local instance discovery: skip unconfigured peer"
1612 );
1613 continue;
1614 }
1615
1616 let addresses = self.local_instance_peer_addresses(&record);
1617 if addresses.is_empty() {
1618 continue;
1619 }
1620
1621 if self.peers.contains_key(&peer_node_addr)
1622 && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
1623 {
1624 continue;
1625 }
1626
1627 for address in addresses {
1628 let Some((transport_id, remote_addr)) =
1629 self.resolve_peer_address_for_match(&address)
1630 else {
1631 continue;
1632 };
1633 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1634 continue;
1635 }
1636 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1637 skipped_budget = skipped_budget.saturating_add(1);
1638 continue;
1639 }
1640 info!(
1641 npub = %identity.short_npub(),
1642 transport = %address.transport,
1643 addr = %address.addr,
1644 "same-host FIPS instance discovery: initiating handshake"
1645 );
1646 if let Err(err) = self
1647 .initiate_connection(transport_id, remote_addr, identity)
1648 .await
1649 {
1650 debug!(
1651 npub = %record.npub,
1652 error = %err,
1653 "same-host FIPS instance discovery: failed to initiate connection"
1654 );
1655 }
1656 connect_budget = connect_budget.saturating_sub(1);
1657 }
1658 }
1659 if skipped_budget > 0 {
1660 debug!(
1661 skipped = skipped_budget,
1662 "same-host FIPS instance discovery connect budget exhausted"
1663 );
1664 }
1665 }
1666
1667 pub(super) async fn poll_lan_discovery(&mut self) {
1674 let Some(runtime) = self.lan_discovery.clone() else {
1675 return;
1676 };
1677 let events = runtime.drain_events().await;
1678 if events.is_empty() {
1679 return;
1680 }
1681 let mut connect_budget = self.discovery_connect_budget();
1682 let mut skipped_budget = 0usize;
1683 for event in events {
1684 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1685 let Some((transport_id, local_addr)) =
1686 self.find_udp_transport_for_remote_addr(peer.addr)
1687 else {
1688 debug!(
1689 addr = %peer.addr,
1690 "lan: skip discovered peer with no compatible UDP transport"
1691 );
1692 continue;
1693 };
1694 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1695 Ok(id) => id,
1696 Err(err) => {
1697 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1698 continue;
1699 }
1700 };
1701 let peer_node_addr = *identity.node_addr();
1702 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1703 if self.peers.contains_key(&peer_node_addr) {
1704 let candidate = PeerAddress::new("udp", peer.addr.to_string());
1705 if self.active_peer_candidate_is_fresh_enough_to_skip(
1706 &peer_node_addr,
1707 std::slice::from_ref(&candidate),
1708 ) {
1709 continue;
1710 }
1711 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1712 continue;
1713 }
1714 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1715 skipped_budget = skipped_budget.saturating_add(1);
1716 continue;
1717 }
1718 info!(
1719 npub = %identity.short_npub(),
1720 addr = %peer.addr,
1721 local_addr = %local_addr,
1722 "lan: initiating alternate-path handshake to active peer"
1723 );
1724 if let Err(err) = self
1725 .initiate_connection(transport_id, remote_addr, identity)
1726 .await
1727 {
1728 debug!(
1729 npub = %peer.npub,
1730 error = %err,
1731 "lan: failed to initiate active peer alternate-path handshake"
1732 );
1733 }
1734 connect_budget = connect_budget.saturating_sub(1);
1735 continue;
1736 }
1737 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1738 continue;
1739 }
1740 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1741 skipped_budget = skipped_budget.saturating_add(1);
1742 continue;
1743 }
1744 info!(
1745 npub = %identity.short_npub(),
1746 addr = %peer.addr,
1747 local_addr = %local_addr,
1748 "lan: initiating handshake to discovered peer"
1749 );
1750 if let Err(err) = self
1751 .initiate_connection(transport_id, remote_addr, identity)
1752 .await
1753 {
1754 debug!(
1755 npub = %peer.npub,
1756 error = %err,
1757 "lan: failed to initiate connection to discovered peer"
1758 );
1759 }
1760 connect_budget = connect_budget.saturating_sub(1);
1761 }
1762 if skipped_budget > 0 {
1763 debug!(
1764 skipped = skipped_budget,
1765 "lan: discovery connect budget exhausted"
1766 );
1767 }
1768 }
1769
1770 pub(super) async fn poll_pending_connects(&mut self) {
1777 if self.pending_connects.is_empty() {
1778 return;
1779 }
1780
1781 let mut completed = Vec::new();
1782
1783 for (i, pending) in self.pending_connects.iter().enumerate() {
1784 let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
1785 transport.connection_state(&pending.remote_addr)
1786 } else {
1787 crate::transport::ConnectionState::Failed("transport removed".into())
1788 };
1789
1790 match state {
1791 crate::transport::ConnectionState::Connected => {
1792 completed.push((i, true, None));
1793 }
1794 crate::transport::ConnectionState::Failed(reason) => {
1795 completed.push((i, false, Some(reason)));
1796 }
1797 crate::transport::ConnectionState::Connecting => {
1798 }
1800 crate::transport::ConnectionState::None => {
1801 completed.push((i, false, Some("no connection attempt found".into())));
1803 }
1804 }
1805 }
1806
1807 for (i, success, reason) in completed.into_iter().rev() {
1809 let pending = self.pending_connects.remove(i);
1810
1811 if success {
1812 if let Some(link) = self.links.get_mut(&pending.link_id) {
1814 link.set_connected();
1815 }
1816
1817 debug!(
1818 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
1819 transport_id = %pending.transport_id,
1820 remote_addr = %pending.remote_addr,
1821 link_id = %pending.link_id,
1822 "Transport connected, starting handshake"
1823 );
1824
1825 if let Err(e) = self
1827 .start_handshake(
1828 pending.link_id,
1829 pending.transport_id,
1830 pending.remote_addr.clone(),
1831 pending.peer_identity,
1832 )
1833 .await
1834 {
1835 warn!(
1836 link_id = %pending.link_id,
1837 error = %e,
1838 "Failed to start handshake after transport connect"
1839 );
1840 self.remove_link(&pending.link_id);
1842 }
1843 } else {
1844 let reason = reason.unwrap_or_default();
1845 warn!(
1846 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
1847 transport_id = %pending.transport_id,
1848 remote_addr = %pending.remote_addr,
1849 link_id = %pending.link_id,
1850 reason = %reason,
1851 "Transport connect failed"
1852 );
1853
1854 self.remove_link(&pending.link_id);
1856 self.links.remove(&pending.link_id);
1857 self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
1858 }
1859 }
1860 }
1861
1862 pub async fn start(&mut self) -> Result<(), NodeError> {
1869 node_start_debug_log("Node::start begin");
1870 if !self.state.can_start() {
1871 return Err(NodeError::AlreadyStarted);
1872 }
1873 self.state = NodeState::Starting;
1874 node_start_debug_log("Node::start state set to starting");
1875
1876 let packet_buffer_size = self.config.node.buffers.packet_channel;
1878 let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
1879 self.packet_tx = Some(packet_tx.clone());
1880 self.packet_rx = Some(packet_rx);
1881 node_start_debug_log("Node::start packet channel created");
1882
1883 node_start_debug_log("Node::start create transports begin");
1885 let transport_handles = self.create_transports(&packet_tx).await;
1886 node_start_debug_log(format!(
1887 "Node::start create transports complete count={}",
1888 transport_handles.len()
1889 ));
1890
1891 for mut handle in transport_handles {
1892 let transport_id = handle.transport_id();
1893 let transport_type = handle.transport_type().name;
1894 let name = handle.name().map(|s| s.to_string());
1895
1896 node_start_debug_log(format!(
1897 "Node::start transport start begin id={} type={} name={:?}",
1898 transport_id, transport_type, name
1899 ));
1900 match handle.start().await {
1901 Ok(()) => {
1902 node_start_debug_log(format!(
1903 "Node::start transport start ok id={} type={}",
1904 transport_id, transport_type
1905 ));
1906 self.transports.insert(transport_id, handle);
1907 }
1908 Err(e) => {
1909 node_start_debug_log(format!(
1910 "Node::start transport start error id={} type={} error={}",
1911 transport_id, transport_type, e
1912 ));
1913 if let Some(ref n) = name {
1914 warn!(transport_type, name = %n, error = %e, "Transport failed to start");
1915 } else {
1916 warn!(transport_type, error = %e, "Transport failed to start");
1917 }
1918 }
1919 }
1920 }
1921
1922 if !self.transports.is_empty() {
1923 info!(count = self.transports.len(), "Transports initialized");
1924 }
1925
1926 #[cfg(unix)]
1942 {
1943 if self.config.node.worker_pools_enabled {
1944 node_start_debug_log("Node::start worker pools begin");
1945 let cpu_default = std::thread::available_parallelism()
1946 .map(|n| n.get())
1947 .unwrap_or(1)
1948 .max(1);
1949 let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
1950 .ok()
1951 .and_then(|s| s.parse().ok())
1952 .unwrap_or(cpu_default)
1953 .max(1);
1954 self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
1955 encrypt_worker_count,
1956 ));
1957 info!(
1958 workers = encrypt_worker_count,
1959 "Spawned FMP-encrypt worker pool"
1960 );
1961
1962 let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
1971 .ok()
1972 .and_then(|s| s.parse().ok())
1973 .unwrap_or(cpu_default);
1974 if decrypt_worker_count == 0 {
1975 info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
1976 } else {
1977 self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
1978 decrypt_worker_count,
1979 ));
1980 info!(
1981 workers = decrypt_worker_count,
1982 "Spawned FMP+FSP-decrypt worker pool"
1983 );
1984 }
1985 node_start_debug_log("Node::start worker pools complete");
1986 } else {
1987 node_start_debug_log("Node::start worker pools disabled");
1988 info!("FIPS worker pools disabled; using in-line crypto/send path");
1989 }
1990 }
1991
1992 if self.config.node.discovery.nostr.enabled {
1993 node_start_debug_log("Node::start nostr discovery start begin");
1994 match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
1995 .await
1996 {
1997 Ok(runtime) => {
1998 node_start_debug_log("Node::start nostr discovery runtime created");
1999 if let Err(err) = self.refresh_overlay_advert(&runtime).await {
2000 warn!(error = %err, "Failed to publish initial Nostr overlay advert");
2001 }
2002 node_start_debug_log("Node::start nostr overlay advert refreshed");
2003 self.nostr_discovery = Some(runtime);
2004 self.nostr_discovery_started_at_ms = Some(Self::now_ms());
2005 info!("Nostr overlay discovery enabled");
2006 }
2007 Err(err) => {
2008 node_start_debug_log(format!(
2009 "Node::start nostr discovery start error error={}",
2010 err
2011 ));
2012 warn!(error = %err, "Failed to start Nostr overlay discovery");
2013 }
2014 }
2015 }
2016
2017 if self.config.node.discovery.lan.enabled {
2021 node_start_debug_log("Node::start lan discovery start begin");
2022 let advertised_udp_port = self
2023 .transports
2024 .values()
2025 .filter(|h| h.is_operational())
2026 .filter(|h| h.transport_type().name == "udp")
2027 .find_map(|h| h.local_addr().map(|addr| addr.port()))
2028 .unwrap_or(0);
2029 let scope = self.lan_discovery_scope();
2030 match crate::discovery::lan::LanDiscovery::start(
2031 &self.identity,
2032 scope,
2033 advertised_udp_port,
2034 self.config.node.discovery.lan.clone(),
2035 )
2036 .await
2037 {
2038 Ok(runtime) => {
2039 node_start_debug_log("Node::start lan discovery start ok");
2040 self.lan_discovery = Some(runtime);
2041 info!("LAN mDNS discovery enabled");
2042 }
2043 Err(err) => {
2044 node_start_debug_log(format!(
2045 "Node::start lan discovery start error error={}",
2046 err
2047 ));
2048 debug!(error = %err, "LAN mDNS discovery not started");
2049 }
2050 }
2051 }
2052
2053 self.start_local_instance_discovery();
2054 self.poll_local_instance_discovery().await;
2055
2056 node_start_debug_log("Node::start initiate peer connections begin");
2059 self.initiate_peer_connections().await;
2060 node_start_debug_log("Node::start initiate peer connections complete");
2061
2062 if self.config.tun.enabled {
2064 node_start_debug_log("Node::start tun init begin");
2065 let address = *self.identity.address();
2066 match TunDevice::create(&self.config.tun, address).await {
2067 Ok(device) => {
2068 let mtu = device.mtu();
2069 let name = device.name().to_string();
2070 let our_addr = *device.address();
2071
2072 info!("TUN device active:");
2073 info!(" name: {}", name);
2074 info!(" address: {}", device.address());
2075 info!(" mtu: {}", mtu);
2076
2077 let effective_mtu = self.effective_ipv6_mtu();
2079 let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); info!("effective MTU: {} bytes", effective_mtu);
2082 debug!(" max TCP MSS: {} bytes", max_mss);
2083
2084 #[cfg(target_os = "macos")]
2088 let (shutdown_read_fd, shutdown_write_fd) = {
2089 let mut fds = [0i32; 2];
2090 if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
2091 return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
2092 "failed to create shutdown pipe".into(),
2093 )));
2094 }
2095 (fds[0], fds[1])
2096 };
2097
2098 let (writer, tun_tx) =
2102 device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
2103
2104 let writer_handle = thread::spawn(move || {
2106 writer.run();
2107 });
2108
2109 let reader_tun_tx = tun_tx.clone();
2111
2112 let tun_channel_size = self.config.node.buffers.tun_channel;
2114 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
2115
2116 let transport_mtu = self.transport_mtu();
2118 let path_mtu_lookup = self.path_mtu_lookup.clone();
2119 #[cfg(target_os = "macos")]
2120 let reader_handle = thread::spawn(move || {
2121 run_tun_reader(
2122 device,
2123 mtu,
2124 our_addr,
2125 reader_tun_tx,
2126 outbound_tx,
2127 transport_mtu,
2128 path_mtu_lookup,
2129 shutdown_read_fd,
2130 );
2131 });
2132 #[cfg(not(target_os = "macos"))]
2133 let reader_handle = thread::spawn(move || {
2134 run_tun_reader(
2135 device,
2136 mtu,
2137 our_addr,
2138 reader_tun_tx,
2139 outbound_tx,
2140 transport_mtu,
2141 path_mtu_lookup,
2142 );
2143 });
2144
2145 self.tun_state = TunState::Active;
2146 self.tun_name = Some(name);
2147 self.tun_tx = Some(tun_tx);
2148 self.tun_outbound_rx = Some(outbound_rx);
2149 self.tun_reader_handle = Some(reader_handle);
2150 self.tun_writer_handle = Some(writer_handle);
2151 #[cfg(target_os = "macos")]
2152 {
2153 self.tun_shutdown_fd = Some(shutdown_write_fd);
2154 }
2155 }
2156 Err(e) => {
2157 self.tun_state = TunState::Failed;
2158 warn!(error = %e, "Failed to initialize TUN, continuing without it");
2159 }
2160 }
2161 node_start_debug_log("Node::start tun init complete");
2162 }
2163
2164 if self.config.dns.enabled {
2181 node_start_debug_log("Node::start dns init begin");
2182 let addr_str = self.config.dns.bind_addr();
2183 match addr_str.parse::<std::net::IpAddr>() {
2184 Ok(ip) => {
2185 let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
2186 match Self::bind_dns_socket(bind) {
2187 Ok(socket) => {
2188 let dns_channel_size = self.config.node.buffers.dns_channel;
2189 let (identity_tx, identity_rx) =
2190 tokio::sync::mpsc::channel(dns_channel_size);
2191 let dns_ttl = self.config.dns.ttl();
2192 let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
2193 self.config.peers(),
2194 );
2195 let reloader = if self.config.node.system_files_enabled {
2196 let hosts_path = std::path::PathBuf::from(
2197 crate::upper::hosts::DEFAULT_HOSTS_PATH,
2198 );
2199 crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
2200 } else {
2201 crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
2202 };
2203 let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
2211 info!(
2212 bind = %bind,
2213 hosts = reloader.hosts().len(),
2214 mesh_ifindex = ?mesh_ifindex,
2215 "DNS responder started for .fips domain (auto-reload enabled)"
2216 );
2217 let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
2218 socket,
2219 identity_tx,
2220 dns_ttl,
2221 reloader,
2222 mesh_ifindex,
2223 ));
2224 self.dns_identity_rx = Some(identity_rx);
2225 self.dns_task = Some(handle);
2226 }
2227 Err(e) => {
2228 warn!(bind = %bind, error = %e, "Failed to start DNS responder");
2229 }
2230 }
2231 }
2232 Err(e) => {
2233 warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
2234 }
2235 }
2236 node_start_debug_log("Node::start dns init complete");
2237 }
2238
2239 self.state = NodeState::Running;
2240 node_start_debug_log("Node::start running");
2241 info!("Node started:");
2242 info!(" state: {}", self.state);
2243 info!(" transports: {}", self.transports.len());
2244 info!(" connections: {}", self.connections.len());
2245 Ok(())
2246 }
2247
2248 fn bind_dns_socket(
2261 addr: std::net::SocketAddr,
2262 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2263 use socket2::{Domain, Protocol, Socket, Type};
2264 let domain = if addr.is_ipv4() {
2265 Domain::IPV4
2266 } else {
2267 Domain::IPV6
2268 };
2269 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2270 if addr.is_ipv6() {
2271 sock.set_only_v6(false)?;
2272 #[cfg(unix)]
2273 Self::set_recv_pktinfo_v6(&sock)?;
2274 }
2275 sock.set_nonblocking(true)?;
2276 sock.bind(&addr.into())?;
2277 tokio::net::UdpSocket::from_std(sock.into())
2278 }
2279
2280 #[cfg(unix)]
2286 fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2287 use std::os::fd::AsRawFd;
2288 let enable: libc::c_int = 1;
2289 let ret = unsafe {
2290 libc::setsockopt(
2291 sock.as_raw_fd(),
2292 libc::IPPROTO_IPV6,
2293 libc::IPV6_RECVPKTINFO,
2294 &enable as *const _ as *const libc::c_void,
2295 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2296 )
2297 };
2298 if ret < 0 {
2299 return Err(std::io::Error::last_os_error());
2300 }
2301 Ok(())
2302 }
2303
2304 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2311 #[cfg(unix)]
2312 {
2313 let c_name = std::ffi::CString::new(name).ok()?;
2314 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2315 if idx == 0 { None } else { Some(idx) }
2316 }
2317 #[cfg(not(unix))]
2318 {
2319 let _ = name;
2320 None
2321 }
2322 }
2323
2324 pub async fn stop(&mut self) -> Result<(), NodeError> {
2329 if !self.state.can_stop() {
2330 return Err(NodeError::NotStarted);
2331 }
2332 self.state = NodeState::Stopping;
2333 info!(state = %self.state, "Node stopping");
2334
2335 if let Some(handle) = self.dns_task.take() {
2337 handle.abort();
2338 debug!("DNS responder stopped");
2339 }
2340
2341 self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
2343 .await;
2344
2345 if let Some(bootstrap) = self.nostr_discovery.take()
2347 && let Err(e) = bootstrap.shutdown().await
2348 {
2349 warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
2350 }
2351
2352 if let Some(lan) = self.lan_discovery.take() {
2356 lan.shutdown().await;
2357 }
2358
2359 if let Some(registry) = self.local_instance_registry.take()
2360 && let Err(err) = registry.remove()
2361 {
2362 debug!(error = %err, "failed to remove same-host FIPS instance record");
2363 }
2364
2365 let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
2367 for transport_id in transport_ids {
2368 if let Some(mut handle) = self.transports.remove(&transport_id) {
2369 let transport_type = handle.transport_type().name;
2370 match handle.stop().await {
2371 Ok(()) => {
2372 info!(transport_id = %transport_id, transport_type, "Transport stopped");
2373 }
2374 Err(e) => {
2375 warn!(
2376 transport_id = %transport_id,
2377 transport_type,
2378 error = %e,
2379 "Transport stop failed"
2380 );
2381 }
2382 }
2383 }
2384 }
2385
2386 self.packet_tx.take();
2388 self.packet_rx.take();
2389
2390 if let Some(name) = self.tun_name.take() {
2392 info!(name = %name, "Shutting down TUN interface");
2393
2394 self.tun_tx.take();
2396
2397 if let Err(e) = shutdown_tun_interface(&name).await {
2399 warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
2400 }
2401
2402 #[cfg(target_os = "macos")]
2405 if let Some(fd) = self.tun_shutdown_fd.take() {
2406 unsafe {
2407 libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
2408 libc::close(fd);
2409 }
2410 }
2411
2412 if let Some(handle) = self.tun_reader_handle.take() {
2414 let _ = handle.join();
2415 }
2416 if let Some(handle) = self.tun_writer_handle.take() {
2417 let _ = handle.join();
2418 }
2419
2420 self.tun_state = TunState::Disabled;
2421 }
2422
2423 self.state = NodeState::Stopped;
2424 info!(state = %self.state, "Node stopped");
2425 Ok(())
2426 }
2427
2428 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2433 let disconnect = Disconnect::new(reason);
2434 let plaintext = disconnect.encode();
2435
2436 let peer_addrs: Vec<NodeAddr> = self
2438 .peers
2439 .iter()
2440 .filter(|(_, peer)| peer.can_send() && peer.has_session())
2441 .map(|(addr, _)| *addr)
2442 .collect();
2443
2444 if peer_addrs.is_empty() {
2445 debug!(
2446 total_peers = self.peers.len(),
2447 "No sendable peers for disconnect notification"
2448 );
2449 return;
2450 }
2451
2452 let mut sent = 0usize;
2453 for node_addr in &peer_addrs {
2454 match self
2455 .send_encrypted_link_message(node_addr, &plaintext)
2456 .await
2457 {
2458 Ok(()) => sent += 1,
2459 Err(e) => {
2460 debug!(
2461 peer = %self.peer_display_name(node_addr),
2462 error = %e,
2463 "Failed to send disconnect (transport may be down)"
2464 );
2465 }
2466 }
2467 }
2468
2469 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2470 }
2471
2472 fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2473 peer_config
2474 .addresses_by_priority()
2475 .into_iter()
2476 .cloned()
2477 .collect()
2478 }
2479
2480 async fn nostr_peer_fallback_addresses(
2481 &self,
2482 peer_config: &PeerConfig,
2483 existing: &[PeerAddress],
2484 ) -> Vec<PeerAddress> {
2485 if !self.config.node.discovery.nostr.enabled
2486 || self.config.node.discovery.nostr.policy
2487 == crate::config::NostrDiscoveryPolicy::Disabled
2488 {
2489 return Vec::new();
2490 }
2491
2492 let Some(bootstrap) = self.nostr_discovery.clone() else {
2493 return Vec::new();
2494 };
2495 let endpoints = match bootstrap
2496 .cached_advert_endpoints_for_peer(&peer_config.npub)
2497 .await
2498 {
2499 Some(endpoints) => endpoints,
2500 None => {
2501 debug!(
2502 npub = %peer_config.npub,
2503 "No cached Nostr advert endpoints for configured peer"
2504 );
2505 return Vec::new();
2506 }
2507 };
2508
2509 let mut fallback = Vec::new();
2510 let mut next_priority = existing
2511 .iter()
2512 .map(|addr| addr.priority)
2513 .max()
2514 .unwrap_or(100)
2515 .saturating_add(1);
2516 let seen_at_ms = Self::now_ms();
2521 for endpoint in endpoints {
2522 let Some(candidate) =
2523 Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2524 else {
2525 continue;
2526 };
2527 if existing
2528 .iter()
2529 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2530 || fallback.iter().any(|addr: &PeerAddress| {
2531 addr.transport == candidate.transport && addr.addr == candidate.addr
2532 })
2533 {
2534 continue;
2535 }
2536 fallback.push(candidate);
2537 next_priority = next_priority.saturating_add(1);
2538 }
2539 fallback
2540 }
2541
2542 async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2543 if !self.config.node.discovery.nostr.enabled
2544 || self.config.node.discovery.nostr.policy
2545 == crate::config::NostrDiscoveryPolicy::Disabled
2546 {
2547 return false;
2548 }
2549 let Some(bootstrap) = self.nostr_discovery.clone() else {
2550 return false;
2551 };
2552 bootstrap.request_connect(peer_config.clone()).await;
2553 info!(npub = %peer_config.npub, "Started background Nostr UDP NAT traversal attempt");
2554 true
2555 }
2556
2557 fn overlay_endpoint_to_peer_address(
2558 endpoint: &OverlayEndpointAdvert,
2559 priority: u8,
2560 seen_at_ms: u64,
2561 ) -> Option<PeerAddress> {
2562 let transport = match endpoint.transport {
2563 OverlayTransportKind::Udp => "udp",
2564 OverlayTransportKind::Tcp => "tcp",
2565 OverlayTransportKind::Tor => "tor",
2566 OverlayTransportKind::WebRtc => "webrtc",
2567 };
2568 Some(
2569 PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2570 .with_seen_at_ms(seen_at_ms),
2571 )
2572 }
2573
    /// Walks `addresses` in order and starts connection attempts toward the peer.
    /// At most `path_candidate_attempt_budget(&peer)` concrete candidates are
    /// initiated in this call.
    ///
    /// Per address:
    /// - Addresses with `seen_at_ms` set (overlay-derived) are skipped once any
    ///   earlier address has been attempted.
    /// - `udp:nat` placeholders are skipped once something was attempted, or when
    ///   `allow_bootstrap_nat` is false. Otherwise they request a background Nostr
    ///   NAT traversal, which counts as an attempt if a runtime accepted it.
    /// - `ethernet` addresses go through `resolve_ethernet_addr`. `ble` addresses
    ///   go through `resolve_ble_addr`, only on `bluer_available` builds. UDP
    ///   addresses that parse as a `SocketAddr` use
    ///   `find_udp_transport_for_remote_addr`; everything else uses
    ///   `find_transport_for_type`. Unresolvable addresses are skipped.
    /// - A path already in flight counts as attempted without spending budget.
    /// - Reaching a concrete candidate with zero budget left stops the loop.
    /// - Other `initiate_connection` errors are logged and the next address is tried.
    ///
    /// # Errors
    ///
    /// - `NodeError::AccessDenied` from `initiate_connection` is returned immediately.
    /// - `NodeError::NoTransportForType` when nothing could be attempted.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        // Set once any candidate has been started (or is already in flight).
        let mut attempted = false;
        let peer_node_addr = *peer_identity.node_addr();
        // Remaining number of concrete connection attempts allowed in this call.
        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

        for addr in addresses {
            // NOTE(review): this fires after *any* earlier attempt, including an
            // earlier overlay candidate, although the log text says "configured
            // direct candidate". Confirm that only one overlay endpoint per call
            // is intended when every candidate is overlay-derived.
            if attempted && addr.seen_at_ms.is_some() {
                debug!(
                    npub = %peer_config.npub,
                    transport = %addr.transport,
                    addr = %addr.addr,
                    "Skipping overlay fallback because a configured direct candidate is already in flight"
                );
                continue;
            }

            // `udp:nat` is a placeholder meaning "reach this peer through Nostr
            // NAT traversal" rather than a dialable address.
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if attempted {
                    debug!(
                        npub = %peer_config.npub,
                        "Skipping Nostr NAT fallback because a configured direct candidate is already in flight"
                    );
                    continue;
                }
                if !allow_bootstrap_nat {
                    continue;
                }
                if self.request_nostr_bootstrap(peer_config).await {
                    attempted = true;
                    continue;
                }
                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                continue;
            }

            // Resolve the (transport, remote address) pair for this candidate.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                // UDP socket addresses pick a transport matched against the remote
                // address; every other case picks by transport type name alone.
                let tid = if addr.transport == "udp"
                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
                {
                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                        Some((id, _)) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No compatible operational UDP transport for address"
                            );
                            continue;
                        }
                    }
                } else {
                    match self.find_transport_for_type(&addr.transport) {
                        Some(id) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No operational transport for address type"
                            );
                            continue;
                        }
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            // An identical path already being dialed counts as attempted but does
            // not consume budget.
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                attempted = true;
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    remote_addr = %remote_addr,
                    "Skipping duplicate in-flight candidate path"
                );
                continue;
            }

            if concrete_budget == 0 {
                debug!(
                    npub = %peer_config.npub,
                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                    "Path candidate race budget exhausted"
                );
                break;
            }

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => {
                    attempted = true;
                    concrete_budget = concrete_budget.saturating_sub(1);
                }
                // ACL rejections are surfaced to the caller instead of trying
                // further addresses.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        if attempted {
            return Ok(());
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
2729
2730 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
2731 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
2732 .await;
2733 }
2734
    /// Queues outbound retry attempts for peers seen in cached Nostr overlay
    /// adverts, when open discovery is enabled.
    ///
    /// Does nothing unless Nostr discovery is enabled with the `Open` policy.
    /// The number of new retries is capped by `open_discovery_enqueue_budget`.
    /// Up to 64 cached candidates are read from `bootstrap`.
    ///
    /// A candidate is skipped (and counted) when:
    /// - its advert is older than `max_age_secs` (only if a limit is given);
    /// - it is a configured peer; a pending retry for it is expedited to run now;
    /// - its npub is invalid, or it is this node;
    /// - it is already a peer, already has a pending retry, or is in cooldown;
    /// - a connection expecting its identity exists;
    /// - none of its endpoints produce a usable address.
    ///
    /// Otherwise its alias and identity are registered and an auto-connect
    /// `RetryState` is inserted. The retry is due immediately, does not reconnect,
    /// and expires at `open_discovery_retry_expires_at_ms`. A summary is logged for
    /// the `"startup"` caller, or whenever something was queued.
    pub(in crate::node) async fn run_open_discovery_sweep(
        &mut self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
        max_age_secs: Option<u64>,
        caller: &'static str,
    ) {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
        {
            return;
        }

        let configured_npubs = self
            .config
            .peers()
            .iter()
            .map(|peer| peer.npub.clone())
            .collect::<HashSet<_>>();
        let now_ms = Self::now_ms();
        let now_secs = now_ms / 1000;
        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
        if enqueue_budget == 0 {
            debug!(
                caller = %caller,
                "open-discovery sweep: enqueue budget is 0, skipping"
            );
            return;
        }

        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
        let cached_count = candidates.len();
        // Per-reason counters, reported in the summary log at the end.
        let mut enqueued = 0usize;
        let mut skipped_age = 0usize;
        let mut skipped_configured = 0usize;
        let mut skipped_self = 0usize;
        let mut skipped_connected = 0usize;
        let mut skipped_retry_pending = 0usize;
        let mut skipped_connecting = 0usize;
        let mut skipped_no_endpoints = 0usize;
        let mut skipped_invalid_npub = 0usize;
        let mut skipped_cooldown = 0usize;

        for (npub, endpoints, created_at_secs) in candidates {
            if enqueue_budget == 0 {
                break;
            }

            if let Some(max_age) = max_age_secs
                && now_secs.saturating_sub(created_at_secs) > max_age
            {
                skipped_age = skipped_age.saturating_add(1);
                continue;
            }

            // Configured peers are handled by the regular retry path; a fresh
            // advert only pulls their pending retry forward to "now".
            if configured_npubs.contains(&npub) {
                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
                    let configured_addr = *identity.node_addr();
                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
                        && state.retry_after_ms > now_ms
                    {
                        state.retry_after_ms = now_ms;
                        debug!(
                            caller = %caller,
                            peer = %self.peer_display_name(&configured_addr),
                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
                            "Expediting configured-peer retry after fresh overlay advert"
                        );
                    }
                }
                skipped_configured = skipped_configured.saturating_add(1);
                continue;
            }

            let peer_identity = match PeerIdentity::from_npub(&npub) {
                Ok(identity) => identity,
                Err(_) => {
                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                    continue;
                }
            };
            let node_addr = *peer_identity.node_addr();
            if node_addr == *self.identity.node_addr() {
                skipped_self = skipped_self.saturating_add(1);
                continue;
            }
            if self.peers.contains_key(&node_addr) {
                skipped_connected = skipped_connected.saturating_add(1);
                continue;
            }
            if self.retry_pending.contains_key(&node_addr) {
                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
                continue;
            }
            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                skipped_cooldown = skipped_cooldown.saturating_add(1);
                continue;
            }
            // A connection already expecting this identity means a handshake is
            // in progress.
            let connecting = self.connections.values().any(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == &node_addr)
                    .unwrap_or(false)
            });
            if connecting {
                skipped_connecting = skipped_connecting.saturating_add(1);
                continue;
            }

            // Advertised endpoints become addresses with consecutive priorities
            // starting at 120; duplicate (transport, addr) pairs are dropped.
            let mut addresses = Vec::new();
            let mut priority = 120u8;
            let seen_at_ms = Self::now_ms();
            for endpoint in endpoints {
                let Some(candidate) =
                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
                else {
                    continue;
                };
                if addresses.iter().any(|existing: &PeerAddress| {
                    existing.transport == candidate.transport && existing.addr == candidate.addr
                }) {
                    continue;
                }
                addresses.push(candidate);
                priority = priority.saturating_add(1);
            }
            if addresses.is_empty() {
                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
                continue;
            }

            self.peer_aliases
                .entry(node_addr)
                .or_insert_with(|| peer_identity.short_npub());
            self.register_identity(node_addr, peer_identity.pubkey_full());

            let mut state = super::retry::RetryState::new(PeerConfig {
                npub: npub.clone(),
                alias: None,
                addresses,
                connect_policy: ConnectPolicy::AutoConnect,
                auto_reconnect: true,
                discovery_fallback_transit: false,
            });
            state.reconnect = false;
            state.retry_after_ms = now_ms;
            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
            self.retry_pending.insert(node_addr, state);
            info!(
                caller = %caller,
                peer = %peer_identity.short_npub(),
                advert_age_secs = now_secs.saturating_sub(created_at_secs),
                "open-discovery sweep: queued retry for cached advert"
            );
            enqueue_budget = enqueue_budget.saturating_sub(1);
            enqueued = enqueued.saturating_add(1);
        }

        let total_skipped = skipped_age
            + skipped_configured
            + skipped_self
            + skipped_connected
            + skipped_retry_pending
            + skipped_connecting
            + skipped_no_endpoints
            + skipped_invalid_npub
            + skipped_cooldown;
        // Per-tick sweeps that queued nothing stay quiet to avoid log spam.
        let should_summarize = caller == "startup" || enqueued > 0;
        if should_summarize {
            info!(
                caller = %caller,
                cached = cached_count,
                queued = enqueued,
                skipped_age = skipped_age,
                skipped_configured = skipped_configured,
                skipped_self = skipped_self,
                skipped_connected = skipped_connected,
                skipped_retry_pending = skipped_retry_pending,
                skipped_connecting = skipped_connecting,
                skipped_no_endpoints = skipped_no_endpoints,
                skipped_invalid_npub = skipped_invalid_npub,
                skipped_cooldown = skipped_cooldown,
                skipped_total = total_skipped,
                "open-discovery sweep complete"
            );
        }
    }
2952
2953 async fn maybe_run_startup_open_discovery_sweep(
2961 &mut self,
2962 bootstrap: &std::sync::Arc<NostrDiscovery>,
2963 ) {
2964 if self.startup_open_discovery_sweep_done {
2965 return;
2966 }
2967 if !self.config.node.discovery.nostr.enabled
2968 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2969 {
2970 self.startup_open_discovery_sweep_done = true;
2972 return;
2973 }
2974 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
2975 return;
2976 };
2977 let now_ms = Self::now_ms();
2978 let delay_ms = self
2979 .config
2980 .node
2981 .discovery
2982 .nostr
2983 .startup_sweep_delay_secs
2984 .saturating_mul(1000);
2985 if now_ms < started_at_ms.saturating_add(delay_ms) {
2986 return;
2987 }
2988
2989 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
2990 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
2991 .await;
2992 self.startup_open_discovery_sweep_done = true;
2993 }
2994
2995 fn available_outbound_slots(&self) -> usize {
2996 let connection_used = self
2997 .connections
2998 .len()
2999 .saturating_add(self.pending_connects.len());
3000 let connection_slots = if self.max_connections == 0 {
3001 usize::MAX
3002 } else {
3003 self.max_connections.saturating_sub(connection_used)
3004 };
3005
3006 let peer_slots = if self.max_peers == 0 {
3007 usize::MAX
3008 } else {
3009 self.max_peers.saturating_sub(self.peers.len())
3010 };
3011
3012 connection_slots.min(peer_slots)
3013 }
3014
3015 fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
3016 let current_open_discovery_pending = self
3017 .retry_pending
3018 .values()
3019 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3020 .count();
3021
3022 let cap_remaining = self
3023 .config
3024 .node
3025 .discovery
3026 .nostr
3027 .open_discovery_max_pending
3028 .saturating_sub(current_open_discovery_pending);
3029
3030 cap_remaining.min(self.available_outbound_slots())
3031 }
3032
3033 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3034 now_ms.saturating_add(
3035 self.config
3036 .node
3037 .discovery
3038 .nostr
3039 .advert_ttl_secs
3040 .saturating_mul(1000)
3041 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3042 )
3043 }
3044
    /// Builds this node's overlay advert from its operational transports whose
    /// config has `advertise_on_nostr()` enabled.
    ///
    /// Per transport type:
    /// - `udp`, public: advertises, in order of preference,
    ///   1. the configured external advert address;
    ///   2. the bound address, if routable;
    ///   3. a STUN-learned public address from `learn_public_udp_addr`.
    ///   If none is available it logs a warning.
    /// - `udp`, not public: advertises the `nat` placeholder.
    /// - `webrtc`: advertises this node's full public key, hex-encoded.
    /// - `tcp`: the external advert address, else the bound address if routable,
    ///   else a warning.
    /// - `tor`: the onion address with the configured advertised port.
    ///
    /// Signal relays and STUN servers are included only when a `udp:nat` or
    /// `webrtc` endpoint is advertised. Returns `None` when Nostr discovery is
    /// disabled or no endpoint qualifies.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // Track whether peers will need signalling relays / STUN to reach us.
        let mut has_udp_nat = false;
        let mut has_webrtc = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                // Note: `is_unroutable_advert_ip` also rejects the
                                // unspecified address, so the first check is redundant.
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                // Wildcard/private bind: fall back to the public address
                                // learned for this transport id and bind port.
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                             or private and STUN observation failed; \
                                             advertising no UDP endpoint. Either set \
                                             transports.udp.external_addr, bind to a \
                                             specific *public* IP, or ensure \
                                             node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                None => {}
                            }
                        }
                    } else {
                        // Non-public UDP: peers must use Nostr-signalled NAT traversal.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "webrtc" => {
                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // The WebRTC "address" is this node's hex-encoded full pubkey.
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::WebRtc,
                        addr: hex::encode(self.identity.pubkey_full().serialize()),
                    });
                    has_webrtc = true;
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            // Unlike UDP there is no STUN fallback for TCP.
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                     or private IP and no transports.tcp.external_addr set; \
                                     advertising no TCP endpoint. Either set external_addr \
                                     to the public IP (recommended for cloud 1:1-NAT setups) \
                                     or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
3223
3224 async fn refresh_overlay_advert(
3225 &self,
3226 bootstrap: &std::sync::Arc<NostrDiscovery>,
3227 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3228 let advert = self.build_overlay_advert(bootstrap).await;
3229 bootstrap.update_local_advert(advert).await
3230 }
3231
3232 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3233 match (&self.config.transports.udp, transport_name) {
3234 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3235 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3236 _ => None,
3237 }
3238 }
3239
3240 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3241 match (&self.config.transports.tcp, transport_name) {
3242 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3243 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3244 _ => None,
3245 }
3246 }
3247
3248 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3249 match (&self.config.transports.tor, transport_name) {
3250 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3251 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3252 _ => None,
3253 }
3254 }
3255
3256 fn lookup_webrtc_config(
3257 &self,
3258 transport_name: Option<&str>,
3259 ) -> Option<&crate::config::WebRtcConfig> {
3260 match (&self.config.transports.webrtc, transport_name) {
3261 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3262 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3263 _ => None,
3264 }
3265 }
3266
3267 pub(in crate::node) async fn try_peer_addresses(
3268 &mut self,
3269 peer_config: &PeerConfig,
3270 peer_identity: PeerIdentity,
3271 allow_bootstrap_nat: bool,
3272 ) -> Result<(), NodeError> {
3273 let peer_node_addr = *peer_identity.node_addr();
3274 if self.peers.contains_key(&peer_node_addr) {
3275 debug!(
3276 npub = %peer_config.npub,
3277 "Peer already exists, skipping address attempts"
3278 );
3279 return Ok(());
3280 }
3281
3282 let candidates = self.peer_address_candidates(peer_config).await;
3283
3284 if candidates.is_empty() {
3285 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3286 return Ok(());
3287 }
3288 return Err(NodeError::NoTransportForType(format!(
3289 "no addresses known for {}",
3290 peer_config.npub
3291 )));
3292 }
3293
3294 if self
3295 .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3296 .await
3297 .is_ok()
3298 {
3299 return Ok(());
3300 }
3301
3302 Err(NodeError::NoTransportForType(format!(
3303 "no operational transport for any of {}'s addresses",
3304 peer_config.npub
3305 )))
3306 }
3307
3308 async fn try_active_peer_alternative_addresses(
3309 &mut self,
3310 peer_config: &PeerConfig,
3311 peer_identity: PeerIdentity,
3312 ) -> Result<bool, NodeError> {
3313 let peer_node_addr = *peer_identity.node_addr();
3314 let candidates = self.peer_address_candidates(peer_config).await;
3315
3316 if candidates.is_empty() {
3317 return Err(NodeError::NoTransportForType(format!(
3318 "no addresses known for {}",
3319 peer_config.npub
3320 )));
3321 }
3322
3323 let alternatives: Vec<_> = candidates
3324 .into_iter()
3325 .filter(|addr| !(addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat")))
3326 .filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
3327 .collect();
3328
3329 if alternatives.is_empty() {
3330 return Err(NodeError::NoTransportForType(format!(
3331 "no concrete alternate addresses known for {}",
3332 peer_config.npub
3333 )));
3334 }
3335
3336 self.attempt_peer_address_list(peer_config, peer_identity, false, &alternatives)
3337 .await?;
3338 Ok(true)
3339 }
3340
3341 async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3342 let static_addresses = self.static_peer_addresses(peer_config);
3349 let overlay_addresses = self
3350 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3351 .await;
3352
3353 let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3354 for addr in overlay_addresses.into_iter().chain(static_addresses) {
3355 if !candidates.iter().any(|existing: &PeerAddress| {
3356 existing.transport == addr.transport && existing.addr == addr.addr
3357 }) {
3358 candidates.push(addr);
3359 }
3360 }
3361
3362 candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
3367 _ if a.priority != b.priority => a.priority.cmp(&b.priority),
3368 (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3369 (Some(_), None) => std::cmp::Ordering::Less,
3370 (None, Some(_)) => std::cmp::Ordering::Greater,
3371 (None, None) => std::cmp::Ordering::Equal,
3372 });
3373
3374 candidates
3375 }
3376
3377 fn active_peer_matches_any_candidate(
3378 &self,
3379 peer_node_addr: &NodeAddr,
3380 candidates: &[PeerAddress],
3381 ) -> bool {
3382 candidates
3383 .iter()
3384 .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3385 }
3386
3387 pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3388 &self,
3389 peer_node_addr: &NodeAddr,
3390 candidates: &[PeerAddress],
3391 ) -> bool {
3392 if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3393 return false;
3394 }
3395 !self.active_peer_needs_same_path_refresh(peer_node_addr)
3396 }
3397
3398 fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3399 let Some(peer) = self.peers.get(peer_node_addr) else {
3400 return false;
3401 };
3402 let stale_after_ms = self
3403 .config
3404 .node
3405 .heartbeat_interval_secs
3406 .saturating_mul(1000)
3407 .max(1000);
3408 peer.idle_time(Self::now_ms()) > stale_after_ms
3409 }
3410
3411 fn active_peer_matches_candidate(
3412 &self,
3413 peer_node_addr: &NodeAddr,
3414 candidate: &PeerAddress,
3415 ) -> bool {
3416 let Some(peer) = self.peers.get(peer_node_addr) else {
3417 return false;
3418 };
3419 let Some(current_addr) = peer.current_addr() else {
3420 return false;
3421 };
3422 if let Some(peer_transport_id) = peer.transport_id()
3423 && let Some((candidate_transport_id, candidate_addr)) =
3424 self.resolve_peer_address_for_match(candidate)
3425 {
3426 return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3427 }
3428 if peer
3429 .transport_id()
3430 .map(|id| self.bootstrap_transports.contains(&id))
3431 .unwrap_or(false)
3432 {
3433 return false;
3434 }
3435 let current_addr = current_addr.to_string();
3436 let current_transport = peer
3437 .transport_id()
3438 .and_then(|id| self.transports.get(&id))
3439 .map(|transport| transport.transport_type().name);
3440
3441 candidate.addr == current_addr
3442 && current_transport
3443 .map(|transport| transport == candidate.transport)
3444 .unwrap_or(true)
3445 }
3446
3447 pub(crate) async fn api_connect(
3455 &mut self,
3456 npub: &str,
3457 address: &str,
3458 transport: &str,
3459 ) -> Result<serde_json::Value, String> {
3460 let peer_config = PeerConfig {
3461 npub: npub.to_string(),
3462 alias: None,
3463 addresses: vec![PeerAddress::new(transport, address)],
3464 connect_policy: ConnectPolicy::Manual,
3465 auto_reconnect: false,
3466 discovery_fallback_transit: true,
3467 };
3468
3469 if let Ok(identity) = PeerIdentity::from_npub(npub) {
3471 self.peer_aliases
3472 .insert(*identity.node_addr(), identity.short_npub());
3473 self.register_identity(*identity.node_addr(), identity.pubkey_full());
3474 }
3475
3476 self.initiate_peer_connection(&peer_config)
3477 .await
3478 .map(|()| {
3479 info!(
3480 npub = %npub,
3481 address = %address,
3482 transport = %transport,
3483 "API connect initiated"
3484 );
3485 serde_json::json!({
3486 "npub": npub,
3487 "address": address,
3488 "transport": transport,
3489 })
3490 })
3491 .map_err(|e| e.to_string())
3492 }
3493
3494 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
3498 let peer_identity =
3499 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
3500 let node_addr = *peer_identity.node_addr();
3501
3502 if !self.peers.contains_key(&node_addr) {
3503 return Err(format!("peer not found: {npub}"));
3504 }
3505
3506 self.remove_active_peer(&node_addr);
3508
3509 self.retry_pending.remove(&node_addr);
3511
3512 info!(npub = %npub, "API disconnect completed");
3513
3514 Ok(serde_json::json!({
3515 "npub": npub,
3516 "disconnected": true,
3517 }))
3518 }
3519
3520 pub async fn adopt_established_traversal(
3527 &mut self,
3528 traversal: EstablishedTraversal,
3529 ) -> Result<BootstrapHandoffResult, NodeError> {
3530 debug!(
3531 peer_npub = %traversal.peer_npub,
3532 session_id = %traversal.session_id,
3533 remote_addr = %traversal.remote_addr,
3534 "adopting established traversal socket"
3535 );
3536
3537 if !self.state.is_operational() {
3538 return Err(NodeError::NotStarted);
3539 }
3540
3541 let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
3542 let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
3543 NodeError::InvalidPeerNpub {
3544 npub: traversal.peer_npub.clone(),
3545 reason: e.to_string(),
3546 }
3547 })?;
3548 let peer_node_addr = *peer_identity.node_addr();
3549 if self.peers.contains_key(&peer_node_addr) {
3550 debug!(
3551 peer_npub = %traversal.peer_npub,
3552 "Adopting NAT traversal handoff as alternate path for already-connected peer"
3553 );
3554 }
3555
3556 self.peer_aliases
3557 .insert(peer_node_addr, peer_identity.short_npub());
3558 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
3559
3560 let transport_id = self.allocate_transport_id();
3561 let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
3581 let mut cfg = self
3582 .lookup_udp_config(traversal.transport_name.as_deref())
3583 .or_else(|| self.lookup_udp_config(None))
3584 .cloned()
3585 .unwrap_or_default();
3586 cfg.bind_addr = None;
3587 cfg.external_addr = None;
3588 cfg
3589 });
3590 let mut transport = crate::transport::udp::UdpTransport::new(
3591 transport_id,
3592 traversal.transport_name.clone(),
3593 inherited_config,
3594 packet_tx,
3595 );
3596
3597 transport
3598 .adopt_socket_async(traversal.socket)
3599 .await
3600 .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
3601
3602 let local_addr = transport.local_addr().ok_or_else(|| {
3603 NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
3604 })?;
3605
3606 self.transports.insert(
3607 transport_id,
3608 crate::transport::TransportHandle::Udp(transport),
3609 );
3610 self.bootstrap_transports.insert(transport_id);
3611 self.bootstrap_transport_npubs
3612 .insert(transport_id, traversal.peer_npub.clone());
3613
3614 let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
3615 if let Err(err) = self
3616 .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
3617 .await
3618 {
3619 self.bootstrap_transports.remove(&transport_id);
3620 self.bootstrap_transport_npubs.remove(&transport_id);
3621 if let Some(mut handle) = self.transports.remove(&transport_id) {
3622 let _ = handle.stop().await;
3623 }
3624 return Err(err);
3625 }
3626
3627 info!(
3628 peer = %self.peer_display_name(&peer_node_addr),
3629 transport_id = %transport_id,
3630 local_addr = %local_addr,
3631 remote_addr = %traversal.remote_addr,
3632 session_id = %traversal.session_id,
3633 "adopted NAT traversal socket; handshake initiated"
3634 );
3635
3636 Ok(BootstrapHandoffResult {
3637 transport_id,
3638 local_addr,
3639 remote_addr: traversal.remote_addr,
3640 peer_node_addr,
3641 session_id: traversal.session_id,
3642 })
3643 }
3644}