1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
7 OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use std::collections::{HashMap, HashSet};
18use std::net::{IpAddr, SocketAddr};
19use std::thread;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
/// Appends one timestamped line to `nvpn-fips-endpoint-debug.log` in the
/// system temporary directory.
///
/// Compiled only into builds with `debug_assertions`. Any failure to open or
/// write the file is deliberately ignored so diagnostics never affect callers.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let opened = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(log_path);
    if let Ok(mut log_file) = opened {
        let stamp = std::time::SystemTime::now();
        let _ = writeln!(log_file, "{:?} {}", stamp, message.as_ref());
    }
}

/// Release-build stand-in for the debug file logger; the message is discarded.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
43
/// Returns `true` when `ip` must not be published in an overlay advert because
/// peers on the public Internet cannot reach it.
///
/// IPv4 ranges rejected: private (RFC 1918), loopback, link-local,
/// `0.0.0.0/8` (including the unspecified address), multicast, reserved
/// `240.0.0.0/4` (including limited broadcast), documentation, benchmarking
/// `198.18.0.0/15`, and shared/CGNAT space `100.64.0.0/10`.
///
/// IPv6 ranges rejected: loopback, unspecified, unique-local `fc00::/7`,
/// multicast, link-local `fe80::/10` and documentation `2001:db8::/32`.
/// IPv4-mapped addresses (`::ffff:a.b.c.d`) are judged by their embedded IPv4
/// address, so a mapped private address cannot slip through.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [first, second, ..] = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 0.0.0.0/8: "this network", never a valid destination.
                || first == 0
                // 100.64.0.0/10: shared address space (carrier-grade NAT).
                || (first == 100 && (second & 0xc0) == 64)
                // 198.18.0.0/15: benchmarking.
                || (first == 198 && (second & 0xfe) == 18)
                // 240.0.0.0/4: reserved.
                || first >= 240
        }
        IpAddr::V6(v6) => {
            // ::ffff:a.b.c.d carries an IPv4 address; classify that instead.
            if let Some(mapped) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(mapped));
            }
            let segments = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // fe80::/10: link-local unicast.
                || (segments[0] & 0xffc0) == 0xfe80
                // 2001:db8::/32: documentation.
                || (segments[0] == 0x2001 && segments[1] == 0x0db8)
        }
    }
}
75
/// Whether a socket bound to `local` can exchange datagrams with `remote`.
///
/// Both addresses must be in the same IP family: IPv4 with IPv4 or IPv6 with
/// IPv6. IPv4-mapped IPv6 addresses count as IPv6 here.
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    // `SocketAddr` has exactly two variants, so equal "is IPv4" flags means
    // equal families.
    local.is_ipv4() == remote.is_ipv4()
}
82
/// Maximum number of simultaneous outbound attempts (handshakes plus pending
/// transport connects) toward one peer across its candidate paths.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;

/// Upper bound on graph-session warm-ups started by a single warm-up pass.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;

/// Upper bound on discovery-driven connection attempts per discovery poll.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;

/// Multiplier applied to an open-discovery retry lifetime.
/// NOTE(review): not referenced in this part of the file; confirm exactly what
/// it scales at its use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
87
88impl Node {
89 pub(super) async fn update_peers(
101 &mut self,
102 new_peers: Vec<crate::config::PeerConfig>,
103 ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
104 use std::collections::{HashMap, HashSet};
105
106 let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
107 HashMap::with_capacity(new_peers.len());
108 for peer in new_peers {
109 let identity = match PeerIdentity::from_npub(&peer.npub) {
110 Ok(id) => id,
111 Err(e) => {
112 return Err(crate::node::NodeError::InvalidPeerNpub {
113 npub: peer.npub.clone(),
114 reason: e.to_string(),
115 });
116 }
117 };
118 new_by_addr.insert(*identity.node_addr(), peer);
122 }
123
124 let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
125 .config
126 .peers()
127 .iter()
128 .filter_map(|pc| {
129 PeerIdentity::from_npub(&pc.npub)
130 .ok()
131 .map(|id| (*id.node_addr(), pc.clone()))
132 })
133 .collect();
134
135 let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
136 let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
137
138 let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
139 let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
140 let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
141
142 let mut outcome = crate::node::UpdatePeersOutcome::default();
143
144 for node_addr in &removed {
145 if self.retry_pending.remove(node_addr).is_some() {
146 debug!(
147 peer = %self.peer_display_name(node_addr),
148 "Dropping retry entry for peer removed from runtime peer list"
149 );
150 }
151 self.peer_aliases.remove(node_addr);
152 self.set_discovery_fallback_transit_allowed(*node_addr, false);
153 outcome.removed += 1;
154 }
155
156 let mut auto_connect_refresh_configs = Vec::new();
157 for node_addr in &kept {
158 let new_pc = &new_by_addr[node_addr];
159 let current_pc = ¤t_by_addr[node_addr];
160 if new_pc.addresses != current_pc.addresses
161 || new_pc.alias != current_pc.alias
162 || new_pc.connect_policy != current_pc.connect_policy
163 || new_pc.auto_reconnect != current_pc.auto_reconnect
164 || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
165 {
166 outcome.updated += 1;
167 self.set_discovery_fallback_transit_allowed(
168 *node_addr,
169 new_pc.discovery_fallback_transit,
170 );
171 if let Some(state) = self.retry_pending.get_mut(node_addr) {
172 state.peer_config = new_pc.clone();
173 state.retry_after_ms = Self::now_ms();
174 }
175 if let Some(alias) = new_pc.alias.clone() {
176 self.peer_aliases.insert(*node_addr, alias);
177 }
178 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
179 auto_connect_refresh_configs.push(new_pc.clone());
180 }
181 } else {
182 outcome.unchanged += 1;
183 self.set_discovery_fallback_transit_allowed(
184 *node_addr,
185 new_pc.discovery_fallback_transit,
186 );
187 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
188 auto_connect_refresh_configs.push(new_pc.clone());
189 }
190 }
191 }
192
193 let added_configs: Vec<crate::config::PeerConfig> =
194 added.iter().map(|addr| new_by_addr[addr].clone()).collect();
195
196 self.config.peers = new_by_addr.into_values().collect();
200
201 for peer_config in added_configs {
202 outcome.added += 1;
203 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
204 continue;
205 };
206 let name = peer_config
207 .alias
208 .clone()
209 .unwrap_or_else(|| identity.short_npub());
210 self.peer_aliases.insert(*identity.node_addr(), name);
211 self.set_discovery_fallback_transit_allowed(
212 *identity.node_addr(),
213 peer_config.discovery_fallback_transit,
214 );
215 self.register_identity(*identity.node_addr(), identity.pubkey_full());
216
217 if !peer_config.is_auto_connect() {
218 continue;
219 }
220
221 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
222 warn!(
223 npub = %peer_config.npub,
224 error = %e,
225 "Failed to initiate connection for newly added peer"
226 );
227 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
228 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
229 }
230 if matches!(e, crate::node::NodeError::NoTransportForType(_))
231 && let Some(bootstrap) = self.nostr_discovery.clone()
232 {
233 bootstrap
234 .request_advert_stale_check(peer_config.npub.clone())
235 .await;
236 }
237 }
238 }
239
240 for peer_config in auto_connect_refresh_configs {
241 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
242 continue;
243 };
244 let node_addr = *peer_identity.node_addr();
245
246 if self.peers.contains_key(&node_addr) {
247 match self
248 .initiate_active_peer_alternative_connection(&peer_config)
249 .await
250 {
251 Ok(attempted) => {
252 if attempted {
253 debug!(
254 peer = %self.peer_display_name(&node_addr),
255 "Started non-disruptive alternate-path handshake for active peer"
256 );
257 }
258 }
259 Err(e) => {
260 debug!(
261 npub = %peer_config.npub,
262 error = %e,
263 "Active peer alternate-path refresh did not start"
264 );
265 }
266 }
267 continue;
268 }
269
270 match self.initiate_peer_connection(&peer_config).await {
271 Ok(()) => {
272 let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
273 if let Some(state) = self.retry_pending.get_mut(&node_addr) {
274 state.peer_config = peer_config;
275 state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
276 }
277 }
278 Err(e) => {
279 debug!(
280 npub = %peer_config.npub,
281 error = %e,
282 "Refreshed peer addresses did not initiate a direct connection"
283 );
284 self.schedule_retry(node_addr, Self::now_ms());
285 }
286 }
287 }
288
289 self.warm_auto_connect_graph_sessions().await;
290
291 Ok(outcome)
292 }
293
294 pub(super) async fn initiate_peer_connections(&mut self) {
295 let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
301 .config
302 .peers()
303 .iter()
304 .filter_map(|pc| {
305 PeerIdentity::from_npub(&pc.npub)
306 .ok()
307 .map(|id| (id, pc.alias.clone()))
308 })
309 .collect();
310
311 for (identity, alias) in peer_identities {
312 let name = alias.unwrap_or_else(|| identity.short_npub());
313 self.peer_aliases.insert(*identity.node_addr(), name);
314 self.register_identity(*identity.node_addr(), identity.pubkey_full());
318 }
319
320 let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
322
323 if peer_configs.is_empty() {
324 debug!("No static peers configured");
325 return;
326 }
327
328 debug!(
329 count = peer_configs.len(),
330 "Initiating static peer connections"
331 );
332
333 for peer_config in peer_configs {
334 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
335 warn!(
336 npub = %peer_config.npub,
337 alias = ?peer_config.alias,
338 error = %e,
339 "Failed to initiate peer connection"
340 );
341 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
345 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
346 }
347 if matches!(e, crate::node::NodeError::NoTransportForType(_))
353 && let Some(bootstrap) = self.nostr_discovery.clone()
354 {
355 bootstrap
356 .request_advert_stale_check(peer_config.npub.clone())
357 .await;
358 }
359 }
360 }
361
362 self.warm_auto_connect_graph_sessions().await;
363 }
364
365 pub(super) async fn initiate_peer_connection(
369 &mut self,
370 peer_config: &crate::config::PeerConfig,
371 ) -> Result<(), NodeError> {
372 self.initiate_peer_connection_inner(peer_config).await
373 }
374
375 pub(super) async fn initiate_peer_retry_connection(
381 &mut self,
382 peer_config: &crate::config::PeerConfig,
383 ) -> Result<(), NodeError> {
384 self.initiate_peer_connection_inner(peer_config).await
385 }
386
387 async fn initiate_active_peer_alternative_connection(
388 &mut self,
389 peer_config: &crate::config::PeerConfig,
390 ) -> Result<bool, NodeError> {
391 let peer_identity =
392 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
393 npub: peer_config.npub.clone(),
394 reason: e.to_string(),
395 })?;
396 let peer_node_addr = *peer_identity.node_addr();
397
398 if !self.peers.contains_key(&peer_node_addr) {
399 self.initiate_peer_connection(peer_config).await?;
400 return Ok(true);
401 }
402
403 self.try_active_peer_alternative_addresses(peer_config, peer_identity)
409 .await
410 }
411
412 async fn initiate_peer_connection_inner(
413 &mut self,
414 peer_config: &crate::config::PeerConfig,
415 ) -> Result<(), NodeError> {
416 let peer_identity =
418 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
419 npub: peer_config.npub.clone(),
420 reason: e.to_string(),
421 })?;
422
423 let peer_node_addr = *peer_identity.node_addr();
424
425 if self.peers.contains_key(&peer_node_addr) {
427 debug!(
428 npub = %peer_config.npub,
429 "Peer already exists, skipping"
430 );
431 return Ok(());
432 }
433
434 self.try_peer_addresses(peer_config, peer_identity, true)
435 .await
436 }
437
438 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
439 self.connections.values().any(|conn| {
440 conn.expected_identity()
441 .map(|id| id.node_addr() == peer_node_addr)
442 .unwrap_or(false)
443 })
444 }
445
446 fn is_connecting_to_peer_on_path(
447 &self,
448 peer_node_addr: &NodeAddr,
449 transport_id: TransportId,
450 remote_addr: &TransportAddr,
451 ) -> bool {
452 self.connections.values().any(|conn| {
453 conn.expected_identity()
454 .map(|id| id.node_addr() == peer_node_addr)
455 .unwrap_or(false)
456 && conn.transport_id() == Some(transport_id)
457 && conn.source_addr() == Some(remote_addr)
458 }) || self.pending_connects.iter().any(|pending| {
459 pending.peer_identity.node_addr() == peer_node_addr
460 && pending.transport_id == transport_id
461 && &pending.remote_addr == remote_addr
462 })
463 }
464
465 pub(in crate::node) fn should_warm_auto_connect_session(
466 &self,
467 peer_node_addr: &NodeAddr,
468 ) -> bool {
469 if self.peers.contains_key(peer_node_addr)
470 || self
471 .sessions
472 .get(peer_node_addr)
473 .is_some_and(|entry| entry.is_established())
474 {
475 return false;
476 }
477
478 self.config.peers().iter().any(|peer| {
479 peer.is_auto_connect()
480 && PeerIdentity::from_npub(&peer.npub)
481 .map(|identity| identity.node_addr() == peer_node_addr)
482 .unwrap_or(false)
483 })
484 }
485
486 pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
487 if !self.peers.values().any(|peer| peer.can_send()) {
488 return 0;
489 }
490
491 let mut budget = self.graph_session_warmup_budget();
492 if budget == 0 {
493 return 0;
494 }
495
496 let peer_identities: Vec<_> = self
497 .config
498 .auto_connect_peers()
499 .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
500 .collect();
501
502 let mut warmed = 0;
503 for identity in peer_identities {
504 if budget == 0 {
505 break;
506 }
507
508 let peer_node_addr = *identity.node_addr();
509 if peer_node_addr == *self.identity.node_addr()
510 || !self.should_warm_auto_connect_session(&peer_node_addr)
511 || self
512 .sessions
513 .get(&peer_node_addr)
514 .is_some_and(|entry| entry.is_initiating())
515 {
516 continue;
517 }
518
519 self.register_identity(peer_node_addr, identity.pubkey_full());
520
521 if self.find_next_hop(&peer_node_addr).is_some() {
522 match self
523 .initiate_session(peer_node_addr, identity.pubkey_full())
524 .await
525 {
526 Ok(()) => {
527 warmed += 1;
528 budget = budget.saturating_sub(1);
529 debug!(
530 peer = %self.peer_display_name(&peer_node_addr),
531 "Warmed auto-connect peer session over existing FIPS graph"
532 );
533 }
534 Err(NodeError::SendFailed { node_addr, reason })
535 if node_addr == peer_node_addr && reason == "no route to destination" =>
536 {
537 self.maybe_initiate_lookup(&peer_node_addr).await;
538 warmed += 1;
539 budget = budget.saturating_sub(1);
540 }
541 Err(err) => {
542 debug!(
543 peer = %self.peer_display_name(&peer_node_addr),
544 error = %err,
545 "Failed to warm auto-connect peer session"
546 );
547 }
548 }
549 } else {
550 self.maybe_initiate_lookup(&peer_node_addr).await;
551 warmed += 1;
552 budget = budget.saturating_sub(1);
553 }
554 }
555
556 warmed
557 }
558
559 pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
560 let max_destinations = self.config.node.session.pending_max_destinations;
561 if max_destinations == 0 {
562 return 0;
563 }
564
565 let pending_sessions = self
566 .sessions
567 .values()
568 .filter(|entry| !entry.is_established())
569 .count();
570 let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
571 max_destinations
572 .saturating_sub(pending_total)
573 .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
574 }
575
576 fn outbound_handshake_slots(&self) -> usize {
577 let used = self
578 .connections
579 .len()
580 .saturating_add(self.pending_connects.len());
581 if self.max_connections == 0 {
582 usize::MAX
583 } else {
584 self.max_connections.saturating_sub(used)
585 }
586 }
587
588 fn outbound_link_slots(&self) -> usize {
589 if self.max_links == 0 {
590 usize::MAX
591 } else {
592 self.max_links.saturating_sub(self.links.len())
593 }
594 }
595
596 fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
597 if !self.peers.contains_key(peer_node_addr)
598 && self.max_peers > 0
599 && self.peers.len() >= self.max_peers
600 {
601 return 0;
602 }
603
604 let in_flight_for_peer = self
605 .connections
606 .values()
607 .filter(|conn| {
608 conn.expected_identity()
609 .map(|id| id.node_addr() == peer_node_addr)
610 .unwrap_or(false)
611 })
612 .count()
613 .saturating_add(
614 self.pending_connects
615 .iter()
616 .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
617 .count(),
618 );
619
620 self.outbound_handshake_slots()
621 .min(self.outbound_link_slots())
622 .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
623 }
624
625 fn discovery_connect_budget(&self) -> usize {
626 self.outbound_handshake_slots()
627 .min(self.outbound_link_slots())
628 .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
629 }
630
631 fn find_udp_transport_for_remote_addr(
638 &self,
639 remote_addr: SocketAddr,
640 ) -> Option<(TransportId, SocketAddr)> {
641 self.transports
642 .iter()
643 .filter(|(id, handle)| {
644 handle.transport_type().name == "udp"
645 && handle.is_operational()
646 && !self.bootstrap_transports.contains(id)
647 })
648 .filter_map(|(id, handle)| {
649 let local_addr = handle.local_addr()?;
650 socket_addr_families_compatible(local_addr, remote_addr)
651 .then_some((*id, local_addr))
652 })
653 .min_by_key(|(id, _)| id.as_u32())
654 }
655
656 pub(super) fn transport_discovery_candidate(
657 &self,
658 discovered_transport_id: TransportId,
659 discovered_addr: TransportAddr,
660 ) -> Option<(TransportId, TransportAddr, &'static str)> {
661 let transport = self.transports.get(&discovered_transport_id)?;
662 let transport_name = transport.transport_type().name;
663
664 if transport_name != "udp" {
665 return Some((discovered_transport_id, discovered_addr, transport_name));
666 }
667
668 let Some(remote_socket_addr) = discovered_addr
669 .as_str()
670 .and_then(|addr| addr.parse::<SocketAddr>().ok())
671 else {
672 if self.bootstrap_transports.contains(&discovered_transport_id) {
673 debug!(
674 transport_id = %discovered_transport_id,
675 remote_addr = %discovered_addr,
676 "transport discovery: skip non-numeric UDP address from bootstrap transport"
677 );
678 return None;
679 }
680 return Some((discovered_transport_id, discovered_addr, transport_name));
681 };
682
683 let Some((transport_id, local_addr)) =
684 self.find_udp_transport_for_remote_addr(remote_socket_addr)
685 else {
686 debug!(
687 transport_id = %discovered_transport_id,
688 remote_addr = %discovered_addr,
689 "transport discovery: skip UDP peer with no compatible local socket"
690 );
691 return None;
692 };
693
694 if transport_id != discovered_transport_id {
695 debug!(
696 discovered_transport_id = %discovered_transport_id,
697 selected_transport_id = %transport_id,
698 local_addr = %local_addr,
699 remote_addr = %remote_socket_addr,
700 "transport discovery: selected compatible UDP transport"
701 );
702 }
703
704 Some((
705 transport_id,
706 TransportAddr::from_socket_addr(remote_socket_addr),
707 transport_name,
708 ))
709 }
710
711 pub(super) async fn initiate_connection(
722 &mut self,
723 transport_id: TransportId,
724 remote_addr: TransportAddr,
725 peer_identity: PeerIdentity,
726 ) -> Result<(), NodeError> {
727 let peer_node_addr = *peer_identity.node_addr();
728
729 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
730 debug!(
731 peer = %self.peer_display_name(&peer_node_addr),
732 transport_id = %transport_id,
733 remote_addr = %remote_addr,
734 "Connection already in progress for candidate path"
735 );
736 return Ok(());
737 }
738
739 if self.outbound_handshake_slots() == 0 {
740 return Err(NodeError::MaxConnectionsExceeded {
741 max: self.max_connections,
742 });
743 }
744
745 if self.outbound_link_slots() == 0 {
746 return Err(NodeError::MaxLinksExceeded {
747 max: self.max_links,
748 });
749 }
750
751 if !self.peers.contains_key(&peer_node_addr)
752 && self.max_peers > 0
753 && self.peers.len() >= self.max_peers
754 {
755 return Err(NodeError::MaxPeersExceeded {
756 max: self.max_peers,
757 });
758 }
759
760 self.authorize_peer(
761 &peer_identity,
762 PeerAclContext::OutboundConnect,
763 transport_id,
764 &remote_addr,
765 )?;
766
767 let is_connection_oriented = self
768 .transports
769 .get(&transport_id)
770 .map(|t| t.transport_type().connection_oriented)
771 .unwrap_or(false);
772
773 let link_id = self.allocate_link_id();
775
776 let link = if is_connection_oriented {
777 Link::new(
778 link_id,
779 transport_id,
780 remote_addr.clone(),
781 LinkDirection::Outbound,
782 Duration::from_millis(self.config.node.base_rtt_ms),
783 )
784 } else {
785 Link::connectionless(
786 link_id,
787 transport_id,
788 remote_addr.clone(),
789 LinkDirection::Outbound,
790 Duration::from_millis(self.config.node.base_rtt_ms),
791 )
792 };
793
794 self.links.insert(link_id, link);
795
796 self.addr_to_link
798 .insert((transport_id, remote_addr.clone()), link_id);
799
800 if is_connection_oriented {
801 if let Some(transport) = self.transports.get(&transport_id) {
803 match transport.connect(&remote_addr).await {
804 Ok(()) => {
805 debug!(
806 peer = %self.peer_display_name(&peer_node_addr),
807 transport_id = %transport_id,
808 remote_addr = %remote_addr,
809 link_id = %link_id,
810 "Transport connect initiated (non-blocking)"
811 );
812 self.pending_connects.push(super::PendingConnect {
813 link_id,
814 transport_id,
815 remote_addr,
816 peer_identity,
817 });
818 }
819 Err(e) => {
820 self.links.remove(&link_id);
822 self.addr_to_link.remove(&(transport_id, remote_addr));
823 return Err(NodeError::TransportError(e.to_string()));
824 }
825 }
826 }
827 Ok(())
828 } else {
829 self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
831 .await
832 }
833 }
834
    /// Begin the outbound Noise handshake on an already-created link.
    ///
    /// Steps:
    /// 1. Allocate a local session index.
    /// 2. Build Noise message 1 and wrap it in the wire format (`build_msg1`).
    /// 3. Record the connection in `connections` and in `pending_outbound`,
    ///    keyed by (`transport_id`, our index).
    /// 4. Send the wire msg1 over `transport_id` to `remote_addr`.
    ///
    /// The wire msg1 is also stored on the connection together with a resend
    /// deadline of now + `handshake_resend_interval_ms`.
    ///
    /// # Errors
    /// - `IndexAllocationFailed` if no local index could be allocated.
    /// - `HandshakeFailed` if Noise message 1 could not be produced.
    /// - `TransportError` if the send fails, or if the transport is no longer
    ///   registered.
    ///
    /// Every error path removes the link and its (`transport_id`,
    /// `remote_addr`) mapping. It also releases whatever was created before the
    /// failure (index, connection, `pending_outbound` entry).
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        // Nothing else has been registered yet, so only the link bookkeeping
        // needs undoing on failure.
        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    // Release the index allocated above, then the link bookkeeping.
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Keep a copy of msg1 on the connection with its first resend deadline.
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        // Register before sending. Every failure path below must undo these.
        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        // Look the transport up again at send time. `None` means it is no
        // longer registered.
        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        match send_result {
            Some(send_result) => {
                // The outcome is reported whether the send succeeded or failed.
                self.note_local_send_outcome(&send_result);
                match send_result {
                    Ok(bytes) => {
                        debug!(
                            link_id = %link_id,
                            our_index = %our_index,
                            bytes,
                            "Sent Noise handshake message 1 (wire format)"
                        );
                    }
                    Err(e) => {
                        warn!(
                            link_id = %link_id,
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            our_index = %our_index,
                            error = %e,
                            "Failed to send handshake message"
                        );
                        // Full rollback: outbound index map, connection, link,
                        // address mapping, then the index itself.
                        self.pending_outbound
                            .remove(&(transport_id, our_index.as_u32()));
                        self.connections.remove(&link_id);
                        self.links.remove(&link_id);
                        self.addr_to_link
                            .remove(&(transport_id, remote_addr.clone()));
                        let _ = self.index_allocator.free(our_index);
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            None => {
                // Same rollback as a failed send.
                self.pending_outbound
                    .remove(&(transport_id, our_index.as_u32()));
                self.connections.remove(&link_id);
                self.links.remove(&link_id);
                self.addr_to_link
                    .remove(&(transport_id, remote_addr.clone()));
                let _ = self.index_allocator.free(our_index);
                return Err(NodeError::TransportError(format!(
                    "transport {transport_id} disappeared before first handshake send"
                )));
            }
        }

        Ok(())
    }
959
960 pub(super) async fn poll_transport_discovery(&mut self) {
966 let mut to_connect = Vec::new();
968 let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
969 let mut connect_budget = self.discovery_connect_budget();
970 let mut skipped_budget = 0usize;
971
972 for transport in self.transports.values() {
973 if !transport.is_operational() {
974 continue;
975 }
976 if !transport.auto_connect() {
977 let _ = transport.discover();
979 continue;
980 }
981 let discovered = match transport.discover() {
982 Ok(peers) => peers,
983 Err(_) => continue,
984 };
985 for peer in discovered {
986 let discovered_transport_id = peer.transport_id;
987 let pubkey = match peer.pubkey_hint {
988 Some(pk) => pk,
989 None => continue,
990 };
991 let identity = PeerIdentity::from_pubkey(pubkey);
992 let node_addr = *identity.node_addr();
993
994 if node_addr == *self.identity.node_addr() {
996 continue;
997 }
998
999 let Some((candidate_transport_id, remote_addr, transport_name)) =
1000 self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1001 else {
1002 continue;
1003 };
1004
1005 if self.peers.contains_key(&node_addr) {
1006 let candidate = PeerAddress::new(transport_name, remote_addr.to_string());
1007 if self.active_peer_candidate_is_fresh_enough_to_skip(
1008 &node_addr,
1009 std::slice::from_ref(&candidate),
1010 ) {
1011 continue;
1012 }
1013 if self.is_connecting_to_peer_on_path(
1014 &node_addr,
1015 candidate_transport_id,
1016 &remote_addr,
1017 ) {
1018 continue;
1019 }
1020 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1021 if connect_budget == 0
1022 || self
1023 .path_candidate_attempt_budget(&node_addr)
1024 .saturating_sub(queued_for_peer)
1025 == 0
1026 {
1027 skipped_budget = skipped_budget.saturating_add(1);
1028 continue;
1029 }
1030 to_connect.push((candidate_transport_id, remote_addr, identity, true));
1031 *queued_per_peer.entry(node_addr).or_default() += 1;
1032 connect_budget = connect_budget.saturating_sub(1);
1033 continue;
1034 }
1035
1036 if self.is_connecting_to_peer_on_path(
1037 &node_addr,
1038 candidate_transport_id,
1039 &remote_addr,
1040 ) {
1041 continue;
1042 }
1043
1044 let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1045 if connect_budget == 0
1046 || self
1047 .path_candidate_attempt_budget(&node_addr)
1048 .saturating_sub(queued_for_peer)
1049 == 0
1050 {
1051 skipped_budget = skipped_budget.saturating_add(1);
1052 continue;
1053 }
1054 to_connect.push((candidate_transport_id, remote_addr, identity, false));
1055 *queued_per_peer.entry(node_addr).or_default() += 1;
1056 connect_budget = connect_budget.saturating_sub(1);
1057 }
1058 }
1059
1060 if skipped_budget > 0 {
1061 debug!(
1062 skipped = skipped_budget,
1063 queued = to_connect.len(),
1064 "Transport discovery connect budget exhausted"
1065 );
1066 }
1067
1068 for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1069 info!(
1070 peer = %self.peer_display_name(identity.node_addr()),
1071 transport_id = %transport_id,
1072 remote_addr = %remote_addr,
1073 active_refresh,
1074 "Auto-connecting to discovered peer"
1075 );
1076 if let Err(e) = self
1077 .initiate_connection(transport_id, remote_addr, identity)
1078 .await
1079 {
1080 warn!(error = %e, "Failed to auto-connect to discovered peer");
1081 }
1082 }
1083 }
1084
1085 pub(super) async fn poll_nostr_discovery(&mut self) {
1086 let Some(bootstrap) = self.nostr_discovery.clone() else {
1087 return;
1088 };
1089
1090 if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
1091 debug!(error = %err, "Failed to refresh local Nostr overlay advert");
1092 }
1093
1094 for event in bootstrap.drain_events().await {
1095 match event {
1096 BootstrapEvent::Established { traversal } => {
1097 let peer_npub = traversal.peer_npub.clone();
1098 match self.adopt_established_traversal(traversal).await {
1099 Ok(_) => {
1100 info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
1101 }
1102 Err(err) => {
1103 warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
1104 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
1105 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
1106 }
1107 }
1108 }
1109 }
1110 BootstrapEvent::Failed {
1111 peer_config,
1112 reason,
1113 } => {
1114 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
1115 Ok(identity) => identity,
1116 Err(_) => continue,
1117 };
1118 let node_addr = *peer_identity.node_addr();
1119 if self.peers.contains_key(&node_addr) {
1120 debug!(
1121 npub = %peer_config.npub,
1122 error = %reason,
1123 "Ignoring failed NAT traversal for already-connected peer"
1124 );
1125 continue;
1126 }
1127 if self.is_connecting_to_peer(&node_addr) {
1128 debug!(
1129 npub = %peer_config.npub,
1130 error = %reason,
1131 "Ignoring failed NAT traversal while peer handshake is already in progress"
1132 );
1133 continue;
1134 }
1135
1136 let now_ms = Self::now_ms();
1137 let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
1138 if decision.should_warn {
1139 warn!(
1140 npub = %peer_config.npub,
1141 error = %reason,
1142 consecutive_failures = decision.consecutive_failures,
1143 cooldown_secs = decision
1144 .cooldown_until_ms
1145 .map(|t| t.saturating_sub(now_ms) / 1000),
1146 "NAT traversal failed"
1147 );
1148 } else {
1149 debug!(
1150 npub = %peer_config.npub,
1151 error = %reason,
1152 consecutive_failures = decision.consecutive_failures,
1153 "NAT traversal failed (suppressed by warn-rate-limit)"
1154 );
1155 }
1156
1157 if decision.crossed_threshold {
1161 bootstrap
1162 .request_advert_stale_check(peer_config.npub.clone())
1163 .await;
1164 }
1165
1166 if self
1167 .try_peer_addresses(&peer_config, peer_identity, false)
1168 .await
1169 .is_ok()
1170 {
1171 continue;
1172 }
1173
1174 self.schedule_retry(node_addr, now_ms);
1175 if let Some(cooldown_until_ms) = decision.cooldown_until_ms
1176 && let Some(state) = self.retry_pending.get_mut(&node_addr)
1177 {
1178 state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
1182 }
1183 }
1184 }
1185 }
1186
1187 self.maybe_run_startup_open_discovery_sweep(&bootstrap)
1188 .await;
1189 self.queue_open_discovery_retries(&bootstrap).await;
1190 }
1191
1192 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1198 if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1199 let scope = scope.trim();
1200 if !scope.is_empty() {
1201 return Some(scope.to_string());
1202 }
1203 }
1204
1205 let app = self.config.node.discovery.nostr.app.trim();
1206 if app.is_empty() {
1207 return None;
1208 }
1209 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1210 let scope = rest.trim();
1211 if scope.is_empty() {
1212 None
1213 } else {
1214 Some(scope.to_string())
1215 }
1216 } else {
1217 Some(app.to_string())
1218 }
1219 }
1220
1221 pub(super) async fn poll_lan_discovery(&mut self) {
1228 let Some(runtime) = self.lan_discovery.clone() else {
1229 return;
1230 };
1231 let events = runtime.drain_events().await;
1232 if events.is_empty() {
1233 return;
1234 }
1235 let mut connect_budget = self.discovery_connect_budget();
1236 let mut skipped_budget = 0usize;
1237 for event in events {
1238 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1239 let Some((transport_id, local_addr)) =
1240 self.find_udp_transport_for_remote_addr(peer.addr)
1241 else {
1242 debug!(
1243 addr = %peer.addr,
1244 "lan: skip discovered peer with no compatible UDP transport"
1245 );
1246 continue;
1247 };
1248 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1249 Ok(id) => id,
1250 Err(err) => {
1251 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1252 continue;
1253 }
1254 };
1255 let peer_node_addr = *identity.node_addr();
1256 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1257 if self.peers.contains_key(&peer_node_addr) {
1258 let candidate = PeerAddress::new("udp", peer.addr.to_string());
1259 if self.active_peer_candidate_is_fresh_enough_to_skip(
1260 &peer_node_addr,
1261 std::slice::from_ref(&candidate),
1262 ) {
1263 continue;
1264 }
1265 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1266 continue;
1267 }
1268 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1269 skipped_budget = skipped_budget.saturating_add(1);
1270 continue;
1271 }
1272 info!(
1273 npub = %identity.short_npub(),
1274 addr = %peer.addr,
1275 local_addr = %local_addr,
1276 "lan: initiating alternate-path handshake to active peer"
1277 );
1278 if let Err(err) = self
1279 .initiate_connection(transport_id, remote_addr, identity)
1280 .await
1281 {
1282 debug!(
1283 npub = %peer.npub,
1284 error = %err,
1285 "lan: failed to initiate active peer alternate-path handshake"
1286 );
1287 }
1288 connect_budget = connect_budget.saturating_sub(1);
1289 continue;
1290 }
1291 if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1292 continue;
1293 }
1294 if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1295 skipped_budget = skipped_budget.saturating_add(1);
1296 continue;
1297 }
1298 info!(
1299 npub = %identity.short_npub(),
1300 addr = %peer.addr,
1301 local_addr = %local_addr,
1302 "lan: initiating handshake to discovered peer"
1303 );
1304 if let Err(err) = self
1305 .initiate_connection(transport_id, remote_addr, identity)
1306 .await
1307 {
1308 debug!(
1309 npub = %peer.npub,
1310 error = %err,
1311 "lan: failed to initiate connection to discovered peer"
1312 );
1313 }
1314 connect_budget = connect_budget.saturating_sub(1);
1315 }
1316 if skipped_budget > 0 {
1317 debug!(
1318 skipped = skipped_budget,
1319 "lan: discovery connect budget exhausted"
1320 );
1321 }
1322 }
1323
1324 pub(super) async fn poll_pending_connects(&mut self) {
1331 if self.pending_connects.is_empty() {
1332 return;
1333 }
1334
1335 let mut completed = Vec::new();
1336
1337 for (i, pending) in self.pending_connects.iter().enumerate() {
1338 let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
1339 transport.connection_state(&pending.remote_addr)
1340 } else {
1341 crate::transport::ConnectionState::Failed("transport removed".into())
1342 };
1343
1344 match state {
1345 crate::transport::ConnectionState::Connected => {
1346 completed.push((i, true, None));
1347 }
1348 crate::transport::ConnectionState::Failed(reason) => {
1349 completed.push((i, false, Some(reason)));
1350 }
1351 crate::transport::ConnectionState::Connecting => {
1352 }
1354 crate::transport::ConnectionState::None => {
1355 completed.push((i, false, Some("no connection attempt found".into())));
1357 }
1358 }
1359 }
1360
1361 for (i, success, reason) in completed.into_iter().rev() {
1363 let pending = self.pending_connects.remove(i);
1364
1365 if success {
1366 if let Some(link) = self.links.get_mut(&pending.link_id) {
1368 link.set_connected();
1369 }
1370
1371 debug!(
1372 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
1373 transport_id = %pending.transport_id,
1374 remote_addr = %pending.remote_addr,
1375 link_id = %pending.link_id,
1376 "Transport connected, starting handshake"
1377 );
1378
1379 if let Err(e) = self
1381 .start_handshake(
1382 pending.link_id,
1383 pending.transport_id,
1384 pending.remote_addr.clone(),
1385 pending.peer_identity,
1386 )
1387 .await
1388 {
1389 warn!(
1390 link_id = %pending.link_id,
1391 error = %e,
1392 "Failed to start handshake after transport connect"
1393 );
1394 self.remove_link(&pending.link_id);
1396 }
1397 } else {
1398 let reason = reason.unwrap_or_default();
1399 warn!(
1400 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
1401 transport_id = %pending.transport_id,
1402 remote_addr = %pending.remote_addr,
1403 link_id = %pending.link_id,
1404 reason = %reason,
1405 "Transport connect failed"
1406 );
1407
1408 self.remove_link(&pending.link_id);
1410 self.links.remove(&pending.link_id);
1411 self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
1412 }
1413 }
1414 }
1415
1416 pub async fn start(&mut self) -> Result<(), NodeError> {
1423 node_start_debug_log("Node::start begin");
1424 if !self.state.can_start() {
1425 return Err(NodeError::AlreadyStarted);
1426 }
1427 self.state = NodeState::Starting;
1428 node_start_debug_log("Node::start state set to starting");
1429
1430 let packet_buffer_size = self.config.node.buffers.packet_channel;
1432 let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
1433 self.packet_tx = Some(packet_tx.clone());
1434 self.packet_rx = Some(packet_rx);
1435 node_start_debug_log("Node::start packet channel created");
1436
1437 node_start_debug_log("Node::start create transports begin");
1439 let transport_handles = self.create_transports(&packet_tx).await;
1440 node_start_debug_log(format!(
1441 "Node::start create transports complete count={}",
1442 transport_handles.len()
1443 ));
1444
1445 for mut handle in transport_handles {
1446 let transport_id = handle.transport_id();
1447 let transport_type = handle.transport_type().name;
1448 let name = handle.name().map(|s| s.to_string());
1449
1450 node_start_debug_log(format!(
1451 "Node::start transport start begin id={} type={} name={:?}",
1452 transport_id, transport_type, name
1453 ));
1454 match handle.start().await {
1455 Ok(()) => {
1456 node_start_debug_log(format!(
1457 "Node::start transport start ok id={} type={}",
1458 transport_id, transport_type
1459 ));
1460 self.transports.insert(transport_id, handle);
1461 }
1462 Err(e) => {
1463 node_start_debug_log(format!(
1464 "Node::start transport start error id={} type={} error={}",
1465 transport_id, transport_type, e
1466 ));
1467 if let Some(ref n) = name {
1468 warn!(transport_type, name = %n, error = %e, "Transport failed to start");
1469 } else {
1470 warn!(transport_type, error = %e, "Transport failed to start");
1471 }
1472 }
1473 }
1474 }
1475
1476 if !self.transports.is_empty() {
1477 info!(count = self.transports.len(), "Transports initialized");
1478 }
1479
1480 #[cfg(unix)]
1496 {
1497 if self.config.node.worker_pools_enabled {
1498 node_start_debug_log("Node::start worker pools begin");
1499 let cpu_default = std::thread::available_parallelism()
1500 .map(|n| n.get())
1501 .unwrap_or(1)
1502 .max(1);
1503 let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
1504 .ok()
1505 .and_then(|s| s.parse().ok())
1506 .unwrap_or(cpu_default)
1507 .max(1);
1508 self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
1509 encrypt_worker_count,
1510 ));
1511 info!(
1512 workers = encrypt_worker_count,
1513 "Spawned FMP-encrypt worker pool"
1514 );
1515
1516 let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
1525 .ok()
1526 .and_then(|s| s.parse().ok())
1527 .unwrap_or(cpu_default);
1528 if decrypt_worker_count == 0 {
1529 info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
1530 } else {
1531 self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
1532 decrypt_worker_count,
1533 ));
1534 info!(
1535 workers = decrypt_worker_count,
1536 "Spawned FMP+FSP-decrypt worker pool"
1537 );
1538 }
1539 node_start_debug_log("Node::start worker pools complete");
1540 } else {
1541 node_start_debug_log("Node::start worker pools disabled");
1542 info!("FIPS worker pools disabled; using in-line crypto/send path");
1543 }
1544 }
1545
1546 if self.config.node.discovery.nostr.enabled {
1547 node_start_debug_log("Node::start nostr discovery start begin");
1548 match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
1549 .await
1550 {
1551 Ok(runtime) => {
1552 node_start_debug_log("Node::start nostr discovery runtime created");
1553 if let Err(err) = self.refresh_overlay_advert(&runtime).await {
1554 warn!(error = %err, "Failed to publish initial Nostr overlay advert");
1555 }
1556 node_start_debug_log("Node::start nostr overlay advert refreshed");
1557 self.nostr_discovery = Some(runtime);
1558 self.nostr_discovery_started_at_ms = Some(Self::now_ms());
1559 info!("Nostr overlay discovery enabled");
1560 }
1561 Err(err) => {
1562 node_start_debug_log(format!(
1563 "Node::start nostr discovery start error error={}",
1564 err
1565 ));
1566 warn!(error = %err, "Failed to start Nostr overlay discovery");
1567 }
1568 }
1569 }
1570
1571 if self.config.node.discovery.lan.enabled {
1575 node_start_debug_log("Node::start lan discovery start begin");
1576 let advertised_udp_port = self
1577 .transports
1578 .values()
1579 .filter(|h| h.is_operational())
1580 .filter(|h| h.transport_type().name == "udp")
1581 .find_map(|h| h.local_addr().map(|addr| addr.port()))
1582 .unwrap_or(0);
1583 let scope = self.lan_discovery_scope();
1584 match crate::discovery::lan::LanDiscovery::start(
1585 &self.identity,
1586 scope,
1587 advertised_udp_port,
1588 self.config.node.discovery.lan.clone(),
1589 )
1590 .await
1591 {
1592 Ok(runtime) => {
1593 node_start_debug_log("Node::start lan discovery start ok");
1594 self.lan_discovery = Some(runtime);
1595 info!("LAN mDNS discovery enabled");
1596 }
1597 Err(err) => {
1598 node_start_debug_log(format!(
1599 "Node::start lan discovery start error error={}",
1600 err
1601 ));
1602 debug!(error = %err, "LAN mDNS discovery not started");
1603 }
1604 }
1605 }
1606
1607 node_start_debug_log("Node::start initiate peer connections begin");
1610 self.initiate_peer_connections().await;
1611 node_start_debug_log("Node::start initiate peer connections complete");
1612
1613 if self.config.tun.enabled {
1615 node_start_debug_log("Node::start tun init begin");
1616 let address = *self.identity.address();
1617 match TunDevice::create(&self.config.tun, address).await {
1618 Ok(device) => {
1619 let mtu = device.mtu();
1620 let name = device.name().to_string();
1621 let our_addr = *device.address();
1622
1623 info!("TUN device active:");
1624 info!(" name: {}", name);
1625 info!(" address: {}", device.address());
1626 info!(" mtu: {}", mtu);
1627
1628 let effective_mtu = self.effective_ipv6_mtu();
1630 let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); info!("effective MTU: {} bytes", effective_mtu);
1633 debug!(" max TCP MSS: {} bytes", max_mss);
1634
1635 #[cfg(target_os = "macos")]
1639 let (shutdown_read_fd, shutdown_write_fd) = {
1640 let mut fds = [0i32; 2];
1641 if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
1642 return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
1643 "failed to create shutdown pipe".into(),
1644 )));
1645 }
1646 (fds[0], fds[1])
1647 };
1648
1649 let (writer, tun_tx) =
1653 device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
1654
1655 let writer_handle = thread::spawn(move || {
1657 writer.run();
1658 });
1659
1660 let reader_tun_tx = tun_tx.clone();
1662
1663 let tun_channel_size = self.config.node.buffers.tun_channel;
1665 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
1666
1667 let transport_mtu = self.transport_mtu();
1669 let path_mtu_lookup = self.path_mtu_lookup.clone();
1670 #[cfg(target_os = "macos")]
1671 let reader_handle = thread::spawn(move || {
1672 run_tun_reader(
1673 device,
1674 mtu,
1675 our_addr,
1676 reader_tun_tx,
1677 outbound_tx,
1678 transport_mtu,
1679 path_mtu_lookup,
1680 shutdown_read_fd,
1681 );
1682 });
1683 #[cfg(not(target_os = "macos"))]
1684 let reader_handle = thread::spawn(move || {
1685 run_tun_reader(
1686 device,
1687 mtu,
1688 our_addr,
1689 reader_tun_tx,
1690 outbound_tx,
1691 transport_mtu,
1692 path_mtu_lookup,
1693 );
1694 });
1695
1696 self.tun_state = TunState::Active;
1697 self.tun_name = Some(name);
1698 self.tun_tx = Some(tun_tx);
1699 self.tun_outbound_rx = Some(outbound_rx);
1700 self.tun_reader_handle = Some(reader_handle);
1701 self.tun_writer_handle = Some(writer_handle);
1702 #[cfg(target_os = "macos")]
1703 {
1704 self.tun_shutdown_fd = Some(shutdown_write_fd);
1705 }
1706 }
1707 Err(e) => {
1708 self.tun_state = TunState::Failed;
1709 warn!(error = %e, "Failed to initialize TUN, continuing without it");
1710 }
1711 }
1712 node_start_debug_log("Node::start tun init complete");
1713 }
1714
1715 if self.config.dns.enabled {
1732 node_start_debug_log("Node::start dns init begin");
1733 let addr_str = self.config.dns.bind_addr();
1734 match addr_str.parse::<std::net::IpAddr>() {
1735 Ok(ip) => {
1736 let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
1737 match Self::bind_dns_socket(bind) {
1738 Ok(socket) => {
1739 let dns_channel_size = self.config.node.buffers.dns_channel;
1740 let (identity_tx, identity_rx) =
1741 tokio::sync::mpsc::channel(dns_channel_size);
1742 let dns_ttl = self.config.dns.ttl();
1743 let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
1744 self.config.peers(),
1745 );
1746 let reloader = if self.config.node.system_files_enabled {
1747 let hosts_path = std::path::PathBuf::from(
1748 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1749 );
1750 crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
1751 } else {
1752 crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
1753 };
1754 let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
1762 info!(
1763 bind = %bind,
1764 hosts = reloader.hosts().len(),
1765 mesh_ifindex = ?mesh_ifindex,
1766 "DNS responder started for .fips domain (auto-reload enabled)"
1767 );
1768 let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
1769 socket,
1770 identity_tx,
1771 dns_ttl,
1772 reloader,
1773 mesh_ifindex,
1774 ));
1775 self.dns_identity_rx = Some(identity_rx);
1776 self.dns_task = Some(handle);
1777 }
1778 Err(e) => {
1779 warn!(bind = %bind, error = %e, "Failed to start DNS responder");
1780 }
1781 }
1782 }
1783 Err(e) => {
1784 warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
1785 }
1786 }
1787 node_start_debug_log("Node::start dns init complete");
1788 }
1789
1790 self.state = NodeState::Running;
1791 node_start_debug_log("Node::start running");
1792 info!("Node started:");
1793 info!(" state: {}", self.state);
1794 info!(" transports: {}", self.transports.len());
1795 info!(" connections: {}", self.connections.len());
1796 Ok(())
1797 }
1798
1799 fn bind_dns_socket(
1812 addr: std::net::SocketAddr,
1813 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
1814 use socket2::{Domain, Protocol, Socket, Type};
1815 let domain = if addr.is_ipv4() {
1816 Domain::IPV4
1817 } else {
1818 Domain::IPV6
1819 };
1820 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
1821 if addr.is_ipv6() {
1822 sock.set_only_v6(false)?;
1823 #[cfg(unix)]
1824 Self::set_recv_pktinfo_v6(&sock)?;
1825 }
1826 sock.set_nonblocking(true)?;
1827 sock.bind(&addr.into())?;
1828 tokio::net::UdpSocket::from_std(sock.into())
1829 }
1830
1831 #[cfg(unix)]
1837 fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
1838 use std::os::fd::AsRawFd;
1839 let enable: libc::c_int = 1;
1840 let ret = unsafe {
1841 libc::setsockopt(
1842 sock.as_raw_fd(),
1843 libc::IPPROTO_IPV6,
1844 libc::IPV6_RECVPKTINFO,
1845 &enable as *const _ as *const libc::c_void,
1846 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
1847 )
1848 };
1849 if ret < 0 {
1850 return Err(std::io::Error::last_os_error());
1851 }
1852 Ok(())
1853 }
1854
1855 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
1862 #[cfg(unix)]
1863 {
1864 let c_name = std::ffi::CString::new(name).ok()?;
1865 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
1866 if idx == 0 { None } else { Some(idx) }
1867 }
1868 #[cfg(not(unix))]
1869 {
1870 let _ = name;
1871 None
1872 }
1873 }
1874
    /// Shut the node down.
    ///
    /// Order: abort the DNS responder task, send a `Shutdown` disconnect to
    /// every sendable peer, shut down Nostr and LAN discovery, stop and remove
    /// every transport, drop both ends of the packet channel, then tear down the
    /// TUN interface and join its reader/writer threads.
    ///
    /// # Errors
    /// Returns [`NodeError::NotStarted`] if `self.state.can_stop()` is false.
    /// Failures of individual subsystems are logged and do not abort shutdown.
    pub async fn stop(&mut self) -> Result<(), NodeError> {
        if !self.state.can_stop() {
            return Err(NodeError::NotStarted);
        }
        self.state = NodeState::Stopping;
        info!(state = %self.state, "Node stopping");

        if let Some(handle) = self.dns_task.take() {
            handle.abort();
            debug!("DNS responder stopped");
        }

        // Notify peers before the transports are stopped further below.
        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
            .await;

        if let Some(bootstrap) = self.nostr_discovery.take()
            && let Err(e) = bootstrap.shutdown().await
        {
            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
        }

        if let Some(lan) = self.lan_discovery.take() {
            lan.shutdown().await;
        }

        // Snapshot the ids so each handle can be removed from the map and stopped by value.
        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
        for transport_id in transport_ids {
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let transport_type = handle.transport_type().name;
                match handle.stop().await {
                    Ok(()) => {
                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
                    }
                    Err(e) => {
                        warn!(
                            transport_id = %transport_id,
                            transport_type,
                            error = %e,
                            "Transport stop failed"
                        );
                    }
                }
            }
        }

        // Drop both ends of the packet channel created in `start`.
        self.packet_tx.take();
        self.packet_rx.take();

        if let Some(name) = self.tun_name.take() {
            info!(name = %name, "Shutting down TUN interface");

            // Drop the node's sender to the TUN writer.
            self.tun_tx.take();

            if let Err(e) = shutdown_tun_interface(&name).await {
                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
            }

            // macOS: write one byte to the shutdown pipe whose read end was given to the
            // TUN reader thread in `start` (presumably to wake it), then close our end.
            #[cfg(target_os = "macos")]
            if let Some(fd) = self.tun_shutdown_fd.take() {
                // SAFETY: `fd` is the write end of the pipe created in `start`; `take()`
                // ensures it is written and closed at most once. The write result is
                // deliberately ignored (best-effort wakeup).
                unsafe {
                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                    libc::close(fd);
                }
            }

            // Join the reader and writer threads; a panic in either is ignored.
            if let Some(handle) = self.tun_reader_handle.take() {
                let _ = handle.join();
            }
            if let Some(handle) = self.tun_writer_handle.take() {
                let _ = handle.join();
            }

            self.tun_state = TunState::Disabled;
        }

        self.state = NodeState::Stopped;
        info!(state = %self.state, "Node stopped");
        Ok(())
    }
1972
1973 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
1978 let disconnect = Disconnect::new(reason);
1979 let plaintext = disconnect.encode();
1980
1981 let peer_addrs: Vec<NodeAddr> = self
1983 .peers
1984 .iter()
1985 .filter(|(_, peer)| peer.can_send() && peer.has_session())
1986 .map(|(addr, _)| *addr)
1987 .collect();
1988
1989 if peer_addrs.is_empty() {
1990 debug!(
1991 total_peers = self.peers.len(),
1992 "No sendable peers for disconnect notification"
1993 );
1994 return;
1995 }
1996
1997 let mut sent = 0usize;
1998 for node_addr in &peer_addrs {
1999 match self
2000 .send_encrypted_link_message(node_addr, &plaintext)
2001 .await
2002 {
2003 Ok(()) => sent += 1,
2004 Err(e) => {
2005 debug!(
2006 peer = %self.peer_display_name(node_addr),
2007 error = %e,
2008 "Failed to send disconnect (transport may be down)"
2009 );
2010 }
2011 }
2012 }
2013
2014 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2015 }
2016
2017 fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2018 peer_config
2019 .addresses_by_priority()
2020 .into_iter()
2021 .cloned()
2022 .collect()
2023 }
2024
2025 async fn nostr_peer_fallback_addresses(
2026 &self,
2027 peer_config: &PeerConfig,
2028 existing: &[PeerAddress],
2029 ) -> Vec<PeerAddress> {
2030 if !self.config.node.discovery.nostr.enabled
2031 || self.config.node.discovery.nostr.policy
2032 == crate::config::NostrDiscoveryPolicy::Disabled
2033 {
2034 return Vec::new();
2035 }
2036
2037 let Some(bootstrap) = self.nostr_discovery.clone() else {
2038 return Vec::new();
2039 };
2040 let endpoints = match bootstrap
2041 .cached_advert_endpoints_for_peer(&peer_config.npub)
2042 .await
2043 {
2044 Some(endpoints) => endpoints,
2045 None => {
2046 debug!(
2047 npub = %peer_config.npub,
2048 "No cached Nostr advert endpoints for configured peer"
2049 );
2050 return Vec::new();
2051 }
2052 };
2053
2054 let mut fallback = Vec::new();
2055 let mut next_priority = existing
2056 .iter()
2057 .map(|addr| addr.priority)
2058 .max()
2059 .unwrap_or(100)
2060 .saturating_add(1);
2061 let seen_at_ms = Self::now_ms();
2066 for endpoint in endpoints {
2067 let Some(candidate) =
2068 Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2069 else {
2070 continue;
2071 };
2072 if existing
2073 .iter()
2074 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2075 || fallback.iter().any(|addr: &PeerAddress| {
2076 addr.transport == candidate.transport && addr.addr == candidate.addr
2077 })
2078 {
2079 continue;
2080 }
2081 fallback.push(candidate);
2082 next_priority = next_priority.saturating_add(1);
2083 }
2084 fallback
2085 }
2086
2087 async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2088 if !self.config.node.discovery.nostr.enabled
2089 || self.config.node.discovery.nostr.policy
2090 == crate::config::NostrDiscoveryPolicy::Disabled
2091 {
2092 return false;
2093 }
2094 let Some(bootstrap) = self.nostr_discovery.clone() else {
2095 return false;
2096 };
2097 bootstrap.request_connect(peer_config.clone()).await;
2098 info!(npub = %peer_config.npub, "Started background Nostr UDP NAT traversal attempt");
2099 true
2100 }
2101
2102 fn overlay_endpoint_to_peer_address(
2103 endpoint: &OverlayEndpointAdvert,
2104 priority: u8,
2105 seen_at_ms: u64,
2106 ) -> Option<PeerAddress> {
2107 let transport = match endpoint.transport {
2108 OverlayTransportKind::Udp => "udp",
2109 OverlayTransportKind::Tcp => "tcp",
2110 OverlayTransportKind::Tor => "tor",
2111 };
2112 Some(
2113 PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2114 .with_seen_at_ms(seen_at_ms),
2115 )
2116 }
2117
    /// Try to open connections to `peer_identity` over `addresses`, in the given order.
    ///
    /// - `udp:nat` entries are skipped unless `allow_bootstrap_nat`. When allowed, they
    ///   request a background Nostr NAT-traversal attempt instead of a direct connect.
    /// - `ethernet` and `ble` addresses go through their transport-specific resolvers
    ///   (`ble` only when built with `bluer_available`).
    /// - `udp` addresses that parse as a `SocketAddr` use
    ///   `find_udp_transport_for_remote_addr`. Everything else (including UDP
    ///   hostnames) falls back to `find_transport_for_type`.
    /// - A path already in flight counts as attempted and is not re-dialed.
    /// - At most `path_candidate_attempt_budget` new connections are initiated. Once
    ///   the budget is spent, the loop stops and later entries (including `udp:nat`)
    ///   are not considered.
    ///
    /// # Errors
    /// - `NodeError::AccessDenied` from `initiate_connection` is returned immediately.
    /// - `NodeError::NoTransportForType` if nothing was attempted. NOTE(review): this is
    ///   also returned when the only obstacle was an exhausted candidate budget, which
    ///   the "no operational transport" message does not reflect.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        let mut attempted = false;
        let peer_node_addr = *peer_identity.node_addr();
        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

        for addr in addresses {
            // `udp:nat` is a placeholder meaning "reach this peer via Nostr NAT traversal".
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if !allow_bootstrap_nat {
                    continue;
                }
                if self.request_nostr_bootstrap(peer_config).await {
                    attempted = true;
                    continue;
                }
                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                continue;
            }

            // Resolve the address to a (transport, remote address) pair; unresolvable
            // entries are logged and skipped.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                let tid = if addr.transport == "udp"
                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
                {
                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                        Some((id, _)) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No compatible operational UDP transport for address"
                            );
                            continue;
                        }
                    }
                } else {
                    match self.find_transport_for_type(&addr.transport) {
                        Some(id) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No operational transport for address type"
                            );
                            continue;
                        }
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            // An in-flight attempt on the same path counts as attempted without using budget.
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                attempted = true;
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    remote_addr = %remote_addr,
                    "Skipping duplicate in-flight candidate path"
                );
                continue;
            }

            if concrete_budget == 0 {
                debug!(
                    npub = %peer_config.npub,
                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                    "Path candidate race budget exhausted"
                );
                break;
            }

            // Only successful initiations consume budget; non-ACL errors fall through.
            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => {
                    attempted = true;
                    concrete_budget = concrete_budget.saturating_sub(1);
                }
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        if attempted {
            return Ok(());
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
2256
2257 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
2258 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
2259 .await;
2260 }
2261
    /// Queue connection retries for unconfigured peers found in cached Nostr overlay
    /// adverts. This only runs when Nostr discovery is enabled with the `Open` policy.
    ///
    /// * `bootstrap` – Nostr runtime whose advert cache is read (up to 64 candidates).
    /// * `max_age_secs` – when `Some`, adverts older than this many seconds are skipped.
    /// * `caller` – label for logs. `"startup"` always logs a summary; other callers
    ///   log one only when something was queued.
    ///
    /// Candidates are skipped if they are too old, are configured peers, are ourselves,
    /// are already peers, already have a pending retry, are in Nostr cooldown, are
    /// mid-handshake, have no usable endpoints, or have an unparsable npub. For
    /// configured peers, an existing future retry is pulled forward to "now" instead.
    ///
    /// Each queued peer gets a non-reconnecting `RetryState` that is due immediately and
    /// expires at `open_discovery_retry_expires_at_ms`. Endpoint priorities start at 120.
    /// At most `open_discovery_enqueue_budget` peers are queued per sweep.
    ///
    /// NOTE(review): the early return on a zero budget, and the in-loop `break` once the
    /// budget is spent, also stop the configured-peer retry expediting, even though that
    /// path consumes no budget. Confirm this is intended.
    pub(in crate::node) async fn run_open_discovery_sweep(
        &mut self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
        max_age_secs: Option<u64>,
        caller: &'static str,
    ) {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
        {
            return;
        }

        let configured_npubs = self
            .config
            .peers()
            .iter()
            .map(|peer| peer.npub.clone())
            .collect::<HashSet<_>>();
        let now_ms = Self::now_ms();
        let now_secs = now_ms / 1000;
        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
        if enqueue_budget == 0 {
            debug!(
                caller = %caller,
                "open-discovery sweep: enqueue budget is 0, skipping"
            );
            return;
        }

        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
        let cached_count = candidates.len();
        // Per-reason counters for the summary log.
        let mut enqueued = 0usize;
        let mut skipped_age = 0usize;
        let mut skipped_configured = 0usize;
        let mut skipped_self = 0usize;
        let mut skipped_connected = 0usize;
        let mut skipped_retry_pending = 0usize;
        let mut skipped_connecting = 0usize;
        let mut skipped_no_endpoints = 0usize;
        let mut skipped_invalid_npub = 0usize;
        let mut skipped_cooldown = 0usize;

        for (npub, endpoints, created_at_secs) in candidates {
            if enqueue_budget == 0 {
                break;
            }

            if let Some(max_age) = max_age_secs
                && now_secs.saturating_sub(created_at_secs) > max_age
            {
                skipped_age = skipped_age.saturating_add(1);
                continue;
            }

            // Configured peers are never queued here; a fresh advert only pulls their
            // existing retry forward.
            if configured_npubs.contains(&npub) {
                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
                    let configured_addr = *identity.node_addr();
                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
                        && state.retry_after_ms > now_ms
                    {
                        state.retry_after_ms = now_ms;
                        debug!(
                            caller = %caller,
                            peer = %self.peer_display_name(&configured_addr),
                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
                            "Expediting configured-peer retry after fresh overlay advert"
                        );
                    }
                }
                skipped_configured = skipped_configured.saturating_add(1);
                continue;
            }

            let peer_identity = match PeerIdentity::from_npub(&npub) {
                Ok(identity) => identity,
                Err(_) => {
                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                    continue;
                }
            };
            let node_addr = *peer_identity.node_addr();
            if node_addr == *self.identity.node_addr() {
                skipped_self = skipped_self.saturating_add(1);
                continue;
            }
            if self.peers.contains_key(&node_addr) {
                skipped_connected = skipped_connected.saturating_add(1);
                continue;
            }
            if self.retry_pending.contains_key(&node_addr) {
                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
                continue;
            }
            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                skipped_cooldown = skipped_cooldown.saturating_add(1);
                continue;
            }
            let connecting = self.connections.values().any(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == &node_addr)
                    .unwrap_or(false)
            });
            if connecting {
                skipped_connecting = skipped_connecting.saturating_add(1);
                continue;
            }

            // Build a de-duplicated address list; priorities start at 120 and increase.
            let mut addresses = Vec::new();
            let mut priority = 120u8;
            let seen_at_ms = Self::now_ms();
            for endpoint in endpoints {
                let Some(candidate) =
                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
                else {
                    continue;
                };
                if addresses.iter().any(|existing: &PeerAddress| {
                    existing.transport == candidate.transport && existing.addr == candidate.addr
                }) {
                    continue;
                }
                addresses.push(candidate);
                priority = priority.saturating_add(1);
            }
            if addresses.is_empty() {
                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
                continue;
            }

            self.peer_aliases
                .entry(node_addr)
                .or_insert_with(|| peer_identity.short_npub());
            self.register_identity(node_addr, peer_identity.pubkey_full());

            // Synthetic config for an unconfigured peer: due now, no reconnect, bounded lifetime.
            let mut state = super::retry::RetryState::new(PeerConfig {
                npub: npub.clone(),
                alias: None,
                addresses,
                connect_policy: ConnectPolicy::AutoConnect,
                auto_reconnect: true,
                discovery_fallback_transit: false,
            });
            state.reconnect = false;
            state.retry_after_ms = now_ms;
            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
            self.retry_pending.insert(node_addr, state);
            info!(
                caller = %caller,
                peer = %peer_identity.short_npub(),
                advert_age_secs = now_secs.saturating_sub(created_at_secs),
                "open-discovery sweep: queued retry for cached advert"
            );
            enqueue_budget = enqueue_budget.saturating_sub(1);
            enqueued = enqueued.saturating_add(1);
        }

        let total_skipped = skipped_age
            + skipped_configured
            + skipped_self
            + skipped_connected
            + skipped_retry_pending
            + skipped_connecting
            + skipped_no_endpoints
            + skipped_invalid_npub
            + skipped_cooldown;
        let should_summarize = caller == "startup" || enqueued > 0;
        if should_summarize {
            info!(
                caller = %caller,
                cached = cached_count,
                queued = enqueued,
                skipped_age = skipped_age,
                skipped_configured = skipped_configured,
                skipped_self = skipped_self,
                skipped_connected = skipped_connected,
                skipped_retry_pending = skipped_retry_pending,
                skipped_connecting = skipped_connecting,
                skipped_no_endpoints = skipped_no_endpoints,
                skipped_invalid_npub = skipped_invalid_npub,
                skipped_cooldown = skipped_cooldown,
                skipped_total = total_skipped,
                "open-discovery sweep complete"
            );
        }
    }
2479
2480 async fn maybe_run_startup_open_discovery_sweep(
2488 &mut self,
2489 bootstrap: &std::sync::Arc<NostrDiscovery>,
2490 ) {
2491 if self.startup_open_discovery_sweep_done {
2492 return;
2493 }
2494 if !self.config.node.discovery.nostr.enabled
2495 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2496 {
2497 self.startup_open_discovery_sweep_done = true;
2499 return;
2500 }
2501 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
2502 return;
2503 };
2504 let now_ms = Self::now_ms();
2505 let delay_ms = self
2506 .config
2507 .node
2508 .discovery
2509 .nostr
2510 .startup_sweep_delay_secs
2511 .saturating_mul(1000);
2512 if now_ms < started_at_ms.saturating_add(delay_ms) {
2513 return;
2514 }
2515
2516 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
2517 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
2518 .await;
2519 self.startup_open_discovery_sweep_done = true;
2520 }
2521
2522 fn available_outbound_slots(&self) -> usize {
2523 let connection_used = self
2524 .connections
2525 .len()
2526 .saturating_add(self.pending_connects.len());
2527 let connection_slots = if self.max_connections == 0 {
2528 usize::MAX
2529 } else {
2530 self.max_connections.saturating_sub(connection_used)
2531 };
2532
2533 let peer_slots = if self.max_peers == 0 {
2534 usize::MAX
2535 } else {
2536 self.max_peers.saturating_sub(self.peers.len())
2537 };
2538
2539 connection_slots.min(peer_slots)
2540 }
2541
2542 fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
2543 let current_open_discovery_pending = self
2544 .retry_pending
2545 .values()
2546 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
2547 .count();
2548
2549 let cap_remaining = self
2550 .config
2551 .node
2552 .discovery
2553 .nostr
2554 .open_discovery_max_pending
2555 .saturating_sub(current_open_discovery_pending);
2556
2557 cap_remaining.min(self.available_outbound_slots())
2558 }
2559
2560 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
2561 now_ms.saturating_add(
2562 self.config
2563 .node
2564 .discovery
2565 .nostr
2566 .advert_ttl_secs
2567 .saturating_mul(1000)
2568 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
2569 )
2570 }
2571
2572 async fn build_overlay_advert(
2573 &self,
2574 bootstrap: &std::sync::Arc<NostrDiscovery>,
2575 ) -> Option<OverlayAdvert> {
2576 if !self.config.node.discovery.nostr.enabled {
2577 return None;
2578 }
2579
2580 let mut endpoints = Vec::new();
2581 let mut has_udp_nat = false;
2582
2583 for handle in self.transports.values() {
2584 if !handle.is_operational() {
2585 continue;
2586 }
2587
2588 match handle.transport_type().name {
2589 "udp" => {
2590 let Some(cfg) = self.lookup_udp_config(handle.name()) else {
2591 continue;
2592 };
2593 if !cfg.advertise_on_nostr() {
2594 continue;
2595 }
2596 if cfg.is_public() {
2597 if let Some(explicit) = cfg.external_advert_addr() {
2607 endpoints.push(OverlayEndpointAdvert {
2608 transport: OverlayTransportKind::Udp,
2609 addr: explicit.to_string(),
2610 });
2611 } else {
2612 match handle.local_addr() {
2613 Some(addr)
2614 if !addr.ip().is_unspecified()
2615 && !is_unroutable_advert_ip(addr.ip()) =>
2616 {
2617 endpoints.push(OverlayEndpointAdvert {
2618 transport: OverlayTransportKind::Udp,
2619 addr: addr.to_string(),
2620 });
2621 }
2622 Some(addr) => {
2623 let key = handle.transport_id().as_u32();
2624 let port = addr.port();
2625 if let Some(public) =
2626 bootstrap.learn_public_udp_addr(key, port).await
2627 {
2628 endpoints.push(OverlayEndpointAdvert {
2629 transport: OverlayTransportKind::Udp,
2630 addr: public.to_string(),
2631 });
2632 } else {
2633 warn!(
2634 transport_id = key,
2635 bind_addr = %addr,
2636 "advert: udp public=true but bind is wildcard \
2637 or private and STUN observation failed; \
2638 advertising no UDP endpoint. Either set \
2639 transports.udp.external_addr, bind to a \
2640 specific *public* IP, or ensure \
2641 node.discovery.nostr.stun_servers is reachable"
2642 );
2643 }
2644 }
2645 None => {}
2646 }
2647 }
2648 } else {
2649 endpoints.push(OverlayEndpointAdvert {
2650 transport: OverlayTransportKind::Udp,
2651 addr: "nat".to_string(),
2652 });
2653 has_udp_nat = true;
2654 }
2655 }
2656 "tcp" => {
2657 let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
2658 continue;
2659 };
2660 if !cfg.advertise_on_nostr() {
2661 continue;
2662 }
2663 if let Some(explicit) = cfg.external_advert_addr() {
2675 endpoints.push(OverlayEndpointAdvert {
2676 transport: OverlayTransportKind::Tcp,
2677 addr: explicit.to_string(),
2678 });
2679 } else {
2680 match handle.local_addr() {
2681 Some(addr)
2682 if !addr.ip().is_unspecified()
2683 && !is_unroutable_advert_ip(addr.ip()) =>
2684 {
2685 endpoints.push(OverlayEndpointAdvert {
2686 transport: OverlayTransportKind::Tcp,
2687 addr: addr.to_string(),
2688 });
2689 }
2690 Some(addr) => {
2691 warn!(
2692 bind_addr = %addr,
2693 "advert: tcp advertise_on_nostr=true bound to wildcard \
2694 or private IP and no transports.tcp.external_addr set; \
2695 advertising no TCP endpoint. Either set external_addr \
2696 to the public IP (recommended for cloud 1:1-NAT setups) \
2697 or bind explicitly to the public IP"
2698 );
2699 }
2700 None => {}
2701 }
2702 }
2703 }
2704 "tor" => {
2705 let Some(cfg) = self.lookup_tor_config(handle.name()) else {
2706 continue;
2707 };
2708 if !cfg.advertise_on_nostr() {
2709 continue;
2710 }
2711 if let Some(addr) = handle.onion_address() {
2712 endpoints.push(OverlayEndpointAdvert {
2713 transport: OverlayTransportKind::Tor,
2714 addr: format!("{}:{}", addr, cfg.advertised_port()),
2715 });
2716 }
2717 }
2718 _ => {}
2719 }
2720 }
2721
2722 if endpoints.is_empty() {
2723 return None;
2724 }
2725
2726 Some(OverlayAdvert {
2727 identifier: ADVERT_IDENTIFIER.to_string(),
2728 version: ADVERT_VERSION,
2729 endpoints,
2730 signal_relays: has_udp_nat.then(|| self.config.node.discovery.nostr.dm_relays.clone()),
2731 stun_servers: has_udp_nat
2732 .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
2733 })
2734 }
2735
2736 async fn refresh_overlay_advert(
2737 &self,
2738 bootstrap: &std::sync::Arc<NostrDiscovery>,
2739 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
2740 let advert = self.build_overlay_advert(bootstrap).await;
2741 bootstrap.update_local_advert(advert).await
2742 }
2743
2744 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
2745 match (&self.config.transports.udp, transport_name) {
2746 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2747 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2748 _ => None,
2749 }
2750 }
2751
2752 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
2753 match (&self.config.transports.tcp, transport_name) {
2754 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2755 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2756 _ => None,
2757 }
2758 }
2759
2760 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
2761 match (&self.config.transports.tor, transport_name) {
2762 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2763 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2764 _ => None,
2765 }
2766 }
2767
2768 pub(in crate::node) async fn try_peer_addresses(
2769 &mut self,
2770 peer_config: &PeerConfig,
2771 peer_identity: PeerIdentity,
2772 allow_bootstrap_nat: bool,
2773 ) -> Result<(), NodeError> {
2774 let peer_node_addr = *peer_identity.node_addr();
2775 if self.peers.contains_key(&peer_node_addr) {
2776 debug!(
2777 npub = %peer_config.npub,
2778 "Peer already exists, skipping address attempts"
2779 );
2780 return Ok(());
2781 }
2782
2783 let candidates = self.peer_address_candidates(peer_config).await;
2784
2785 if candidates.is_empty() {
2786 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
2787 return Ok(());
2788 }
2789 return Err(NodeError::NoTransportForType(format!(
2790 "no addresses known for {}",
2791 peer_config.npub
2792 )));
2793 }
2794
2795 if self
2796 .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
2797 .await
2798 .is_ok()
2799 {
2800 return Ok(());
2801 }
2802
2803 Err(NodeError::NoTransportForType(format!(
2804 "no operational transport for any of {}'s addresses",
2805 peer_config.npub
2806 )))
2807 }
2808
2809 async fn try_active_peer_alternative_addresses(
2810 &mut self,
2811 peer_config: &PeerConfig,
2812 peer_identity: PeerIdentity,
2813 ) -> Result<bool, NodeError> {
2814 let peer_node_addr = *peer_identity.node_addr();
2815 let candidates = self.peer_address_candidates(peer_config).await;
2816
2817 if candidates.is_empty() {
2818 return Err(NodeError::NoTransportForType(format!(
2819 "no addresses known for {}",
2820 peer_config.npub
2821 )));
2822 }
2823
2824 let alternatives: Vec<_> = candidates
2825 .into_iter()
2826 .filter(|addr| !(addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat")))
2827 .filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
2828 .collect();
2829
2830 if alternatives.is_empty() {
2831 return Err(NodeError::NoTransportForType(format!(
2832 "no concrete alternate addresses known for {}",
2833 peer_config.npub
2834 )));
2835 }
2836
2837 self.attempt_peer_address_list(peer_config, peer_identity, false, &alternatives)
2838 .await?;
2839 Ok(true)
2840 }
2841
2842 async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2843 let static_addresses = self.static_peer_addresses(peer_config);
2853 let overlay_addresses = self
2854 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
2855 .await;
2856
2857 let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
2858 for addr in overlay_addresses.into_iter().chain(static_addresses) {
2859 if !candidates.iter().any(|existing: &PeerAddress| {
2860 existing.transport == addr.transport && existing.addr == addr.addr
2861 }) {
2862 candidates.push(addr);
2863 }
2864 }
2865
2866 candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
2871 (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
2872 (Some(_), None) => std::cmp::Ordering::Less,
2873 (None, Some(_)) => std::cmp::Ordering::Greater,
2874 (None, None) => std::cmp::Ordering::Equal,
2875 });
2876
2877 candidates
2878 }
2879
2880 fn active_peer_matches_any_candidate(
2881 &self,
2882 peer_node_addr: &NodeAddr,
2883 candidates: &[PeerAddress],
2884 ) -> bool {
2885 candidates
2886 .iter()
2887 .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
2888 }
2889
2890 pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
2891 &self,
2892 peer_node_addr: &NodeAddr,
2893 candidates: &[PeerAddress],
2894 ) -> bool {
2895 if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
2896 return false;
2897 }
2898 !self.active_peer_needs_same_path_refresh(peer_node_addr)
2899 }
2900
2901 fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
2902 let Some(peer) = self.peers.get(peer_node_addr) else {
2903 return false;
2904 };
2905 let stale_after_ms = self
2906 .config
2907 .node
2908 .heartbeat_interval_secs
2909 .saturating_mul(1000)
2910 .max(1000);
2911 peer.idle_time(Self::now_ms()) > stale_after_ms
2912 }
2913
2914 fn active_peer_matches_candidate(
2915 &self,
2916 peer_node_addr: &NodeAddr,
2917 candidate: &PeerAddress,
2918 ) -> bool {
2919 let Some(peer) = self.peers.get(peer_node_addr) else {
2920 return false;
2921 };
2922 let Some(current_addr) = peer.current_addr() else {
2923 return false;
2924 };
2925 if peer
2926 .transport_id()
2927 .map(|id| self.bootstrap_transports.contains(&id))
2928 .unwrap_or(false)
2929 {
2930 return false;
2931 }
2932 let current_addr = current_addr.to_string();
2933 let current_transport = peer
2934 .transport_id()
2935 .and_then(|id| self.transports.get(&id))
2936 .map(|transport| transport.transport_type().name);
2937
2938 candidate.addr == current_addr
2939 && current_transport
2940 .map(|transport| transport == candidate.transport)
2941 .unwrap_or(true)
2942 }
2943
2944 pub(crate) async fn api_connect(
2952 &mut self,
2953 npub: &str,
2954 address: &str,
2955 transport: &str,
2956 ) -> Result<serde_json::Value, String> {
2957 let peer_config = PeerConfig {
2958 npub: npub.to_string(),
2959 alias: None,
2960 addresses: vec![PeerAddress::new(transport, address)],
2961 connect_policy: ConnectPolicy::Manual,
2962 auto_reconnect: false,
2963 discovery_fallback_transit: true,
2964 };
2965
2966 if let Ok(identity) = PeerIdentity::from_npub(npub) {
2968 self.peer_aliases
2969 .insert(*identity.node_addr(), identity.short_npub());
2970 self.register_identity(*identity.node_addr(), identity.pubkey_full());
2971 }
2972
2973 self.initiate_peer_connection(&peer_config)
2974 .await
2975 .map(|()| {
2976 info!(
2977 npub = %npub,
2978 address = %address,
2979 transport = %transport,
2980 "API connect initiated"
2981 );
2982 serde_json::json!({
2983 "npub": npub,
2984 "address": address,
2985 "transport": transport,
2986 })
2987 })
2988 .map_err(|e| e.to_string())
2989 }
2990
2991 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
2995 let peer_identity =
2996 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
2997 let node_addr = *peer_identity.node_addr();
2998
2999 if !self.peers.contains_key(&node_addr) {
3000 return Err(format!("peer not found: {npub}"));
3001 }
3002
3003 self.remove_active_peer(&node_addr);
3005
3006 self.retry_pending.remove(&node_addr);
3008
3009 info!(npub = %npub, "API disconnect completed");
3010
3011 Ok(serde_json::json!({
3012 "npub": npub,
3013 "disconnected": true,
3014 }))
3015 }
3016
3017 pub async fn adopt_established_traversal(
3024 &mut self,
3025 traversal: EstablishedTraversal,
3026 ) -> Result<BootstrapHandoffResult, NodeError> {
3027 debug!(
3028 peer_npub = %traversal.peer_npub,
3029 session_id = %traversal.session_id,
3030 remote_addr = %traversal.remote_addr,
3031 "adopting established traversal socket"
3032 );
3033
3034 if !self.state.is_operational() {
3035 return Err(NodeError::NotStarted);
3036 }
3037
3038 let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
3039 let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
3040 NodeError::InvalidPeerNpub {
3041 npub: traversal.peer_npub.clone(),
3042 reason: e.to_string(),
3043 }
3044 })?;
3045 let peer_node_addr = *peer_identity.node_addr();
3046 if self.peers.contains_key(&peer_node_addr) {
3047 debug!(
3048 peer_npub = %traversal.peer_npub,
3049 "Adopting NAT traversal handoff as alternate path for already-connected peer"
3050 );
3051 }
3052
3053 self.peer_aliases
3054 .insert(peer_node_addr, peer_identity.short_npub());
3055 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
3056
3057 let transport_id = self.allocate_transport_id();
3058 let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
3078 let mut cfg = self
3079 .lookup_udp_config(traversal.transport_name.as_deref())
3080 .or_else(|| self.lookup_udp_config(None))
3081 .cloned()
3082 .unwrap_or_default();
3083 cfg.bind_addr = None;
3084 cfg.external_addr = None;
3085 cfg
3086 });
3087 let mut transport = crate::transport::udp::UdpTransport::new(
3088 transport_id,
3089 traversal.transport_name.clone(),
3090 inherited_config,
3091 packet_tx,
3092 );
3093
3094 transport
3095 .adopt_socket_async(traversal.socket)
3096 .await
3097 .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
3098
3099 let local_addr = transport.local_addr().ok_or_else(|| {
3100 NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
3101 })?;
3102
3103 self.transports.insert(
3104 transport_id,
3105 crate::transport::TransportHandle::Udp(transport),
3106 );
3107 self.bootstrap_transports.insert(transport_id);
3108 self.bootstrap_transport_npubs
3109 .insert(transport_id, traversal.peer_npub.clone());
3110
3111 let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
3112 if let Err(err) = self
3113 .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
3114 .await
3115 {
3116 self.bootstrap_transports.remove(&transport_id);
3117 self.bootstrap_transport_npubs.remove(&transport_id);
3118 if let Some(mut handle) = self.transports.remove(&transport_id) {
3119 let _ = handle.stop().await;
3120 }
3121 return Err(err);
3122 }
3123
3124 info!(
3125 peer = %self.peer_display_name(&peer_node_addr),
3126 transport_id = %transport_id,
3127 local_addr = %local_addr,
3128 remote_addr = %traversal.remote_addr,
3129 session_id = %traversal.session_id,
3130 "adopted NAT traversal socket; handshake initiated"
3131 );
3132
3133 Ok(BootstrapHandoffResult {
3134 transport_id,
3135 local_addr,
3136 remote_addr: traversal.remote_addr,
3137 peer_node_addr,
3138 session_id: traversal.session_id,
3139 })
3140 }
3141}