1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
7 OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use std::collections::{HashMap, HashSet};
18use std::net::{IpAddr, SocketAddr};
19use std::thread;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
/// Append a timestamped line to `nvpn-fips-endpoint-debug.log` in the system
/// temp directory. Only compiled when `debug_assertions` is enabled.
///
/// Best-effort: if the file cannot be opened or written, the message is
/// silently dropped.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let opened = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_path);
    let Ok(mut file) = opened else {
        return;
    };
    let _ = writeln!(
        file,
        "{:?} {}",
        std::time::SystemTime::now(),
        message.as_ref()
    );
}
40
/// No-op stand-in used when `debug_assertions` is disabled: it has the same
/// signature as the debug-build file logger, so call sites compile unchanged,
/// but the message is discarded.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
43
/// Return `true` for addresses that must not be published in an overlay advert
/// because they are not reachable from the public internet.
///
/// - IPv4: private (RFC 1918), loopback, link-local, unspecified, "this
///   network" (`0.0.0.0/8`), multicast, broadcast, documentation, shared
///   address space / CGNAT (`100.64.0.0/10`), benchmarking (`198.18.0.0/15`)
///   and reserved (`240.0.0.0/4`).
/// - IPv6: loopback, unspecified, unique-local (`fc00::/7`), multicast,
///   link-local (`fe80::/10`) and documentation (`2001:db8::/32`).
/// - IPv4-mapped IPv6 (`::ffff:a.b.c.d`) is judged by its embedded IPv4
///   address, so `::ffff:192.168.1.1` is rejected just like `192.168.1.1`.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [a, b, ..] = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 0.0.0.0/8: "this network" (RFC 1122).
                || a == 0
                // 100.64.0.0/10: shared address space / CGNAT (RFC 6598).
                || (a == 100 && (b & 0xc0) == 64)
                // 198.18.0.0/15: benchmarking (RFC 2544).
                || (a == 198 && (b & 0xfe) == 18)
                // 240.0.0.0/4: reserved for future use (RFC 1112).
                || a >= 240
        }
        IpAddr::V6(v6) => {
            // An IPv4-mapped address reaches exactly the embedded IPv4 host,
            // so it is only as routable as that IPv4 address.
            if let Some(v4) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(v4));
            }
            let segments = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // fe80::/10: link-local unicast.
                || (segments[0] & 0xffc0) == 0xfe80
                // 2001:db8::/32: documentation (RFC 3849).
                || (segments[0] == 0x2001 && segments[1] == 0x0db8)
        }
    }
}
75
/// Return `true` when both socket addresses belong to the same IP family
/// (both IPv4 or both IPv6), i.e. a socket bound to `local` can address
/// `remote` without family translation.
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    // `SocketAddr` has exactly two variants, so equal "is IPv4" flags mean
    // equal families.
    local.is_ipv4() == remote.is_ipv4()
}
82
/// NOTE(review): not referenced in this part of the file; judging by the name
/// it scales some lifetime for open-discovery retries — confirm at the use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
/// Cap on handshakes plus pending transport connects that may be in flight to
/// a single peer at once (used by `path_candidate_attempt_budget`).
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
/// Cap on the warm-up budget returned by `graph_session_warmup_budget`, i.e.
/// the most session initiations/lookups one call to
/// `warm_auto_connect_graph_sessions` will start.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
/// Cap on the connection budget returned by `discovery_connect_budget`.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
87
88impl Node {
89 pub(super) async fn update_peers(
101 &mut self,
102 new_peers: Vec<crate::config::PeerConfig>,
103 ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
104 use std::collections::{HashMap, HashSet};
105
106 let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
107 HashMap::with_capacity(new_peers.len());
108 for peer in new_peers {
109 let identity = match PeerIdentity::from_npub(&peer.npub) {
110 Ok(id) => id,
111 Err(e) => {
112 return Err(crate::node::NodeError::InvalidPeerNpub {
113 npub: peer.npub.clone(),
114 reason: e.to_string(),
115 });
116 }
117 };
118 new_by_addr.insert(*identity.node_addr(), peer);
122 }
123
124 let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
125 .config
126 .peers()
127 .iter()
128 .filter_map(|pc| {
129 PeerIdentity::from_npub(&pc.npub)
130 .ok()
131 .map(|id| (*id.node_addr(), pc.clone()))
132 })
133 .collect();
134
135 let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
136 let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
137
138 let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
139 let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
140 let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
141
142 let mut outcome = crate::node::UpdatePeersOutcome::default();
143
144 for node_addr in &removed {
145 if self.retry_pending.remove(node_addr).is_some() {
146 debug!(
147 peer = %self.peer_display_name(node_addr),
148 "Dropping retry entry for peer removed from runtime peer list"
149 );
150 }
151 self.peer_aliases.remove(node_addr);
152 self.set_discovery_fallback_transit_allowed(*node_addr, false);
153 outcome.removed += 1;
154 }
155
156 let mut auto_connect_refresh_configs = Vec::new();
157 for node_addr in &kept {
158 let new_pc = &new_by_addr[node_addr];
159 let current_pc = ¤t_by_addr[node_addr];
160 if new_pc.addresses != current_pc.addresses
161 || new_pc.alias != current_pc.alias
162 || new_pc.connect_policy != current_pc.connect_policy
163 || new_pc.auto_reconnect != current_pc.auto_reconnect
164 || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
165 {
166 outcome.updated += 1;
167 self.set_discovery_fallback_transit_allowed(
168 *node_addr,
169 new_pc.discovery_fallback_transit,
170 );
171 if let Some(state) = self.retry_pending.get_mut(node_addr) {
172 state.peer_config = new_pc.clone();
173 state.retry_after_ms = Self::now_ms();
174 }
175 if let Some(alias) = new_pc.alias.clone() {
176 self.peer_aliases.insert(*node_addr, alias);
177 }
178 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
179 auto_connect_refresh_configs.push(new_pc.clone());
180 }
181 } else {
182 outcome.unchanged += 1;
183 self.set_discovery_fallback_transit_allowed(
184 *node_addr,
185 new_pc.discovery_fallback_transit,
186 );
187 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
188 auto_connect_refresh_configs.push(new_pc.clone());
189 }
190 }
191 }
192
193 let added_configs: Vec<crate::config::PeerConfig> =
194 added.iter().map(|addr| new_by_addr[addr].clone()).collect();
195
196 self.config.peers = new_by_addr.into_values().collect();
200
201 for peer_config in added_configs {
202 outcome.added += 1;
203 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
204 continue;
205 };
206 let name = peer_config
207 .alias
208 .clone()
209 .unwrap_or_else(|| identity.short_npub());
210 self.peer_aliases.insert(*identity.node_addr(), name);
211 self.set_discovery_fallback_transit_allowed(
212 *identity.node_addr(),
213 peer_config.discovery_fallback_transit,
214 );
215 self.register_identity(*identity.node_addr(), identity.pubkey_full());
216
217 if !peer_config.is_auto_connect() {
218 continue;
219 }
220
221 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
222 warn!(
223 npub = %peer_config.npub,
224 error = %e,
225 "Failed to initiate connection for newly added peer"
226 );
227 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
228 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
229 }
230 if matches!(e, crate::node::NodeError::NoTransportForType(_))
231 && let Some(bootstrap) = self.nostr_discovery.clone()
232 {
233 bootstrap
234 .request_advert_stale_check(peer_config.npub.clone())
235 .await;
236 }
237 }
238 }
239
240 for peer_config in auto_connect_refresh_configs {
241 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
242 continue;
243 };
244 let node_addr = *peer_identity.node_addr();
245
246 if self.peers.contains_key(&node_addr) {
247 match self
248 .initiate_active_peer_alternative_connection(&peer_config)
249 .await
250 {
251 Ok(attempted) => {
252 if attempted {
253 debug!(
254 peer = %self.peer_display_name(&node_addr),
255 "Started non-disruptive alternate-path handshake for active peer"
256 );
257 }
258 }
259 Err(e) => {
260 debug!(
261 npub = %peer_config.npub,
262 error = %e,
263 "Active peer alternate-path refresh did not start"
264 );
265 }
266 }
267 continue;
268 }
269
270 match self.initiate_peer_connection(&peer_config).await {
271 Ok(()) => {
272 let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
273 if let Some(state) = self.retry_pending.get_mut(&node_addr) {
274 state.peer_config = peer_config;
275 state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
276 }
277 }
278 Err(e) => {
279 debug!(
280 npub = %peer_config.npub,
281 error = %e,
282 "Refreshed peer addresses did not initiate a direct connection"
283 );
284 self.schedule_retry(node_addr, Self::now_ms());
285 }
286 }
287 }
288
289 self.warm_auto_connect_graph_sessions().await;
290
291 Ok(outcome)
292 }
293
294 pub(super) async fn initiate_peer_connections(&mut self) {
295 let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
301 .config
302 .peers()
303 .iter()
304 .filter_map(|pc| {
305 PeerIdentity::from_npub(&pc.npub)
306 .ok()
307 .map(|id| (id, pc.alias.clone()))
308 })
309 .collect();
310
311 for (identity, alias) in peer_identities {
312 let name = alias.unwrap_or_else(|| identity.short_npub());
313 self.peer_aliases.insert(*identity.node_addr(), name);
314 self.register_identity(*identity.node_addr(), identity.pubkey_full());
318 }
319
320 let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
322
323 if peer_configs.is_empty() {
324 debug!("No static peers configured");
325 return;
326 }
327
328 debug!(
329 count = peer_configs.len(),
330 "Initiating static peer connections"
331 );
332
333 for peer_config in peer_configs {
334 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
335 warn!(
336 npub = %peer_config.npub,
337 alias = ?peer_config.alias,
338 error = %e,
339 "Failed to initiate peer connection"
340 );
341 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
345 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
346 }
347 if matches!(e, crate::node::NodeError::NoTransportForType(_))
353 && let Some(bootstrap) = self.nostr_discovery.clone()
354 {
355 bootstrap
356 .request_advert_stale_check(peer_config.npub.clone())
357 .await;
358 }
359 }
360 }
361
362 self.warm_auto_connect_graph_sessions().await;
363 }
364
365 pub(super) async fn initiate_peer_connection(
369 &mut self,
370 peer_config: &crate::config::PeerConfig,
371 ) -> Result<(), NodeError> {
372 self.initiate_peer_connection_inner(peer_config).await
373 }
374
375 pub(super) async fn initiate_peer_retry_connection(
381 &mut self,
382 peer_config: &crate::config::PeerConfig,
383 ) -> Result<(), NodeError> {
384 self.initiate_peer_connection_inner(peer_config).await
385 }
386
387 async fn initiate_active_peer_alternative_connection(
388 &mut self,
389 peer_config: &crate::config::PeerConfig,
390 ) -> Result<bool, NodeError> {
391 let peer_identity =
392 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
393 npub: peer_config.npub.clone(),
394 reason: e.to_string(),
395 })?;
396 let peer_node_addr = *peer_identity.node_addr();
397
398 if !self.peers.contains_key(&peer_node_addr) {
399 self.initiate_peer_connection(peer_config).await?;
400 return Ok(true);
401 }
402
403 self.try_active_peer_alternative_addresses(peer_config, peer_identity)
409 .await
410 }
411
412 async fn initiate_peer_connection_inner(
413 &mut self,
414 peer_config: &crate::config::PeerConfig,
415 ) -> Result<(), NodeError> {
416 let peer_identity =
418 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
419 npub: peer_config.npub.clone(),
420 reason: e.to_string(),
421 })?;
422
423 let peer_node_addr = *peer_identity.node_addr();
424
425 if self.peers.contains_key(&peer_node_addr) {
427 debug!(
428 npub = %peer_config.npub,
429 "Peer already exists, skipping"
430 );
431 return Ok(());
432 }
433
434 self.try_peer_addresses(peer_config, peer_identity, true)
435 .await
436 }
437
438 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
439 self.connections.values().any(|conn| {
440 conn.expected_identity()
441 .map(|id| id.node_addr() == peer_node_addr)
442 .unwrap_or(false)
443 })
444 }
445
446 fn is_connecting_to_peer_on_path(
447 &self,
448 peer_node_addr: &NodeAddr,
449 transport_id: TransportId,
450 remote_addr: &TransportAddr,
451 ) -> bool {
452 self.connections.values().any(|conn| {
453 conn.expected_identity()
454 .map(|id| id.node_addr() == peer_node_addr)
455 .unwrap_or(false)
456 && conn.transport_id() == Some(transport_id)
457 && conn.source_addr() == Some(remote_addr)
458 }) || self.pending_connects.iter().any(|pending| {
459 pending.peer_identity.node_addr() == peer_node_addr
460 && pending.transport_id == transport_id
461 && &pending.remote_addr == remote_addr
462 })
463 }
464
465 pub(in crate::node) fn should_warm_auto_connect_session(
466 &self,
467 peer_node_addr: &NodeAddr,
468 ) -> bool {
469 if self.peers.contains_key(peer_node_addr)
470 || self
471 .sessions
472 .get(peer_node_addr)
473 .is_some_and(|entry| entry.is_established())
474 {
475 return false;
476 }
477
478 self.config.peers().iter().any(|peer| {
479 peer.is_auto_connect()
480 && PeerIdentity::from_npub(&peer.npub)
481 .map(|identity| identity.node_addr() == peer_node_addr)
482 .unwrap_or(false)
483 })
484 }
485
    /// Pre-establish end-to-end sessions to auto-connect peers over the
    /// existing graph, within a per-call budget.
    ///
    /// Returns 0 immediately unless at least one entry in `self.peers` reports
    /// `can_send()`, or when `graph_session_warmup_budget()` is 0.
    ///
    /// Each auto-connect peer is considered in turn. It is skipped if it is
    /// this node, if `should_warm_auto_connect_session` rejects it, or if a
    /// session to it is already initiating. Otherwise its identity is
    /// registered and then:
    /// - if `find_next_hop` yields a hop, a session is initiated. If that
    ///   fails with `SendFailed` "no route to destination" for this peer, a
    ///   lookup is triggered instead;
    /// - if there is no next hop, a lookup is triggered.
    ///
    /// Returns how many peers got a session initiation or lookup. Each of
    /// those consumes one unit of budget; other initiation errors are only
    /// logged and do not consume budget.
    pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
        if !self.peers.values().any(|peer| peer.can_send()) {
            return 0;
        }

        let mut budget = self.graph_session_warmup_budget();
        if budget == 0 {
            return 0;
        }

        let peer_identities: Vec<_> = self
            .config
            .auto_connect_peers()
            .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
            .collect();

        let mut warmed = 0;
        for identity in peer_identities {
            if budget == 0 {
                break;
            }

            let peer_node_addr = *identity.node_addr();
            if peer_node_addr == *self.identity.node_addr()
                || !self.should_warm_auto_connect_session(&peer_node_addr)
                || self
                    .sessions
                    .get(&peer_node_addr)
                    .is_some_and(|entry| entry.is_initiating())
            {
                continue;
            }

            self.register_identity(peer_node_addr, identity.pubkey_full());

            if self.find_next_hop(&peer_node_addr).is_some() {
                match self
                    .initiate_session(peer_node_addr, identity.pubkey_full())
                    .await
                {
                    Ok(()) => {
                        warmed += 1;
                        budget = budget.saturating_sub(1);
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            "Warmed auto-connect peer session over existing FIPS graph"
                        );
                    }
                    // NOTE(review): this arm matches on the error's message text;
                    // if `initiate_session` ever rewords it, this silently falls
                    // through to the generic arm below — consider a dedicated
                    // error variant.
                    Err(NodeError::SendFailed { node_addr, reason })
                        if node_addr == peer_node_addr && reason == "no route to destination" =>
                    {
                        self.maybe_initiate_lookup(&peer_node_addr).await;
                        warmed += 1;
                        budget = budget.saturating_sub(1);
                    }
                    Err(err) => {
                        // Logged only; does not consume warm-up budget.
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            error = %err,
                            "Failed to warm auto-connect peer session"
                        );
                    }
                }
            } else {
                // No next hop known yet: ask for a route lookup instead.
                self.maybe_initiate_lookup(&peer_node_addr).await;
                warmed += 1;
                budget = budget.saturating_sub(1);
            }
        }

        warmed
    }
558
559 pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
560 let max_destinations = self.config.node.session.pending_max_destinations;
561 if max_destinations == 0 {
562 return 0;
563 }
564
565 let pending_sessions = self
566 .sessions
567 .values()
568 .filter(|entry| !entry.is_established())
569 .count();
570 let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
571 max_destinations
572 .saturating_sub(pending_total)
573 .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
574 }
575
576 fn outbound_handshake_slots(&self) -> usize {
577 let used = self
578 .connections
579 .len()
580 .saturating_add(self.pending_connects.len());
581 if self.max_connections == 0 {
582 usize::MAX
583 } else {
584 self.max_connections.saturating_sub(used)
585 }
586 }
587
588 fn outbound_link_slots(&self) -> usize {
589 if self.max_links == 0 {
590 usize::MAX
591 } else {
592 self.max_links.saturating_sub(self.links.len())
593 }
594 }
595
596 fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
597 if !self.peers.contains_key(peer_node_addr)
598 && self.max_peers > 0
599 && self.peers.len() >= self.max_peers
600 {
601 return 0;
602 }
603
604 let in_flight_for_peer = self
605 .connections
606 .values()
607 .filter(|conn| {
608 conn.expected_identity()
609 .map(|id| id.node_addr() == peer_node_addr)
610 .unwrap_or(false)
611 })
612 .count()
613 .saturating_add(
614 self.pending_connects
615 .iter()
616 .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
617 .count(),
618 );
619
620 self.outbound_handshake_slots()
621 .min(self.outbound_link_slots())
622 .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
623 }
624
625 fn discovery_connect_budget(&self) -> usize {
626 self.outbound_handshake_slots()
627 .min(self.outbound_link_slots())
628 .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
629 }
630
631 fn find_udp_transport_for_remote_addr(
638 &self,
639 remote_addr: SocketAddr,
640 ) -> Option<(TransportId, SocketAddr)> {
641 self.transports
642 .iter()
643 .filter(|(id, handle)| {
644 handle.transport_type().name == "udp"
645 && handle.is_operational()
646 && !self.bootstrap_transports.contains(id)
647 })
648 .filter_map(|(id, handle)| {
649 let local_addr = handle.local_addr()?;
650 socket_addr_families_compatible(local_addr, remote_addr)
651 .then_some((*id, local_addr))
652 })
653 .min_by_key(|(id, _)| id.as_u32())
654 }
655
656 pub(super) fn transport_discovery_candidate(
657 &self,
658 discovered_transport_id: TransportId,
659 discovered_addr: TransportAddr,
660 ) -> Option<(TransportId, TransportAddr, &'static str)> {
661 let transport = self.transports.get(&discovered_transport_id)?;
662 let transport_name = transport.transport_type().name;
663
664 if transport_name != "udp" {
665 return Some((discovered_transport_id, discovered_addr, transport_name));
666 }
667
668 let Some(remote_socket_addr) = discovered_addr
669 .as_str()
670 .and_then(|addr| addr.parse::<SocketAddr>().ok())
671 else {
672 if self.bootstrap_transports.contains(&discovered_transport_id) {
673 debug!(
674 transport_id = %discovered_transport_id,
675 remote_addr = %discovered_addr,
676 "transport discovery: skip non-numeric UDP address from bootstrap transport"
677 );
678 return None;
679 }
680 return Some((discovered_transport_id, discovered_addr, transport_name));
681 };
682
683 let Some((transport_id, local_addr)) =
684 self.find_udp_transport_for_remote_addr(remote_socket_addr)
685 else {
686 debug!(
687 transport_id = %discovered_transport_id,
688 remote_addr = %discovered_addr,
689 "transport discovery: skip UDP peer with no compatible local socket"
690 );
691 return None;
692 };
693
694 if transport_id != discovered_transport_id {
695 debug!(
696 discovered_transport_id = %discovered_transport_id,
697 selected_transport_id = %transport_id,
698 local_addr = %local_addr,
699 remote_addr = %remote_socket_addr,
700 "transport discovery: selected compatible UDP transport"
701 );
702 }
703
704 Some((
705 transport_id,
706 TransportAddr::from_socket_addr(remote_socket_addr),
707 transport_name,
708 ))
709 }
710
711 fn peer_address_string_for_transport_candidate(
712 &self,
713 transport_id: TransportId,
714 transport_name: &str,
715 remote_addr: &TransportAddr,
716 ) -> String {
717 #[cfg(any(target_os = "linux", target_os = "macos"))]
718 if transport_name == "ethernet"
719 && remote_addr.as_bytes().len() == 6
720 && let Some(interface) = self
721 .transports
722 .get(&transport_id)
723 .and_then(|transport| transport.interface_name())
724 {
725 let mut mac = [0u8; 6];
726 mac.copy_from_slice(remote_addr.as_bytes());
727 return format!(
728 "{interface}/{}",
729 crate::transport::ethernet::format_mac(&mac)
730 );
731 }
732
733 remote_addr.to_string()
734 }
735
736 fn resolve_peer_address_for_match(
737 &self,
738 candidate: &PeerAddress,
739 ) -> Option<(TransportId, TransportAddr)> {
740 if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
741 return None;
742 }
743
744 if candidate.transport == "ethernet" {
745 return self.resolve_ethernet_addr(&candidate.addr).ok();
746 }
747
748 if candidate.transport == "ble" {
749 #[cfg(bluer_available)]
750 {
751 return self.resolve_ble_addr(&candidate.addr).ok();
752 }
753 #[cfg(not(bluer_available))]
754 {
755 return None;
756 }
757 }
758
759 let transport_id = if candidate.transport == "udp"
760 && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
761 {
762 self.find_udp_transport_for_remote_addr(remote_socket_addr)
763 .map(|(id, _)| id)?
764 } else {
765 self.find_transport_for_type(&candidate.transport)?
766 };
767
768 Some((transport_id, TransportAddr::from_string(&candidate.addr)))
769 }
770
    /// Start an outbound connection to `peer_identity` over one transport path.
    ///
    /// Returns `Ok(())` without doing anything if a handshake or pending
    /// connect already targets this exact (peer, transport, address) path.
    /// Otherwise it checks, in this order: handshake slots, link slots, the
    /// peer-count limit (new peers only) and the outbound ACL
    /// (`authorize_peer`). It then allocates a link id and registers a new
    /// outbound link in `links` and `addr_to_link`.
    ///
    /// For connection-oriented transports, the transport connect is started
    /// and the attempt is parked in `pending_connects`. The Noise handshake is
    /// not sent here. For connectionless transports, and when the transport id
    /// is unknown (`is_connection_oriented` then defaults to `false`), the
    /// handshake is started immediately via `start_handshake`.
    ///
    /// # Errors
    /// `MaxConnectionsExceeded`, `MaxLinksExceeded`, `MaxPeersExceeded`, any
    /// ACL error from `authorize_peer`, or `TransportError` if the transport
    /// connect fails. In that last case the new link is rolled back. Errors
    /// from `start_handshake` are also propagated.
    pub(super) async fn initiate_connection(
        &mut self,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
            debug!(
                peer = %self.peer_display_name(&peer_node_addr),
                transport_id = %transport_id,
                remote_addr = %remote_addr,
                "Connection already in progress for candidate path"
            );
            return Ok(());
        }

        if self.outbound_handshake_slots() == 0 {
            return Err(NodeError::MaxConnectionsExceeded {
                max: self.max_connections,
            });
        }

        if self.outbound_link_slots() == 0 {
            return Err(NodeError::MaxLinksExceeded {
                max: self.max_links,
            });
        }

        // Already-known peers may add paths even when the peer table is full.
        if !self.peers.contains_key(&peer_node_addr)
            && self.max_peers > 0
            && self.peers.len() >= self.max_peers
        {
            return Err(NodeError::MaxPeersExceeded {
                max: self.max_peers,
            });
        }

        self.authorize_peer(
            &peer_identity,
            PeerAclContext::OutboundConnect,
            transport_id,
            &remote_addr,
        )?;

        let is_connection_oriented = self
            .transports
            .get(&transport_id)
            .map(|t| t.transport_type().connection_oriented)
            .unwrap_or(false);

        let link_id = self.allocate_link_id();

        let link = if is_connection_oriented {
            Link::new(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        } else {
            Link::connectionless(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        };

        self.links.insert(link_id, link);

        // NOTE(review): `insert` replaces any existing entry for this
        // (transport, address) key, and the failure paths here and in
        // `start_handshake` remove the key outright. If another link already
        // owned this key, its mapping would be lost — confirm callers never
        // reach this point for an address that is already mapped.
        self.addr_to_link
            .insert((transport_id, remote_addr.clone()), link_id);

        if is_connection_oriented {
            // The transport was just looked up (no await in between) to
            // compute `is_connection_oriented`, so this lookup is expected to
            // succeed.
            if let Some(transport) = self.transports.get(&transport_id) {
                match transport.connect(&remote_addr).await {
                    Ok(()) => {
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            link_id = %link_id,
                            "Transport connect initiated (non-blocking)"
                        );
                        // The handshake is started later, once the pending
                        // connect is resolved elsewhere.
                        self.pending_connects.push(super::PendingConnect {
                            link_id,
                            transport_id,
                            remote_addr,
                            peer_identity,
                        });
                    }
                    Err(e) => {
                        // Roll back the link registered above.
                        self.links.remove(&link_id);
                        self.addr_to_link.remove(&(transport_id, remote_addr));
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            Ok(())
        } else {
            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
                .await
        }
    }
894
    /// Begin the outbound Noise handshake on an already-registered link.
    ///
    /// The steps are:
    /// 1. Create an outbound `PeerConnection` and allocate a session index.
    /// 2. Produce Noise message 1 and wrap it in the wire format
    ///    (`build_msg1`).
    /// 3. Record the message for resend after
    ///    `handshake_resend_interval_ms`.
    /// 4. Register the attempt in `pending_outbound` (keyed by transport and
    ///    index) and in `connections`.
    /// 5. Send message 1 on the transport.
    ///
    /// On any failure, everything done so far is undone before returning:
    /// the index is freed and the connection, pending-outbound entry, link
    /// and `addr_to_link` entry are removed.
    ///
    /// # Errors
    /// - `IndexAllocationFailed` if no session index is available;
    /// - `HandshakeFailed` if Noise message 1 cannot be produced;
    /// - `TransportError` if the send fails or the transport is no longer
    ///   registered.
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Keep a copy of msg1 on the connection together with the time it
        // becomes due for resend.
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        // Register before sending so the attempt is tracked by
        // (transport, our index) and by link id.
        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        match send_result {
            Some(send_result) => {
                self.note_local_send_outcome(&send_result);
                match send_result {
                    Ok(bytes) => {
                        debug!(
                            link_id = %link_id,
                            our_index = %our_index,
                            bytes,
                            "Sent Noise handshake message 1 (wire format)"
                        );
                    }
                    Err(e) => {
                        warn!(
                            link_id = %link_id,
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            our_index = %our_index,
                            error = %e,
                            "Failed to send handshake message"
                        );
                        // Undo every registration made above.
                        self.pending_outbound
                            .remove(&(transport_id, our_index.as_u32()));
                        self.connections.remove(&link_id);
                        self.links.remove(&link_id);
                        self.addr_to_link
                            .remove(&(transport_id, remote_addr.clone()));
                        let _ = self.index_allocator.free(our_index);
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            None => {
                // The transport is gone; undo every registration made above.
                self.pending_outbound
                    .remove(&(transport_id, our_index.as_u32()));
                self.connections.remove(&link_id);
                self.links.remove(&link_id);
                self.addr_to_link
                    .remove(&(transport_id, remote_addr.clone()));
                let _ = self.index_allocator.free(our_index);
                return Err(NodeError::TransportError(format!(
                    "transport {transport_id} disappeared before first handshake send"
                )));
            }
        }

        Ok(())
    }
1019
1020 pub(super) async fn poll_transport_discovery(&mut self) {
1026 let mut to_connect = Vec::new();
1028 let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1029 let mut connect_budget = self.discovery_connect_budget();
1030 let mut skipped_budget = 0usize;
1031
1032 for transport in self.transports.values() {
1033 if !transport.is_operational() {
1034 continue;
1035 }
1036 if !transport.auto_connect() {
1037 let _ = transport.discover();
1039 continue;
1040 }
1041 let discovered = match transport.discover() {
1042 Ok(peers) => peers,
1043 Err(_) => continue,
1044 };
1045 for peer in discovered {
1046 let discovered_transport_id = peer.transport_id;
1047 let pubkey = match peer.pubkey_hint {
1048 Some(pk) => pk,
1049 None => continue,
1050 };
1051 let identity = PeerIdentity::from_pubkey(pubkey);
1052 let node_addr = *identity.node_addr();
1053
1054 if node_addr == *self.identity.node_addr() {
1056 continue;
1057 }
1058
1059 let Some((candidate_transport_id, remote_addr, transport_name)) =
1060 self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1061 else {
1062 continue;
1063 };
1064
1065 if self.peers.contains_key(&node_addr) {
1066 let candidate = PeerAddress::new(
1067 transport_name,
1068 self.peer_address_string_for_transport_candidate(
1069 candidate_transport_id,
1070 transport_name,
1071 &remote_addr,
1072 ),
1073 );
1074 if self.active_peer_candidate_is_fresh_enough_to_skip(
1075 &node_addr,
1076 std::slice::from_ref(&candidate),
1077 ) {
1078 continue;
1079 }
1080 if self.is_connecting_to_peer_on_path(
1081 &node_addr,
1082 candidate_transport_id,
1083 &remote_addr,
1084 ) {
1085 continue;
1086 }
1087 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1088 if connect_budget == 0
1089 || self
1090 .path_candidate_attempt_budget(&node_addr)
1091 .saturating_sub(queued_for_peer)
1092 == 0
1093 {
1094 skipped_budget = skipped_budget.saturating_add(1);
1095 continue;
1096 }
1097 to_connect.push((candidate_transport_id, remote_addr, identity, true));
1098 *queued_per_peer.entry(node_addr).or_default() += 1;
1099 connect_budget = connect_budget.saturating_sub(1);
1100 continue;
1101 }
1102
1103 if self.is_connecting_to_peer_on_path(
1104 &node_addr,
1105 candidate_transport_id,
1106 &remote_addr,
1107 ) {
1108 continue;
1109 }
1110
1111 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1112 if connect_budget == 0
1113 || self
1114 .path_candidate_attempt_budget(&node_addr)
1115 .saturating_sub(queued_for_peer)
1116 == 0
1117 {
1118 skipped_budget = skipped_budget.saturating_add(1);
1119 continue;
1120 }
1121 to_connect.push((candidate_transport_id, remote_addr, identity, false));
1122 *queued_per_peer.entry(node_addr).or_default() += 1;
1123 connect_budget = connect_budget.saturating_sub(1);
1124 }
1125 }
1126
1127 if skipped_budget > 0 {
1128 debug!(
1129 skipped = skipped_budget,
1130 queued = to_connect.len(),
1131 "Transport discovery connect budget exhausted"
1132 );
1133 }
1134
1135 for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1136 info!(
1137 peer = %self.peer_display_name(identity.node_addr()),
1138 transport_id = %transport_id,
1139 remote_addr = %remote_addr,
1140 active_refresh,
1141 "Auto-connecting to discovered peer"
1142 );
1143 if let Err(e) = self
1144 .initiate_connection(transport_id, remote_addr, identity)
1145 .await
1146 {
1147 warn!(error = %e, "Failed to auto-connect to discovered peer");
1148 }
1149 }
1150 }
1151
    /// Run one round of Nostr-based discovery and NAT traversal handling.
    ///
    /// Does nothing when `nostr_discovery` is unset. Otherwise it first
    /// refreshes the local overlay advert; a failure there is only logged at
    /// debug level. It then handles each drained bootstrap event:
    /// - `Established`: adopt the traversal socket. On failure, log it and
    ///   schedule a retry for the peer.
    /// - `Failed`: ignored if the peer is already in `self.peers` or has a
    ///   handshake in progress. Otherwise the failure is recorded, which yields
    ///   a warn-rate-limit decision and an optional cooldown. An advert stale
    ///   check is requested when the decision reports `crossed_threshold`.
    ///   The peer's configured addresses are then tried directly. If that also
    ///   fails, a retry is scheduled no earlier than the end of the cooldown.
    ///
    /// Finally it calls `maybe_run_startup_open_discovery_sweep` and
    /// `queue_open_discovery_retries`.
    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    if self.peers.contains_key(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal for already-connected peer"
                        );
                        continue;
                    }
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    // Record the failure; the decision drives log level,
                    // the stale-advert check and the retry cooldown below.
                    let now_ms = Self::now_ms();
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    if decision.crossed_threshold {
                        bootstrap
                            .request_advert_stale_check(peer_config.npub.clone())
                            .await;
                    }

                    // Fall back to the peer's configured addresses.
                    // NOTE(review): the third argument is `false` here but
                    // `true` in `initiate_peer_connection_inner`; its meaning
                    // is defined by `try_peer_addresses` — confirm intent.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    // Never let the retry fire before the traversal cooldown ends.
                    if let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
    }
1258
1259 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1265 if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1266 let scope = scope.trim();
1267 if !scope.is_empty() {
1268 return Some(scope.to_string());
1269 }
1270 }
1271
1272 let app = self.config.node.discovery.nostr.app.trim();
1273 if app.is_empty() {
1274 return None;
1275 }
1276 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1277 let scope = rest.trim();
1278 if scope.is_empty() {
1279 None
1280 } else {
1281 Some(scope.to_string())
1282 }
1283 } else {
1284 Some(app.to_string())
1285 }
1286 }
1287
1288 pub(super) async fn poll_lan_discovery(&mut self) {
1295 let Some(runtime) = self.lan_discovery.clone() else {
1296 return;
1297 };
1298 let events = runtime.drain_events().await;
1299 if events.is_empty() {
1300 return;
1301 }
1302 let mut connect_budget = self.discovery_connect_budget();
1303 let mut skipped_budget = 0usize;
1304 for event in events {
1305 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1306 let Some((transport_id, local_addr)) =
1307 self.find_udp_transport_for_remote_addr(peer.addr)
1308 else {
1309 debug!(
1310 addr = %peer.addr,
1311 "lan: skip discovered peer with no compatible UDP transport"
1312 );
1313 continue;
1314 };
1315 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1316 Ok(id) => id,
1317 Err(err) => {
1318 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1319 continue;
1320 }
1321 };
1322 let peer_node_addr = *identity.node_addr();
1323 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1324 if self.peers.contains_key(&peer_node_addr) {
1325 let candidate = PeerAddress::new("udp", peer.addr.to_string());
1326 if self.active_peer_candidate_is_fresh_enough_to_skip(
1327 &peer_node_addr,
1328 std::slice::from_ref(&candidate),
1329 ) {
1330 continue;
1331 }
1332 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1333 continue;
1334 }
1335 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1336 skipped_budget = skipped_budget.saturating_add(1);
1337 continue;
1338 }
1339 info!(
1340 npub = %identity.short_npub(),
1341 addr = %peer.addr,
1342 local_addr = %local_addr,
1343 "lan: initiating alternate-path handshake to active peer"
1344 );
1345 if let Err(err) = self
1346 .initiate_connection(transport_id, remote_addr, identity)
1347 .await
1348 {
1349 debug!(
1350 npub = %peer.npub,
1351 error = %err,
1352 "lan: failed to initiate active peer alternate-path handshake"
1353 );
1354 }
1355 connect_budget = connect_budget.saturating_sub(1);
1356 continue;
1357 }
1358 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1359 continue;
1360 }
1361 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1362 skipped_budget = skipped_budget.saturating_add(1);
1363 continue;
1364 }
1365 info!(
1366 npub = %identity.short_npub(),
1367 addr = %peer.addr,
1368 local_addr = %local_addr,
1369 "lan: initiating handshake to discovered peer"
1370 );
1371 if let Err(err) = self
1372 .initiate_connection(transport_id, remote_addr, identity)
1373 .await
1374 {
1375 debug!(
1376 npub = %peer.npub,
1377 error = %err,
1378 "lan: failed to initiate connection to discovered peer"
1379 );
1380 }
1381 connect_budget = connect_budget.saturating_sub(1);
1382 }
1383 if skipped_budget > 0 {
1384 debug!(
1385 skipped = skipped_budget,
1386 "lan: discovery connect budget exhausted"
1387 );
1388 }
1389 }
1390
1391 pub(super) async fn poll_pending_connects(&mut self) {
1398 if self.pending_connects.is_empty() {
1399 return;
1400 }
1401
1402 let mut completed = Vec::new();
1403
1404 for (i, pending) in self.pending_connects.iter().enumerate() {
1405 let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
1406 transport.connection_state(&pending.remote_addr)
1407 } else {
1408 crate::transport::ConnectionState::Failed("transport removed".into())
1409 };
1410
1411 match state {
1412 crate::transport::ConnectionState::Connected => {
1413 completed.push((i, true, None));
1414 }
1415 crate::transport::ConnectionState::Failed(reason) => {
1416 completed.push((i, false, Some(reason)));
1417 }
1418 crate::transport::ConnectionState::Connecting => {
1419 }
1421 crate::transport::ConnectionState::None => {
1422 completed.push((i, false, Some("no connection attempt found".into())));
1424 }
1425 }
1426 }
1427
1428 for (i, success, reason) in completed.into_iter().rev() {
1430 let pending = self.pending_connects.remove(i);
1431
1432 if success {
1433 if let Some(link) = self.links.get_mut(&pending.link_id) {
1435 link.set_connected();
1436 }
1437
1438 debug!(
1439 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
1440 transport_id = %pending.transport_id,
1441 remote_addr = %pending.remote_addr,
1442 link_id = %pending.link_id,
1443 "Transport connected, starting handshake"
1444 );
1445
1446 if let Err(e) = self
1448 .start_handshake(
1449 pending.link_id,
1450 pending.transport_id,
1451 pending.remote_addr.clone(),
1452 pending.peer_identity,
1453 )
1454 .await
1455 {
1456 warn!(
1457 link_id = %pending.link_id,
1458 error = %e,
1459 "Failed to start handshake after transport connect"
1460 );
1461 self.remove_link(&pending.link_id);
1463 }
1464 } else {
1465 let reason = reason.unwrap_or_default();
1466 warn!(
1467 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
1468 transport_id = %pending.transport_id,
1469 remote_addr = %pending.remote_addr,
1470 link_id = %pending.link_id,
1471 reason = %reason,
1472 "Transport connect failed"
1473 );
1474
1475 self.remove_link(&pending.link_id);
1477 self.links.remove(&pending.link_id);
1478 self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
1479 }
1480 }
1481 }
1482
1483 pub async fn start(&mut self) -> Result<(), NodeError> {
1490 node_start_debug_log("Node::start begin");
1491 if !self.state.can_start() {
1492 return Err(NodeError::AlreadyStarted);
1493 }
1494 self.state = NodeState::Starting;
1495 node_start_debug_log("Node::start state set to starting");
1496
1497 let packet_buffer_size = self.config.node.buffers.packet_channel;
1499 let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
1500 self.packet_tx = Some(packet_tx.clone());
1501 self.packet_rx = Some(packet_rx);
1502 node_start_debug_log("Node::start packet channel created");
1503
1504 node_start_debug_log("Node::start create transports begin");
1506 let transport_handles = self.create_transports(&packet_tx).await;
1507 node_start_debug_log(format!(
1508 "Node::start create transports complete count={}",
1509 transport_handles.len()
1510 ));
1511
1512 for mut handle in transport_handles {
1513 let transport_id = handle.transport_id();
1514 let transport_type = handle.transport_type().name;
1515 let name = handle.name().map(|s| s.to_string());
1516
1517 node_start_debug_log(format!(
1518 "Node::start transport start begin id={} type={} name={:?}",
1519 transport_id, transport_type, name
1520 ));
1521 match handle.start().await {
1522 Ok(()) => {
1523 node_start_debug_log(format!(
1524 "Node::start transport start ok id={} type={}",
1525 transport_id, transport_type
1526 ));
1527 self.transports.insert(transport_id, handle);
1528 }
1529 Err(e) => {
1530 node_start_debug_log(format!(
1531 "Node::start transport start error id={} type={} error={}",
1532 transport_id, transport_type, e
1533 ));
1534 if let Some(ref n) = name {
1535 warn!(transport_type, name = %n, error = %e, "Transport failed to start");
1536 } else {
1537 warn!(transport_type, error = %e, "Transport failed to start");
1538 }
1539 }
1540 }
1541 }
1542
1543 if !self.transports.is_empty() {
1544 info!(count = self.transports.len(), "Transports initialized");
1545 }
1546
1547 #[cfg(unix)]
1563 {
1564 if self.config.node.worker_pools_enabled {
1565 node_start_debug_log("Node::start worker pools begin");
1566 let cpu_default = std::thread::available_parallelism()
1567 .map(|n| n.get())
1568 .unwrap_or(1)
1569 .max(1);
1570 let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
1571 .ok()
1572 .and_then(|s| s.parse().ok())
1573 .unwrap_or(cpu_default)
1574 .max(1);
1575 self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
1576 encrypt_worker_count,
1577 ));
1578 info!(
1579 workers = encrypt_worker_count,
1580 "Spawned FMP-encrypt worker pool"
1581 );
1582
1583 let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
1592 .ok()
1593 .and_then(|s| s.parse().ok())
1594 .unwrap_or(cpu_default);
1595 if decrypt_worker_count == 0 {
1596 info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
1597 } else {
1598 self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
1599 decrypt_worker_count,
1600 ));
1601 info!(
1602 workers = decrypt_worker_count,
1603 "Spawned FMP+FSP-decrypt worker pool"
1604 );
1605 }
1606 node_start_debug_log("Node::start worker pools complete");
1607 } else {
1608 node_start_debug_log("Node::start worker pools disabled");
1609 info!("FIPS worker pools disabled; using in-line crypto/send path");
1610 }
1611 }
1612
1613 if self.config.node.discovery.nostr.enabled {
1614 node_start_debug_log("Node::start nostr discovery start begin");
1615 match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
1616 .await
1617 {
1618 Ok(runtime) => {
1619 node_start_debug_log("Node::start nostr discovery runtime created");
1620 if let Err(err) = self.refresh_overlay_advert(&runtime).await {
1621 warn!(error = %err, "Failed to publish initial Nostr overlay advert");
1622 }
1623 node_start_debug_log("Node::start nostr overlay advert refreshed");
1624 self.nostr_discovery = Some(runtime);
1625 self.nostr_discovery_started_at_ms = Some(Self::now_ms());
1626 info!("Nostr overlay discovery enabled");
1627 }
1628 Err(err) => {
1629 node_start_debug_log(format!(
1630 "Node::start nostr discovery start error error={}",
1631 err
1632 ));
1633 warn!(error = %err, "Failed to start Nostr overlay discovery");
1634 }
1635 }
1636 }
1637
1638 if self.config.node.discovery.lan.enabled {
1642 node_start_debug_log("Node::start lan discovery start begin");
1643 let advertised_udp_port = self
1644 .transports
1645 .values()
1646 .filter(|h| h.is_operational())
1647 .filter(|h| h.transport_type().name == "udp")
1648 .find_map(|h| h.local_addr().map(|addr| addr.port()))
1649 .unwrap_or(0);
1650 let scope = self.lan_discovery_scope();
1651 match crate::discovery::lan::LanDiscovery::start(
1652 &self.identity,
1653 scope,
1654 advertised_udp_port,
1655 self.config.node.discovery.lan.clone(),
1656 )
1657 .await
1658 {
1659 Ok(runtime) => {
1660 node_start_debug_log("Node::start lan discovery start ok");
1661 self.lan_discovery = Some(runtime);
1662 info!("LAN mDNS discovery enabled");
1663 }
1664 Err(err) => {
1665 node_start_debug_log(format!(
1666 "Node::start lan discovery start error error={}",
1667 err
1668 ));
1669 debug!(error = %err, "LAN mDNS discovery not started");
1670 }
1671 }
1672 }
1673
1674 node_start_debug_log("Node::start initiate peer connections begin");
1677 self.initiate_peer_connections().await;
1678 node_start_debug_log("Node::start initiate peer connections complete");
1679
1680 if self.config.tun.enabled {
1682 node_start_debug_log("Node::start tun init begin");
1683 let address = *self.identity.address();
1684 match TunDevice::create(&self.config.tun, address).await {
1685 Ok(device) => {
1686 let mtu = device.mtu();
1687 let name = device.name().to_string();
1688 let our_addr = *device.address();
1689
1690 info!("TUN device active:");
1691 info!(" name: {}", name);
1692 info!(" address: {}", device.address());
1693 info!(" mtu: {}", mtu);
1694
1695 let effective_mtu = self.effective_ipv6_mtu();
1697 let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); info!("effective MTU: {} bytes", effective_mtu);
1700 debug!(" max TCP MSS: {} bytes", max_mss);
1701
1702 #[cfg(target_os = "macos")]
1706 let (shutdown_read_fd, shutdown_write_fd) = {
1707 let mut fds = [0i32; 2];
1708 if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
1709 return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
1710 "failed to create shutdown pipe".into(),
1711 )));
1712 }
1713 (fds[0], fds[1])
1714 };
1715
1716 let (writer, tun_tx) =
1720 device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
1721
1722 let writer_handle = thread::spawn(move || {
1724 writer.run();
1725 });
1726
1727 let reader_tun_tx = tun_tx.clone();
1729
1730 let tun_channel_size = self.config.node.buffers.tun_channel;
1732 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
1733
1734 let transport_mtu = self.transport_mtu();
1736 let path_mtu_lookup = self.path_mtu_lookup.clone();
1737 #[cfg(target_os = "macos")]
1738 let reader_handle = thread::spawn(move || {
1739 run_tun_reader(
1740 device,
1741 mtu,
1742 our_addr,
1743 reader_tun_tx,
1744 outbound_tx,
1745 transport_mtu,
1746 path_mtu_lookup,
1747 shutdown_read_fd,
1748 );
1749 });
1750 #[cfg(not(target_os = "macos"))]
1751 let reader_handle = thread::spawn(move || {
1752 run_tun_reader(
1753 device,
1754 mtu,
1755 our_addr,
1756 reader_tun_tx,
1757 outbound_tx,
1758 transport_mtu,
1759 path_mtu_lookup,
1760 );
1761 });
1762
1763 self.tun_state = TunState::Active;
1764 self.tun_name = Some(name);
1765 self.tun_tx = Some(tun_tx);
1766 self.tun_outbound_rx = Some(outbound_rx);
1767 self.tun_reader_handle = Some(reader_handle);
1768 self.tun_writer_handle = Some(writer_handle);
1769 #[cfg(target_os = "macos")]
1770 {
1771 self.tun_shutdown_fd = Some(shutdown_write_fd);
1772 }
1773 }
1774 Err(e) => {
1775 self.tun_state = TunState::Failed;
1776 warn!(error = %e, "Failed to initialize TUN, continuing without it");
1777 }
1778 }
1779 node_start_debug_log("Node::start tun init complete");
1780 }
1781
1782 if self.config.dns.enabled {
1799 node_start_debug_log("Node::start dns init begin");
1800 let addr_str = self.config.dns.bind_addr();
1801 match addr_str.parse::<std::net::IpAddr>() {
1802 Ok(ip) => {
1803 let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
1804 match Self::bind_dns_socket(bind) {
1805 Ok(socket) => {
1806 let dns_channel_size = self.config.node.buffers.dns_channel;
1807 let (identity_tx, identity_rx) =
1808 tokio::sync::mpsc::channel(dns_channel_size);
1809 let dns_ttl = self.config.dns.ttl();
1810 let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
1811 self.config.peers(),
1812 );
1813 let reloader = if self.config.node.system_files_enabled {
1814 let hosts_path = std::path::PathBuf::from(
1815 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1816 );
1817 crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
1818 } else {
1819 crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
1820 };
1821 let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
1829 info!(
1830 bind = %bind,
1831 hosts = reloader.hosts().len(),
1832 mesh_ifindex = ?mesh_ifindex,
1833 "DNS responder started for .fips domain (auto-reload enabled)"
1834 );
1835 let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
1836 socket,
1837 identity_tx,
1838 dns_ttl,
1839 reloader,
1840 mesh_ifindex,
1841 ));
1842 self.dns_identity_rx = Some(identity_rx);
1843 self.dns_task = Some(handle);
1844 }
1845 Err(e) => {
1846 warn!(bind = %bind, error = %e, "Failed to start DNS responder");
1847 }
1848 }
1849 }
1850 Err(e) => {
1851 warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
1852 }
1853 }
1854 node_start_debug_log("Node::start dns init complete");
1855 }
1856
1857 self.state = NodeState::Running;
1858 node_start_debug_log("Node::start running");
1859 info!("Node started:");
1860 info!(" state: {}", self.state);
1861 info!(" transports: {}", self.transports.len());
1862 info!(" connections: {}", self.connections.len());
1863 Ok(())
1864 }
1865
1866 fn bind_dns_socket(
1879 addr: std::net::SocketAddr,
1880 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
1881 use socket2::{Domain, Protocol, Socket, Type};
1882 let domain = if addr.is_ipv4() {
1883 Domain::IPV4
1884 } else {
1885 Domain::IPV6
1886 };
1887 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
1888 if addr.is_ipv6() {
1889 sock.set_only_v6(false)?;
1890 #[cfg(unix)]
1891 Self::set_recv_pktinfo_v6(&sock)?;
1892 }
1893 sock.set_nonblocking(true)?;
1894 sock.bind(&addr.into())?;
1895 tokio::net::UdpSocket::from_std(sock.into())
1896 }
1897
1898 #[cfg(unix)]
1904 fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
1905 use std::os::fd::AsRawFd;
1906 let enable: libc::c_int = 1;
1907 let ret = unsafe {
1908 libc::setsockopt(
1909 sock.as_raw_fd(),
1910 libc::IPPROTO_IPV6,
1911 libc::IPV6_RECVPKTINFO,
1912 &enable as *const _ as *const libc::c_void,
1913 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
1914 )
1915 };
1916 if ret < 0 {
1917 return Err(std::io::Error::last_os_error());
1918 }
1919 Ok(())
1920 }
1921
1922 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
1929 #[cfg(unix)]
1930 {
1931 let c_name = std::ffi::CString::new(name).ok()?;
1932 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
1933 if idx == 0 { None } else { Some(idx) }
1934 }
1935 #[cfg(not(unix))]
1936 {
1937 let _ = name;
1938 None
1939 }
1940 }
1941
1942 pub async fn stop(&mut self) -> Result<(), NodeError> {
1947 if !self.state.can_stop() {
1948 return Err(NodeError::NotStarted);
1949 }
1950 self.state = NodeState::Stopping;
1951 info!(state = %self.state, "Node stopping");
1952
1953 if let Some(handle) = self.dns_task.take() {
1955 handle.abort();
1956 debug!("DNS responder stopped");
1957 }
1958
1959 self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
1961 .await;
1962
1963 if let Some(bootstrap) = self.nostr_discovery.take()
1965 && let Err(e) = bootstrap.shutdown().await
1966 {
1967 warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
1968 }
1969
1970 if let Some(lan) = self.lan_discovery.take() {
1974 lan.shutdown().await;
1975 }
1976
1977 let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
1979 for transport_id in transport_ids {
1980 if let Some(mut handle) = self.transports.remove(&transport_id) {
1981 let transport_type = handle.transport_type().name;
1982 match handle.stop().await {
1983 Ok(()) => {
1984 info!(transport_id = %transport_id, transport_type, "Transport stopped");
1985 }
1986 Err(e) => {
1987 warn!(
1988 transport_id = %transport_id,
1989 transport_type,
1990 error = %e,
1991 "Transport stop failed"
1992 );
1993 }
1994 }
1995 }
1996 }
1997
1998 self.packet_tx.take();
2000 self.packet_rx.take();
2001
2002 if let Some(name) = self.tun_name.take() {
2004 info!(name = %name, "Shutting down TUN interface");
2005
2006 self.tun_tx.take();
2008
2009 if let Err(e) = shutdown_tun_interface(&name).await {
2011 warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
2012 }
2013
2014 #[cfg(target_os = "macos")]
2017 if let Some(fd) = self.tun_shutdown_fd.take() {
2018 unsafe {
2019 libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
2020 libc::close(fd);
2021 }
2022 }
2023
2024 if let Some(handle) = self.tun_reader_handle.take() {
2026 let _ = handle.join();
2027 }
2028 if let Some(handle) = self.tun_writer_handle.take() {
2029 let _ = handle.join();
2030 }
2031
2032 self.tun_state = TunState::Disabled;
2033 }
2034
2035 self.state = NodeState::Stopped;
2036 info!(state = %self.state, "Node stopped");
2037 Ok(())
2038 }
2039
2040 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2045 let disconnect = Disconnect::new(reason);
2046 let plaintext = disconnect.encode();
2047
2048 let peer_addrs: Vec<NodeAddr> = self
2050 .peers
2051 .iter()
2052 .filter(|(_, peer)| peer.can_send() && peer.has_session())
2053 .map(|(addr, _)| *addr)
2054 .collect();
2055
2056 if peer_addrs.is_empty() {
2057 debug!(
2058 total_peers = self.peers.len(),
2059 "No sendable peers for disconnect notification"
2060 );
2061 return;
2062 }
2063
2064 let mut sent = 0usize;
2065 for node_addr in &peer_addrs {
2066 match self
2067 .send_encrypted_link_message(node_addr, &plaintext)
2068 .await
2069 {
2070 Ok(()) => sent += 1,
2071 Err(e) => {
2072 debug!(
2073 peer = %self.peer_display_name(node_addr),
2074 error = %e,
2075 "Failed to send disconnect (transport may be down)"
2076 );
2077 }
2078 }
2079 }
2080
2081 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2082 }
2083
2084 fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2085 peer_config
2086 .addresses_by_priority()
2087 .into_iter()
2088 .cloned()
2089 .collect()
2090 }
2091
2092 async fn nostr_peer_fallback_addresses(
2093 &self,
2094 peer_config: &PeerConfig,
2095 existing: &[PeerAddress],
2096 ) -> Vec<PeerAddress> {
2097 if !self.config.node.discovery.nostr.enabled
2098 || self.config.node.discovery.nostr.policy
2099 == crate::config::NostrDiscoveryPolicy::Disabled
2100 {
2101 return Vec::new();
2102 }
2103
2104 let Some(bootstrap) = self.nostr_discovery.clone() else {
2105 return Vec::new();
2106 };
2107 let endpoints = match bootstrap
2108 .cached_advert_endpoints_for_peer(&peer_config.npub)
2109 .await
2110 {
2111 Some(endpoints) => endpoints,
2112 None => {
2113 debug!(
2114 npub = %peer_config.npub,
2115 "No cached Nostr advert endpoints for configured peer"
2116 );
2117 return Vec::new();
2118 }
2119 };
2120
2121 let mut fallback = Vec::new();
2122 let mut next_priority = existing
2123 .iter()
2124 .map(|addr| addr.priority)
2125 .max()
2126 .unwrap_or(100)
2127 .saturating_add(1);
2128 let seen_at_ms = Self::now_ms();
2133 for endpoint in endpoints {
2134 let Some(candidate) =
2135 Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2136 else {
2137 continue;
2138 };
2139 if existing
2140 .iter()
2141 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2142 || fallback.iter().any(|addr: &PeerAddress| {
2143 addr.transport == candidate.transport && addr.addr == candidate.addr
2144 })
2145 {
2146 continue;
2147 }
2148 fallback.push(candidate);
2149 next_priority = next_priority.saturating_add(1);
2150 }
2151 fallback
2152 }
2153
2154 async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2155 if !self.config.node.discovery.nostr.enabled
2156 || self.config.node.discovery.nostr.policy
2157 == crate::config::NostrDiscoveryPolicy::Disabled
2158 {
2159 return false;
2160 }
2161 let Some(bootstrap) = self.nostr_discovery.clone() else {
2162 return false;
2163 };
2164 bootstrap.request_connect(peer_config.clone()).await;
2165 info!(npub = %peer_config.npub, "Started background Nostr UDP NAT traversal attempt");
2166 true
2167 }
2168
2169 fn overlay_endpoint_to_peer_address(
2170 endpoint: &OverlayEndpointAdvert,
2171 priority: u8,
2172 seen_at_ms: u64,
2173 ) -> Option<PeerAddress> {
2174 let transport = match endpoint.transport {
2175 OverlayTransportKind::Udp => "udp",
2176 OverlayTransportKind::Tcp => "tcp",
2177 OverlayTransportKind::Tor => "tor",
2178 OverlayTransportKind::WebRtc => "webrtc",
2179 };
2180 Some(
2181 PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2182 .with_seen_at_ms(seen_at_ms),
2183 )
2184 }
2185
2186 async fn attempt_peer_address_list(
2187 &mut self,
2188 peer_config: &PeerConfig,
2189 peer_identity: PeerIdentity,
2190 allow_bootstrap_nat: bool,
2191 addresses: &[PeerAddress],
2192 ) -> Result<(), NodeError> {
2193 let mut attempted = false;
2194 let peer_node_addr = *peer_identity.node_addr();
2195 let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);
2196
2197 for addr in addresses {
2198 if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
2199 if !allow_bootstrap_nat {
2200 continue;
2201 }
2202 if self.request_nostr_bootstrap(peer_config).await {
2203 attempted = true;
2204 continue;
2205 }
2206 debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
2207 continue;
2208 }
2209
2210 let (transport_id, remote_addr) = if addr.transport == "ethernet" {
2211 match self.resolve_ethernet_addr(&addr.addr) {
2212 Ok(result) => result,
2213 Err(e) => {
2214 debug!(
2215 transport = %addr.transport,
2216 addr = %addr.addr,
2217 error = %e,
2218 "Failed to resolve Ethernet address"
2219 );
2220 continue;
2221 }
2222 }
2223 } else if addr.transport == "ble" {
2224 #[cfg(bluer_available)]
2225 {
2226 match self.resolve_ble_addr(&addr.addr) {
2227 Ok(result) => result,
2228 Err(e) => {
2229 debug!(
2230 transport = %addr.transport,
2231 addr = %addr.addr,
2232 error = %e,
2233 "Failed to resolve BLE address"
2234 );
2235 continue;
2236 }
2237 }
2238 }
2239 #[cfg(not(bluer_available))]
2240 {
2241 debug!(transport = %addr.transport, "BLE transport not available on this build");
2242 continue;
2243 }
2244 } else {
2245 let tid = if addr.transport == "udp"
2246 && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
2247 {
2248 match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
2249 Some((id, _)) => id,
2250 None => {
2251 debug!(
2252 transport = %addr.transport,
2253 addr = %addr.addr,
2254 "No compatible operational UDP transport for address"
2255 );
2256 continue;
2257 }
2258 }
2259 } else {
2260 match self.find_transport_for_type(&addr.transport) {
2261 Some(id) => id,
2262 None => {
2263 debug!(
2264 transport = %addr.transport,
2265 addr = %addr.addr,
2266 "No operational transport for address type"
2267 );
2268 continue;
2269 }
2270 }
2271 };
2272 (tid, TransportAddr::from_string(&addr.addr))
2273 };
2274
2275 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
2276 attempted = true;
2277 debug!(
2278 npub = %peer_config.npub,
2279 transport_id = %transport_id,
2280 remote_addr = %remote_addr,
2281 "Skipping duplicate in-flight candidate path"
2282 );
2283 continue;
2284 }
2285
2286 if concrete_budget == 0 {
2287 debug!(
2288 npub = %peer_config.npub,
2289 max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
2290 "Path candidate race budget exhausted"
2291 );
2292 break;
2293 }
2294
2295 match self
2296 .initiate_connection(transport_id, remote_addr, peer_identity)
2297 .await
2298 {
2299 Ok(()) => {
2300 attempted = true;
2301 concrete_budget = concrete_budget.saturating_sub(1);
2302 }
2303 Err(e @ NodeError::AccessDenied(_)) => return Err(e),
2304 Err(e) => {
2305 debug!(
2306 npub = %peer_config.npub,
2307 transport_id = %transport_id,
2308 error = %e,
2309 "Connection attempt failed, trying next address"
2310 );
2311 }
2312 }
2313 }
2314
2315 if attempted {
2316 return Ok(());
2317 }
2318
2319 Err(NodeError::NoTransportForType(format!(
2320 "no operational transport for any of {}'s addresses",
2321 peer_config.npub
2322 )))
2323 }
2324
2325 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
2326 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
2327 .await;
2328 }
2329
2330 pub(in crate::node) async fn run_open_discovery_sweep(
2341 &mut self,
2342 bootstrap: &std::sync::Arc<NostrDiscovery>,
2343 max_age_secs: Option<u64>,
2344 caller: &'static str,
2345 ) {
2346 if !self.config.node.discovery.nostr.enabled
2347 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2348 {
2349 return;
2350 }
2351
2352 let configured_npubs = self
2353 .config
2354 .peers()
2355 .iter()
2356 .map(|peer| peer.npub.clone())
2357 .collect::<HashSet<_>>();
2358 let now_ms = Self::now_ms();
2359 let now_secs = now_ms / 1000;
2360 let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
2361 if enqueue_budget == 0 {
2362 debug!(
2363 caller = %caller,
2364 "open-discovery sweep: enqueue budget is 0, skipping"
2365 );
2366 return;
2367 }
2368
2369 let candidates = bootstrap.cached_open_discovery_candidates(64).await;
2370 let cached_count = candidates.len();
2371 let mut enqueued = 0usize;
2372 let mut skipped_age = 0usize;
2373 let mut skipped_configured = 0usize;
2374 let mut skipped_self = 0usize;
2375 let mut skipped_connected = 0usize;
2376 let mut skipped_retry_pending = 0usize;
2377 let mut skipped_connecting = 0usize;
2378 let mut skipped_no_endpoints = 0usize;
2379 let mut skipped_invalid_npub = 0usize;
2380 let mut skipped_cooldown = 0usize;
2381
2382 for (npub, endpoints, created_at_secs) in candidates {
2383 if enqueue_budget == 0 {
2384 break;
2385 }
2386
2387 if let Some(max_age) = max_age_secs
2388 && now_secs.saturating_sub(created_at_secs) > max_age
2389 {
2390 skipped_age = skipped_age.saturating_add(1);
2391 continue;
2392 }
2393
2394 if configured_npubs.contains(&npub) {
2395 if let Ok(identity) = PeerIdentity::from_npub(&npub) {
2415 let configured_addr = *identity.node_addr();
2416 if let Some(state) = self.retry_pending.get_mut(&configured_addr)
2417 && state.retry_after_ms > now_ms
2418 {
2419 state.retry_after_ms = now_ms;
2420 debug!(
2421 caller = %caller,
2422 peer = %self.peer_display_name(&configured_addr),
2423 advert_age_secs = now_secs.saturating_sub(created_at_secs),
2424 "Expediting configured-peer retry after fresh overlay advert"
2425 );
2426 }
2427 }
2428 skipped_configured = skipped_configured.saturating_add(1);
2429 continue;
2430 }
2431
2432 let peer_identity = match PeerIdentity::from_npub(&npub) {
2433 Ok(identity) => identity,
2434 Err(_) => {
2435 skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
2436 continue;
2437 }
2438 };
2439 let node_addr = *peer_identity.node_addr();
2440 if node_addr == *self.identity.node_addr() {
2441 skipped_self = skipped_self.saturating_add(1);
2442 continue;
2443 }
2444 if self.peers.contains_key(&node_addr) {
2445 skipped_connected = skipped_connected.saturating_add(1);
2446 continue;
2447 }
2448 if self.retry_pending.contains_key(&node_addr) {
2449 skipped_retry_pending = skipped_retry_pending.saturating_add(1);
2450 continue;
2451 }
2452 if bootstrap.cooldown_until(&npub, now_ms).is_some() {
2453 skipped_cooldown = skipped_cooldown.saturating_add(1);
2454 continue;
2455 }
2456 let connecting = self.connections.values().any(|conn| {
2457 conn.expected_identity()
2458 .map(|id| id.node_addr() == &node_addr)
2459 .unwrap_or(false)
2460 });
2461 if connecting {
2462 skipped_connecting = skipped_connecting.saturating_add(1);
2463 continue;
2464 }
2465
2466 let mut addresses = Vec::new();
2467 let mut priority = 120u8;
2468 let seen_at_ms = Self::now_ms();
2469 for endpoint in endpoints {
2470 let Some(candidate) =
2471 Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
2472 else {
2473 continue;
2474 };
2475 if addresses.iter().any(|existing: &PeerAddress| {
2476 existing.transport == candidate.transport && existing.addr == candidate.addr
2477 }) {
2478 continue;
2479 }
2480 addresses.push(candidate);
2481 priority = priority.saturating_add(1);
2482 }
2483 if addresses.is_empty() {
2484 skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
2485 continue;
2486 }
2487
2488 self.peer_aliases
2489 .entry(node_addr)
2490 .or_insert_with(|| peer_identity.short_npub());
2491 self.register_identity(node_addr, peer_identity.pubkey_full());
2492
2493 let mut state = super::retry::RetryState::new(PeerConfig {
2494 npub: npub.clone(),
2495 alias: None,
2496 addresses,
2497 connect_policy: ConnectPolicy::AutoConnect,
2498 auto_reconnect: true,
2499 discovery_fallback_transit: false,
2500 });
2501 state.reconnect = false;
2502 state.retry_after_ms = now_ms;
2503 state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
2504 self.retry_pending.insert(node_addr, state);
2505 info!(
2506 caller = %caller,
2507 peer = %peer_identity.short_npub(),
2508 advert_age_secs = now_secs.saturating_sub(created_at_secs),
2509 "open-discovery sweep: queued retry for cached advert"
2510 );
2511 enqueue_budget = enqueue_budget.saturating_sub(1);
2512 enqueued = enqueued.saturating_add(1);
2513 }
2514
2515 let total_skipped = skipped_age
2519 + skipped_configured
2520 + skipped_self
2521 + skipped_connected
2522 + skipped_retry_pending
2523 + skipped_connecting
2524 + skipped_no_endpoints
2525 + skipped_invalid_npub
2526 + skipped_cooldown;
2527 let should_summarize = caller == "startup" || enqueued > 0;
2528 if should_summarize {
2529 info!(
2530 caller = %caller,
2531 cached = cached_count,
2532 queued = enqueued,
2533 skipped_age = skipped_age,
2534 skipped_configured = skipped_configured,
2535 skipped_self = skipped_self,
2536 skipped_connected = skipped_connected,
2537 skipped_retry_pending = skipped_retry_pending,
2538 skipped_connecting = skipped_connecting,
2539 skipped_no_endpoints = skipped_no_endpoints,
2540 skipped_invalid_npub = skipped_invalid_npub,
2541 skipped_cooldown = skipped_cooldown,
2542 skipped_total = total_skipped,
2543 "open-discovery sweep complete"
2544 );
2545 }
2546 }
2547
2548 async fn maybe_run_startup_open_discovery_sweep(
2556 &mut self,
2557 bootstrap: &std::sync::Arc<NostrDiscovery>,
2558 ) {
2559 if self.startup_open_discovery_sweep_done {
2560 return;
2561 }
2562 if !self.config.node.discovery.nostr.enabled
2563 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2564 {
2565 self.startup_open_discovery_sweep_done = true;
2567 return;
2568 }
2569 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
2570 return;
2571 };
2572 let now_ms = Self::now_ms();
2573 let delay_ms = self
2574 .config
2575 .node
2576 .discovery
2577 .nostr
2578 .startup_sweep_delay_secs
2579 .saturating_mul(1000);
2580 if now_ms < started_at_ms.saturating_add(delay_ms) {
2581 return;
2582 }
2583
2584 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
2585 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
2586 .await;
2587 self.startup_open_discovery_sweep_done = true;
2588 }
2589
2590 fn available_outbound_slots(&self) -> usize {
2591 let connection_used = self
2592 .connections
2593 .len()
2594 .saturating_add(self.pending_connects.len());
2595 let connection_slots = if self.max_connections == 0 {
2596 usize::MAX
2597 } else {
2598 self.max_connections.saturating_sub(connection_used)
2599 };
2600
2601 let peer_slots = if self.max_peers == 0 {
2602 usize::MAX
2603 } else {
2604 self.max_peers.saturating_sub(self.peers.len())
2605 };
2606
2607 connection_slots.min(peer_slots)
2608 }
2609
2610 fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
2611 let current_open_discovery_pending = self
2612 .retry_pending
2613 .values()
2614 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
2615 .count();
2616
2617 let cap_remaining = self
2618 .config
2619 .node
2620 .discovery
2621 .nostr
2622 .open_discovery_max_pending
2623 .saturating_sub(current_open_discovery_pending);
2624
2625 cap_remaining.min(self.available_outbound_slots())
2626 }
2627
2628 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
2629 now_ms.saturating_add(
2630 self.config
2631 .node
2632 .discovery
2633 .nostr
2634 .advert_ttl_secs
2635 .saturating_mul(1000)
2636 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
2637 )
2638 }
2639
2640 async fn build_overlay_advert(
2641 &self,
2642 bootstrap: &std::sync::Arc<NostrDiscovery>,
2643 ) -> Option<OverlayAdvert> {
2644 if !self.config.node.discovery.nostr.enabled {
2645 return None;
2646 }
2647
2648 let mut endpoints = Vec::new();
2649 let mut has_udp_nat = false;
2650 let mut has_webrtc = false;
2651
2652 for handle in self.transports.values() {
2653 if !handle.is_operational() {
2654 continue;
2655 }
2656
2657 match handle.transport_type().name {
2658 "udp" => {
2659 let Some(cfg) = self.lookup_udp_config(handle.name()) else {
2660 continue;
2661 };
2662 if !cfg.advertise_on_nostr() {
2663 continue;
2664 }
2665 if cfg.is_public() {
2666 if let Some(explicit) = cfg.external_advert_addr() {
2676 endpoints.push(OverlayEndpointAdvert {
2677 transport: OverlayTransportKind::Udp,
2678 addr: explicit.to_string(),
2679 });
2680 } else {
2681 match handle.local_addr() {
2682 Some(addr)
2683 if !addr.ip().is_unspecified()
2684 && !is_unroutable_advert_ip(addr.ip()) =>
2685 {
2686 endpoints.push(OverlayEndpointAdvert {
2687 transport: OverlayTransportKind::Udp,
2688 addr: addr.to_string(),
2689 });
2690 }
2691 Some(addr) => {
2692 let key = handle.transport_id().as_u32();
2693 let port = addr.port();
2694 if let Some(public) =
2695 bootstrap.learn_public_udp_addr(key, port).await
2696 {
2697 endpoints.push(OverlayEndpointAdvert {
2698 transport: OverlayTransportKind::Udp,
2699 addr: public.to_string(),
2700 });
2701 } else {
2702 warn!(
2703 transport_id = key,
2704 bind_addr = %addr,
2705 "advert: udp public=true but bind is wildcard \
2706 or private and STUN observation failed; \
2707 advertising no UDP endpoint. Either set \
2708 transports.udp.external_addr, bind to a \
2709 specific *public* IP, or ensure \
2710 node.discovery.nostr.stun_servers is reachable"
2711 );
2712 }
2713 }
2714 None => {}
2715 }
2716 }
2717 } else {
2718 endpoints.push(OverlayEndpointAdvert {
2719 transport: OverlayTransportKind::Udp,
2720 addr: "nat".to_string(),
2721 });
2722 has_udp_nat = true;
2723 }
2724 }
2725 "webrtc" => {
2726 let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
2727 continue;
2728 };
2729 if !cfg.advertise_on_nostr() {
2730 continue;
2731 }
2732 endpoints.push(OverlayEndpointAdvert {
2733 transport: OverlayTransportKind::WebRtc,
2734 addr: hex::encode(self.identity.pubkey_full().serialize()),
2735 });
2736 has_webrtc = true;
2737 }
2738 "tcp" => {
2739 let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
2740 continue;
2741 };
2742 if !cfg.advertise_on_nostr() {
2743 continue;
2744 }
2745 if let Some(explicit) = cfg.external_advert_addr() {
2757 endpoints.push(OverlayEndpointAdvert {
2758 transport: OverlayTransportKind::Tcp,
2759 addr: explicit.to_string(),
2760 });
2761 } else {
2762 match handle.local_addr() {
2763 Some(addr)
2764 if !addr.ip().is_unspecified()
2765 && !is_unroutable_advert_ip(addr.ip()) =>
2766 {
2767 endpoints.push(OverlayEndpointAdvert {
2768 transport: OverlayTransportKind::Tcp,
2769 addr: addr.to_string(),
2770 });
2771 }
2772 Some(addr) => {
2773 warn!(
2774 bind_addr = %addr,
2775 "advert: tcp advertise_on_nostr=true bound to wildcard \
2776 or private IP and no transports.tcp.external_addr set; \
2777 advertising no TCP endpoint. Either set external_addr \
2778 to the public IP (recommended for cloud 1:1-NAT setups) \
2779 or bind explicitly to the public IP"
2780 );
2781 }
2782 None => {}
2783 }
2784 }
2785 }
2786 "tor" => {
2787 let Some(cfg) = self.lookup_tor_config(handle.name()) else {
2788 continue;
2789 };
2790 if !cfg.advertise_on_nostr() {
2791 continue;
2792 }
2793 if let Some(addr) = handle.onion_address() {
2794 endpoints.push(OverlayEndpointAdvert {
2795 transport: OverlayTransportKind::Tor,
2796 addr: format!("{}:{}", addr, cfg.advertised_port()),
2797 });
2798 }
2799 }
2800 _ => {}
2801 }
2802 }
2803
2804 if endpoints.is_empty() {
2805 return None;
2806 }
2807
2808 Some(OverlayAdvert {
2809 identifier: ADVERT_IDENTIFIER.to_string(),
2810 version: ADVERT_VERSION,
2811 endpoints,
2812 signal_relays: (has_udp_nat || has_webrtc)
2813 .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
2814 stun_servers: (has_udp_nat || has_webrtc)
2815 .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
2816 })
2817 }
2818
2819 async fn refresh_overlay_advert(
2820 &self,
2821 bootstrap: &std::sync::Arc<NostrDiscovery>,
2822 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
2823 let advert = self.build_overlay_advert(bootstrap).await;
2824 bootstrap.update_local_advert(advert).await
2825 }
2826
2827 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
2828 match (&self.config.transports.udp, transport_name) {
2829 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2830 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2831 _ => None,
2832 }
2833 }
2834
2835 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
2836 match (&self.config.transports.tcp, transport_name) {
2837 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2838 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2839 _ => None,
2840 }
2841 }
2842
2843 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
2844 match (&self.config.transports.tor, transport_name) {
2845 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2846 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2847 _ => None,
2848 }
2849 }
2850
2851 fn lookup_webrtc_config(
2852 &self,
2853 transport_name: Option<&str>,
2854 ) -> Option<&crate::config::WebRtcConfig> {
2855 match (&self.config.transports.webrtc, transport_name) {
2856 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2857 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2858 _ => None,
2859 }
2860 }
2861
2862 pub(in crate::node) async fn try_peer_addresses(
2863 &mut self,
2864 peer_config: &PeerConfig,
2865 peer_identity: PeerIdentity,
2866 allow_bootstrap_nat: bool,
2867 ) -> Result<(), NodeError> {
2868 let peer_node_addr = *peer_identity.node_addr();
2869 if self.peers.contains_key(&peer_node_addr) {
2870 debug!(
2871 npub = %peer_config.npub,
2872 "Peer already exists, skipping address attempts"
2873 );
2874 return Ok(());
2875 }
2876
2877 let candidates = self.peer_address_candidates(peer_config).await;
2878
2879 if candidates.is_empty() {
2880 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
2881 return Ok(());
2882 }
2883 return Err(NodeError::NoTransportForType(format!(
2884 "no addresses known for {}",
2885 peer_config.npub
2886 )));
2887 }
2888
2889 if self
2890 .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
2891 .await
2892 .is_ok()
2893 {
2894 return Ok(());
2895 }
2896
2897 Err(NodeError::NoTransportForType(format!(
2898 "no operational transport for any of {}'s addresses",
2899 peer_config.npub
2900 )))
2901 }
2902
2903 async fn try_active_peer_alternative_addresses(
2904 &mut self,
2905 peer_config: &PeerConfig,
2906 peer_identity: PeerIdentity,
2907 ) -> Result<bool, NodeError> {
2908 let peer_node_addr = *peer_identity.node_addr();
2909 let candidates = self.peer_address_candidates(peer_config).await;
2910
2911 if candidates.is_empty() {
2912 return Err(NodeError::NoTransportForType(format!(
2913 "no addresses known for {}",
2914 peer_config.npub
2915 )));
2916 }
2917
2918 let alternatives: Vec<_> = candidates
2919 .into_iter()
2920 .filter(|addr| !(addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat")))
2921 .filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
2922 .collect();
2923
2924 if alternatives.is_empty() {
2925 return Err(NodeError::NoTransportForType(format!(
2926 "no concrete alternate addresses known for {}",
2927 peer_config.npub
2928 )));
2929 }
2930
2931 self.attempt_peer_address_list(peer_config, peer_identity, false, &alternatives)
2932 .await?;
2933 Ok(true)
2934 }
2935
2936 async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2937 let static_addresses = self.static_peer_addresses(peer_config);
2947 let overlay_addresses = self
2948 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
2949 .await;
2950
2951 let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
2952 for addr in overlay_addresses.into_iter().chain(static_addresses) {
2953 if !candidates.iter().any(|existing: &PeerAddress| {
2954 existing.transport == addr.transport && existing.addr == addr.addr
2955 }) {
2956 candidates.push(addr);
2957 }
2958 }
2959
2960 candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
2965 (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
2966 (Some(_), None) => std::cmp::Ordering::Less,
2967 (None, Some(_)) => std::cmp::Ordering::Greater,
2968 (None, None) => std::cmp::Ordering::Equal,
2969 });
2970
2971 candidates
2972 }
2973
2974 fn active_peer_matches_any_candidate(
2975 &self,
2976 peer_node_addr: &NodeAddr,
2977 candidates: &[PeerAddress],
2978 ) -> bool {
2979 candidates
2980 .iter()
2981 .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
2982 }
2983
2984 pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
2985 &self,
2986 peer_node_addr: &NodeAddr,
2987 candidates: &[PeerAddress],
2988 ) -> bool {
2989 if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
2990 return false;
2991 }
2992 !self.active_peer_needs_same_path_refresh(peer_node_addr)
2993 }
2994
2995 fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
2996 let Some(peer) = self.peers.get(peer_node_addr) else {
2997 return false;
2998 };
2999 let stale_after_ms = self
3000 .config
3001 .node
3002 .heartbeat_interval_secs
3003 .saturating_mul(1000)
3004 .max(1000);
3005 peer.idle_time(Self::now_ms()) > stale_after_ms
3006 }
3007
3008 fn active_peer_matches_candidate(
3009 &self,
3010 peer_node_addr: &NodeAddr,
3011 candidate: &PeerAddress,
3012 ) -> bool {
3013 let Some(peer) = self.peers.get(peer_node_addr) else {
3014 return false;
3015 };
3016 let Some(current_addr) = peer.current_addr() else {
3017 return false;
3018 };
3019 if let Some(peer_transport_id) = peer.transport_id()
3020 && let Some((candidate_transport_id, candidate_addr)) =
3021 self.resolve_peer_address_for_match(candidate)
3022 {
3023 return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3024 }
3025 if peer
3026 .transport_id()
3027 .map(|id| self.bootstrap_transports.contains(&id))
3028 .unwrap_or(false)
3029 {
3030 return false;
3031 }
3032 let current_addr = current_addr.to_string();
3033 let current_transport = peer
3034 .transport_id()
3035 .and_then(|id| self.transports.get(&id))
3036 .map(|transport| transport.transport_type().name);
3037
3038 candidate.addr == current_addr
3039 && current_transport
3040 .map(|transport| transport == candidate.transport)
3041 .unwrap_or(true)
3042 }
3043
3044 pub(crate) async fn api_connect(
3052 &mut self,
3053 npub: &str,
3054 address: &str,
3055 transport: &str,
3056 ) -> Result<serde_json::Value, String> {
3057 let peer_config = PeerConfig {
3058 npub: npub.to_string(),
3059 alias: None,
3060 addresses: vec![PeerAddress::new(transport, address)],
3061 connect_policy: ConnectPolicy::Manual,
3062 auto_reconnect: false,
3063 discovery_fallback_transit: true,
3064 };
3065
3066 if let Ok(identity) = PeerIdentity::from_npub(npub) {
3068 self.peer_aliases
3069 .insert(*identity.node_addr(), identity.short_npub());
3070 self.register_identity(*identity.node_addr(), identity.pubkey_full());
3071 }
3072
3073 self.initiate_peer_connection(&peer_config)
3074 .await
3075 .map(|()| {
3076 info!(
3077 npub = %npub,
3078 address = %address,
3079 transport = %transport,
3080 "API connect initiated"
3081 );
3082 serde_json::json!({
3083 "npub": npub,
3084 "address": address,
3085 "transport": transport,
3086 })
3087 })
3088 .map_err(|e| e.to_string())
3089 }
3090
3091 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
3095 let peer_identity =
3096 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
3097 let node_addr = *peer_identity.node_addr();
3098
3099 if !self.peers.contains_key(&node_addr) {
3100 return Err(format!("peer not found: {npub}"));
3101 }
3102
3103 self.remove_active_peer(&node_addr);
3105
3106 self.retry_pending.remove(&node_addr);
3108
3109 info!(npub = %npub, "API disconnect completed");
3110
3111 Ok(serde_json::json!({
3112 "npub": npub,
3113 "disconnected": true,
3114 }))
3115 }
3116
3117 pub async fn adopt_established_traversal(
3124 &mut self,
3125 traversal: EstablishedTraversal,
3126 ) -> Result<BootstrapHandoffResult, NodeError> {
3127 debug!(
3128 peer_npub = %traversal.peer_npub,
3129 session_id = %traversal.session_id,
3130 remote_addr = %traversal.remote_addr,
3131 "adopting established traversal socket"
3132 );
3133
3134 if !self.state.is_operational() {
3135 return Err(NodeError::NotStarted);
3136 }
3137
3138 let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
3139 let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
3140 NodeError::InvalidPeerNpub {
3141 npub: traversal.peer_npub.clone(),
3142 reason: e.to_string(),
3143 }
3144 })?;
3145 let peer_node_addr = *peer_identity.node_addr();
3146 if self.peers.contains_key(&peer_node_addr) {
3147 debug!(
3148 peer_npub = %traversal.peer_npub,
3149 "Adopting NAT traversal handoff as alternate path for already-connected peer"
3150 );
3151 }
3152
3153 self.peer_aliases
3154 .insert(peer_node_addr, peer_identity.short_npub());
3155 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
3156
3157 let transport_id = self.allocate_transport_id();
3158 let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
3178 let mut cfg = self
3179 .lookup_udp_config(traversal.transport_name.as_deref())
3180 .or_else(|| self.lookup_udp_config(None))
3181 .cloned()
3182 .unwrap_or_default();
3183 cfg.bind_addr = None;
3184 cfg.external_addr = None;
3185 cfg
3186 });
3187 let mut transport = crate::transport::udp::UdpTransport::new(
3188 transport_id,
3189 traversal.transport_name.clone(),
3190 inherited_config,
3191 packet_tx,
3192 );
3193
3194 transport
3195 .adopt_socket_async(traversal.socket)
3196 .await
3197 .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
3198
3199 let local_addr = transport.local_addr().ok_or_else(|| {
3200 NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
3201 })?;
3202
3203 self.transports.insert(
3204 transport_id,
3205 crate::transport::TransportHandle::Udp(transport),
3206 );
3207 self.bootstrap_transports.insert(transport_id);
3208 self.bootstrap_transport_npubs
3209 .insert(transport_id, traversal.peer_npub.clone());
3210
3211 let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
3212 if let Err(err) = self
3213 .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
3214 .await
3215 {
3216 self.bootstrap_transports.remove(&transport_id);
3217 self.bootstrap_transport_npubs.remove(&transport_id);
3218 if let Some(mut handle) = self.transports.remove(&transport_id) {
3219 let _ = handle.stop().await;
3220 }
3221 return Err(err);
3222 }
3223
3224 info!(
3225 peer = %self.peer_display_name(&peer_node_addr),
3226 transport_id = %transport_id,
3227 local_addr = %local_addr,
3228 remote_addr = %traversal.remote_addr,
3229 session_id = %traversal.session_id,
3230 "adopted NAT traversal socket; handshake initiated"
3231 );
3232
3233 Ok(BootstrapHandoffResult {
3234 transport_id,
3235 local_addr,
3236 remote_addr: traversal.remote_addr,
3237 peer_node_addr,
3238 session_id: traversal.session_id,
3239 })
3240 }
3241}