1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
7 OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use std::collections::{HashMap, HashSet};
18use std::net::{IpAddr, SocketAddr};
19use std::thread;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
/// Best-effort debug trace for node startup/endpoint diagnostics.
///
/// Appends a timestamped line to `nvpn-fips-endpoint-debug.log` in the OS
/// temp directory. Every failure (open or write) is deliberately swallowed:
/// this must never affect node behavior. Debug builds only.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let opened = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_path);
    if let Ok(mut file) = opened {
        let now = std::time::SystemTime::now();
        let _ = writeln!(file, "{:?} {}", now, message.as_ref());
    }
}
40
/// Release builds: endpoint debug logging compiles to a no-op so the hot
/// path carries no file I/O.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
43
/// Returns `true` when `ip` is not globally routable and therefore must not
/// be accepted as a peer's advertised endpoint address.
///
/// IPv4 rejects: RFC 1918 private, loopback, link-local, unspecified,
/// multicast, broadcast, documentation ranges, and CGNAT shared space
/// (100.64.0.0/10, RFC 6598). IPv6 rejects: loopback, unspecified,
/// unique-local (fc00::/7), multicast, and link-local (fe80::/10).
/// IPv4-mapped IPv6 addresses (`::ffff:a.b.c.d`) are judged by their
/// embedded IPv4 address so e.g. `::ffff:192.168.1.1` is also rejected.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // CGNAT 100.64.0.0/10: first octet 100, second octet's top
                // two bits are 0b01 (64..=127).
                || (v4.octets()[0] == 100 && (v4.octets()[1] & 0xc0) == 64)
        }
        IpAddr::V6(v6) => {
            // An IPv4-mapped address is only as routable as the IPv4 it
            // wraps; recurse on the embedded address.
            if let Some(mapped) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(mapped));
            }
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // Link-local unicast fe80::/10.
                || (v6.segments()[0] & 0xffc0) == 0xfe80
        }
    }
}
75
/// True when both socket addresses are in the same IP family (both IPv4 or
/// both IPv6) — a socket of one family cannot reach a remote of the other.
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    local.is_ipv4() == remote.is_ipv4()
}
82
// Multiplier on the advert lifetime used for open-discovery retry entries
// (presumably controls how long retries outlive the advert — use site not
// visible in this chunk; confirm there).
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
// Cap on simultaneous in-flight path candidates (handshakes + pending
// connects) toward a single peer; see `path_candidate_attempt_budget`.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
// Per-tick cap on auto-connect sessions warmed over the existing overlay
// graph; see `graph_session_warmup_budget`.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
// Per-tick cap on new outbound connects triggered by discovery events; see
// `discovery_connect_budget`.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
87
88impl Node {
89 pub(super) async fn update_peers(
101 &mut self,
102 new_peers: Vec<crate::config::PeerConfig>,
103 ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
104 use std::collections::{HashMap, HashSet};
105
106 let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
107 HashMap::with_capacity(new_peers.len());
108 for peer in new_peers {
109 let identity = match PeerIdentity::from_npub(&peer.npub) {
110 Ok(id) => id,
111 Err(e) => {
112 return Err(crate::node::NodeError::InvalidPeerNpub {
113 npub: peer.npub.clone(),
114 reason: e.to_string(),
115 });
116 }
117 };
118 new_by_addr.insert(*identity.node_addr(), peer);
122 }
123
124 let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
125 .config
126 .peers()
127 .iter()
128 .filter_map(|pc| {
129 PeerIdentity::from_npub(&pc.npub)
130 .ok()
131 .map(|id| (*id.node_addr(), pc.clone()))
132 })
133 .collect();
134
135 let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
136 let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
137
138 let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
139 let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
140 let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
141
142 let mut outcome = crate::node::UpdatePeersOutcome::default();
143
144 for node_addr in &removed {
145 if self.retry_pending.remove(node_addr).is_some() {
146 debug!(
147 peer = %self.peer_display_name(node_addr),
148 "Dropping retry entry for peer removed from runtime peer list"
149 );
150 }
151 self.peer_aliases.remove(node_addr);
152 outcome.removed += 1;
153 }
154
155 let mut auto_connect_refresh_configs = Vec::new();
156 for node_addr in &kept {
157 let new_pc = &new_by_addr[node_addr];
158 let current_pc = ¤t_by_addr[node_addr];
159 if new_pc.addresses != current_pc.addresses
160 || new_pc.alias != current_pc.alias
161 || new_pc.connect_policy != current_pc.connect_policy
162 || new_pc.auto_reconnect != current_pc.auto_reconnect
163 {
164 outcome.updated += 1;
165 if let Some(state) = self.retry_pending.get_mut(node_addr) {
166 state.peer_config = new_pc.clone();
167 state.retry_after_ms = Self::now_ms();
168 }
169 if let Some(alias) = new_pc.alias.clone() {
170 self.peer_aliases.insert(*node_addr, alias);
171 }
172 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
173 auto_connect_refresh_configs.push(new_pc.clone());
174 }
175 } else {
176 outcome.unchanged += 1;
177 if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
178 auto_connect_refresh_configs.push(new_pc.clone());
179 }
180 }
181 }
182
183 let added_configs: Vec<crate::config::PeerConfig> =
184 added.iter().map(|addr| new_by_addr[addr].clone()).collect();
185
186 self.config.peers = new_by_addr.into_values().collect();
190
191 for peer_config in added_configs {
192 outcome.added += 1;
193 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
194 continue;
195 };
196 let name = peer_config
197 .alias
198 .clone()
199 .unwrap_or_else(|| identity.short_npub());
200 self.peer_aliases.insert(*identity.node_addr(), name);
201 self.register_identity(*identity.node_addr(), identity.pubkey_full());
202
203 if !peer_config.is_auto_connect() {
204 continue;
205 }
206
207 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
208 warn!(
209 npub = %peer_config.npub,
210 error = %e,
211 "Failed to initiate connection for newly added peer"
212 );
213 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
214 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
215 }
216 if matches!(e, crate::node::NodeError::NoTransportForType(_))
217 && let Some(bootstrap) = self.nostr_discovery.clone()
218 {
219 bootstrap
220 .request_advert_stale_check(peer_config.npub.clone())
221 .await;
222 }
223 }
224 }
225
226 for peer_config in auto_connect_refresh_configs {
227 let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
228 continue;
229 };
230 let node_addr = *peer_identity.node_addr();
231
232 if self.peers.contains_key(&node_addr) {
233 match self
234 .initiate_active_peer_alternative_connection(&peer_config)
235 .await
236 {
237 Ok(attempted) => {
238 if attempted {
239 debug!(
240 peer = %self.peer_display_name(&node_addr),
241 "Started non-disruptive alternate-path handshake for active peer"
242 );
243 }
244 }
245 Err(e) => {
246 debug!(
247 npub = %peer_config.npub,
248 error = %e,
249 "Active peer alternate-path refresh did not start"
250 );
251 }
252 }
253 continue;
254 }
255
256 match self.initiate_peer_connection(&peer_config).await {
257 Ok(()) => {
258 let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
259 if let Some(state) = self.retry_pending.get_mut(&node_addr) {
260 state.peer_config = peer_config;
261 state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
262 }
263 }
264 Err(e) => {
265 debug!(
266 npub = %peer_config.npub,
267 error = %e,
268 "Refreshed peer addresses did not initiate a direct connection"
269 );
270 self.schedule_retry(node_addr, Self::now_ms());
271 }
272 }
273 }
274
275 self.warm_auto_connect_graph_sessions().await;
276
277 Ok(outcome)
278 }
279
    /// Seeds aliases and identities for every configured peer, then starts
    /// outbound connections to all auto-connect peers.
    ///
    /// Failed attempts are scheduled for retry; a `NoTransportForType`
    /// failure additionally queues the peer's Nostr advert for a staleness
    /// check (the advertised transport may be outdated). Finishes by warming
    /// sessions over the existing overlay graph.
    pub(super) async fn initiate_peer_connections(&mut self) {
        // Resolve configured npubs up front; unparseable entries are skipped.
        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
            .config
            .peers()
            .iter()
            .filter_map(|pc| {
                PeerIdentity::from_npub(&pc.npub)
                    .ok()
                    .map(|id| (id, pc.alias.clone()))
            })
            .collect();

        for (identity, alias) in peer_identities {
            // Display name: configured alias, else a shortened npub.
            let name = alias.unwrap_or_else(|| identity.short_npub());
            self.peer_aliases.insert(*identity.node_addr(), name);
            self.register_identity(*identity.node_addr(), identity.pubkey_full());
        }

        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();

        if peer_configs.is_empty() {
            debug!("No static peers configured");
            return;
        }

        debug!(
            count = peer_configs.len(),
            "Initiating static peer connections"
        );

        for peer_config in peer_configs {
            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
                warn!(
                    npub = %peer_config.npub,
                    alias = ?peer_config.alias,
                    error = %e,
                    "Failed to initiate peer connection"
                );
                // Always retry later, regardless of failure kind.
                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                }
                // No matching transport hints at a stale advert: ask the
                // bootstrap layer to re-validate it.
                if matches!(e, crate::node::NodeError::NoTransportForType(_))
                    && let Some(bootstrap) = self.nostr_discovery.clone()
                {
                    bootstrap
                        .request_advert_stale_check(peer_config.npub.clone())
                        .await;
                }
            }
        }

        self.warm_auto_connect_graph_sessions().await;
    }
350
    /// Starts an initial outbound connection attempt to `peer_config`.
    ///
    /// Thin wrapper over `initiate_peer_connection_inner`; kept distinct
    /// from `initiate_peer_retry_connection` so call sites read as intent.
    pub(super) async fn initiate_peer_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }
360
    /// Starts a retry connection attempt to `peer_config`.
    ///
    /// Currently identical to `initiate_peer_connection` (both delegate to
    /// `initiate_peer_connection_inner`); the separate entry point lets
    /// retry-specific behavior be added without touching initial-connect
    /// call sites.
    pub(super) async fn initiate_peer_retry_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }
372
373 async fn initiate_active_peer_alternative_connection(
374 &mut self,
375 peer_config: &crate::config::PeerConfig,
376 ) -> Result<bool, NodeError> {
377 let peer_identity =
378 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
379 npub: peer_config.npub.clone(),
380 reason: e.to_string(),
381 })?;
382 let peer_node_addr = *peer_identity.node_addr();
383
384 if !self.peers.contains_key(&peer_node_addr) {
385 self.initiate_peer_connection(peer_config).await?;
386 return Ok(true);
387 }
388
389 self.try_active_peer_alternative_addresses(peer_config, peer_identity)
395 .await
396 }
397
398 async fn initiate_peer_connection_inner(
399 &mut self,
400 peer_config: &crate::config::PeerConfig,
401 ) -> Result<(), NodeError> {
402 let peer_identity =
404 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
405 npub: peer_config.npub.clone(),
406 reason: e.to_string(),
407 })?;
408
409 let peer_node_addr = *peer_identity.node_addr();
410
411 if self.peers.contains_key(&peer_node_addr) {
413 debug!(
414 npub = %peer_config.npub,
415 "Peer already exists, skipping"
416 );
417 return Ok(());
418 }
419
420 self.try_peer_addresses(peer_config, peer_identity, true)
421 .await
422 }
423
424 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
425 self.connections.values().any(|conn| {
426 conn.expected_identity()
427 .map(|id| id.node_addr() == peer_node_addr)
428 .unwrap_or(false)
429 })
430 }
431
432 fn is_connecting_to_peer_on_path(
433 &self,
434 peer_node_addr: &NodeAddr,
435 transport_id: TransportId,
436 remote_addr: &TransportAddr,
437 ) -> bool {
438 self.connections.values().any(|conn| {
439 conn.expected_identity()
440 .map(|id| id.node_addr() == peer_node_addr)
441 .unwrap_or(false)
442 && conn.transport_id() == Some(transport_id)
443 && conn.source_addr() == Some(remote_addr)
444 }) || self.pending_connects.iter().any(|pending| {
445 pending.peer_identity.node_addr() == peer_node_addr
446 && pending.transport_id == transport_id
447 && &pending.remote_addr == remote_addr
448 })
449 }
450
451 pub(in crate::node) fn should_warm_auto_connect_session(
452 &self,
453 peer_node_addr: &NodeAddr,
454 ) -> bool {
455 if self.peers.contains_key(peer_node_addr)
456 || self
457 .sessions
458 .get(peer_node_addr)
459 .is_some_and(|entry| entry.is_established())
460 {
461 return false;
462 }
463
464 self.config.peers().iter().any(|peer| {
465 peer.is_auto_connect()
466 && PeerIdentity::from_npub(&peer.npub)
467 .map(|identity| identity.node_addr() == peer_node_addr)
468 .unwrap_or(false)
469 })
470 }
471
    /// Opens overlay sessions toward auto-connect peers that are not yet
    /// directly connected, routing over the already-established FIPS graph.
    ///
    /// Returns the number of peers for which a warmup action (session
    /// initiation or route lookup) was started. Work is bounded by
    /// `graph_session_warmup_budget()`.
    pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
        // With no sendable direct peer there is no graph to route over.
        if !self.peers.values().any(|peer| peer.can_send()) {
            return 0;
        }

        let mut budget = self.graph_session_warmup_budget();
        if budget == 0 {
            return 0;
        }

        let peer_identities: Vec<_> = self
            .config
            .auto_connect_peers()
            .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
            .collect();

        let mut warmed = 0;
        for identity in peer_identities {
            if budget == 0 {
                break;
            }

            let peer_node_addr = *identity.node_addr();
            // Skip ourselves, peers that don't need warming, and peers whose
            // session handshake is already being initiated.
            if peer_node_addr == *self.identity.node_addr()
                || !self.should_warm_auto_connect_session(&peer_node_addr)
                || self
                    .sessions
                    .get(&peer_node_addr)
                    .is_some_and(|entry| entry.is_initiating())
            {
                continue;
            }

            self.register_identity(peer_node_addr, identity.pubkey_full());

            if self.find_next_hop(&peer_node_addr).is_some() {
                match self
                    .initiate_session(peer_node_addr, identity.pubkey_full())
                    .await
                {
                    Ok(()) => {
                        warmed += 1;
                        budget = budget.saturating_sub(1);
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            "Warmed auto-connect peer session over existing FIPS graph"
                        );
                    }
                    // The route vanished between the hop check and the send:
                    // fall back to a lookup, still consuming budget so one
                    // flapping route can't starve other peers.
                    Err(NodeError::SendFailed { node_addr, reason })
                        if node_addr == peer_node_addr && reason == "no route to destination" =>
                    {
                        self.maybe_initiate_lookup(&peer_node_addr).await;
                        warmed += 1;
                        budget = budget.saturating_sub(1);
                    }
                    Err(err) => {
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            error = %err,
                            "Failed to warm auto-connect peer session"
                        );
                    }
                }
            } else {
                // No known route yet: start a lookup instead of a session.
                self.maybe_initiate_lookup(&peer_node_addr).await;
                warmed += 1;
                budget = budget.saturating_sub(1);
            }
        }

        warmed
    }
544
545 pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
546 let max_destinations = self.config.node.session.pending_max_destinations;
547 if max_destinations == 0 {
548 return 0;
549 }
550
551 let pending_sessions = self
552 .sessions
553 .values()
554 .filter(|entry| !entry.is_established())
555 .count();
556 let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
557 max_destinations
558 .saturating_sub(pending_total)
559 .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
560 }
561
562 fn outbound_handshake_slots(&self) -> usize {
563 let used = self
564 .connections
565 .len()
566 .saturating_add(self.pending_connects.len());
567 if self.max_connections == 0 {
568 usize::MAX
569 } else {
570 self.max_connections.saturating_sub(used)
571 }
572 }
573
574 fn outbound_link_slots(&self) -> usize {
575 if self.max_links == 0 {
576 usize::MAX
577 } else {
578 self.max_links.saturating_sub(self.links.len())
579 }
580 }
581
582 fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
583 if !self.peers.contains_key(peer_node_addr)
584 && self.max_peers > 0
585 && self.peers.len() >= self.max_peers
586 {
587 return 0;
588 }
589
590 let in_flight_for_peer = self
591 .connections
592 .values()
593 .filter(|conn| {
594 conn.expected_identity()
595 .map(|id| id.node_addr() == peer_node_addr)
596 .unwrap_or(false)
597 })
598 .count()
599 .saturating_add(
600 self.pending_connects
601 .iter()
602 .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
603 .count(),
604 );
605
606 self.outbound_handshake_slots()
607 .min(self.outbound_link_slots())
608 .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
609 }
610
611 fn discovery_connect_budget(&self) -> usize {
612 self.outbound_handshake_slots()
613 .min(self.outbound_link_slots())
614 .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
615 }
616
617 fn find_udp_transport_for_remote_addr(
624 &self,
625 remote_addr: SocketAddr,
626 ) -> Option<(TransportId, SocketAddr)> {
627 self.transports
628 .iter()
629 .filter(|(id, handle)| {
630 handle.transport_type().name == "udp"
631 && handle.is_operational()
632 && !self.bootstrap_transports.contains(id)
633 })
634 .filter_map(|(id, handle)| {
635 let local_addr = handle.local_addr()?;
636 socket_addr_families_compatible(local_addr, remote_addr)
637 .then_some((*id, local_addr))
638 })
639 .min_by_key(|(id, _)| id.as_u32())
640 }
641
    /// Maps a discovered (transport, address) pair onto the transport we
    /// should actually dial from.
    ///
    /// Non-UDP candidates pass through unchanged. UDP candidates with a
    /// numeric address are re-homed onto a compatible local UDP transport
    /// (see `find_udp_transport_for_remote_addr`); candidates with no
    /// compatible local socket are dropped, as are non-numeric UDP addresses
    /// originating from bootstrap transports.
    pub(super) fn transport_discovery_candidate(
        &self,
        discovered_transport_id: TransportId,
        discovered_addr: TransportAddr,
    ) -> Option<(TransportId, TransportAddr, &'static str)> {
        let transport = self.transports.get(&discovered_transport_id)?;
        let transport_name = transport.transport_type().name;

        // Only UDP needs re-homing; everything else is used as discovered.
        if transport_name != "udp" {
            return Some((discovered_transport_id, discovered_addr, transport_name));
        }

        let Some(remote_socket_addr) = discovered_addr
            .as_str()
            .and_then(|addr| addr.parse::<SocketAddr>().ok())
        else {
            // Non-numeric UDP address: accept it as-is only if it did NOT
            // come from a bootstrap transport.
            if self.bootstrap_transports.contains(&discovered_transport_id) {
                debug!(
                    transport_id = %discovered_transport_id,
                    remote_addr = %discovered_addr,
                    "transport discovery: skip non-numeric UDP address from bootstrap transport"
                );
                return None;
            }
            return Some((discovered_transport_id, discovered_addr, transport_name));
        };

        let Some((transport_id, local_addr)) =
            self.find_udp_transport_for_remote_addr(remote_socket_addr)
        else {
            debug!(
                transport_id = %discovered_transport_id,
                remote_addr = %discovered_addr,
                "transport discovery: skip UDP peer with no compatible local socket"
            );
            return None;
        };

        // Log only when the selection actually changed the transport.
        if transport_id != discovered_transport_id {
            debug!(
                discovered_transport_id = %discovered_transport_id,
                selected_transport_id = %transport_id,
                local_addr = %local_addr,
                remote_addr = %remote_socket_addr,
                "transport discovery: selected compatible UDP transport"
            );
        }

        Some((
            transport_id,
            TransportAddr::from_socket_addr(remote_socket_addr),
            transport_name,
        ))
    }
696
    /// Starts an outbound connection to `peer_identity` over a concrete
    /// (transport, remote address) path.
    ///
    /// Deduplicates against in-flight attempts on the same path, enforces
    /// connection/link/peer caps and the outbound ACL, then creates the
    /// link. Connection-oriented transports get a non-blocking `connect`
    /// and a `PendingConnect` entry (handshake starts on connect
    /// completion); connectionless transports start the Noise handshake
    /// immediately.
    ///
    /// # Errors
    /// `MaxConnectionsExceeded` / `MaxLinksExceeded` / `MaxPeersExceeded`
    /// on cap violations, ACL errors from `authorize_peer`, and
    /// `TransportError` when the transport connect fails (link state is
    /// rolled back in that case).
    pub(super) async fn initiate_connection(
        &mut self,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        // A duplicate attempt on the same path is a silent success.
        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
            debug!(
                peer = %self.peer_display_name(&peer_node_addr),
                transport_id = %transport_id,
                remote_addr = %remote_addr,
                "Connection already in progress for candidate path"
            );
            return Ok(());
        }

        if self.outbound_handshake_slots() == 0 {
            return Err(NodeError::MaxConnectionsExceeded {
                max: self.max_connections,
            });
        }

        if self.outbound_link_slots() == 0 {
            return Err(NodeError::MaxLinksExceeded {
                max: self.max_links,
            });
        }

        // The peer cap only applies to peers we are not already connected
        // to (alternate paths to existing peers are always allowed).
        if !self.peers.contains_key(&peer_node_addr)
            && self.max_peers > 0
            && self.peers.len() >= self.max_peers
        {
            return Err(NodeError::MaxPeersExceeded {
                max: self.max_peers,
            });
        }

        self.authorize_peer(
            &peer_identity,
            PeerAclContext::OutboundConnect,
            transport_id,
            &remote_addr,
        )?;

        let is_connection_oriented = self
            .transports
            .get(&transport_id)
            .map(|t| t.transport_type().connection_oriented)
            .unwrap_or(false);

        let link_id = self.allocate_link_id();

        let link = if is_connection_oriented {
            Link::new(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        } else {
            Link::connectionless(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        };

        self.links.insert(link_id, link);

        self.addr_to_link
            .insert((transport_id, remote_addr.clone()), link_id);

        if is_connection_oriented {
            if let Some(transport) = self.transports.get(&transport_id) {
                match transport.connect(&remote_addr).await {
                    Ok(()) => {
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            link_id = %link_id,
                            "Transport connect initiated (non-blocking)"
                        );
                        // Handshake is deferred until the connect completes.
                        self.pending_connects.push(super::PendingConnect {
                            link_id,
                            transport_id,
                            remote_addr,
                            peer_identity,
                        });
                    }
                    Err(e) => {
                        // Roll back the link state created above.
                        self.links.remove(&link_id);
                        self.addr_to_link.remove(&(transport_id, remote_addr));
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            Ok(())
        } else {
            // Connectionless: the Noise handshake can start right away.
            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
                .await
        }
    }
820
    /// Builds and sends the first Noise handshake message for an outbound
    /// connection on an already-created link.
    ///
    /// Allocates a local index, derives msg1 from our keypair, registers
    /// the pending-outbound/connection state, then sends msg1 over the
    /// transport. Every failure path unwinds all state created so far
    /// (link, addr mapping, index, connection, pending-outbound entry).
    ///
    /// # Errors
    /// `IndexAllocationFailed`, `HandshakeFailed`, or `TransportError`
    /// (send failure, or transport removed before the first send).
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                // Unwind the link created by the caller.
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    // Unwind index + link.
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Keep msg1 around for resends until msg2 arrives.
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        // Send msg1; `None` means the transport vanished between caller
        // and here.
        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        match send_result {
            Some(send_result) => {
                self.note_local_send_outcome(&send_result);
                match send_result {
                    Ok(bytes) => {
                        debug!(
                            link_id = %link_id,
                            our_index = %our_index,
                            bytes,
                            "Sent Noise handshake message 1 (wire format)"
                        );
                    }
                    Err(e) => {
                        warn!(
                            link_id = %link_id,
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            our_index = %our_index,
                            error = %e,
                            "Failed to send handshake message"
                        );
                        // Full unwind: pending-outbound, connection, link,
                        // addr mapping, index.
                        self.pending_outbound
                            .remove(&(transport_id, our_index.as_u32()));
                        self.connections.remove(&link_id);
                        self.links.remove(&link_id);
                        self.addr_to_link
                            .remove(&(transport_id, remote_addr.clone()));
                        let _ = self.index_allocator.free(our_index);
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            None => {
                // Same full unwind as the send-failure path.
                self.pending_outbound
                    .remove(&(transport_id, our_index.as_u32()));
                self.connections.remove(&link_id);
                self.links.remove(&link_id);
                self.addr_to_link
                    .remove(&(transport_id, remote_addr.clone()));
                let _ = self.index_allocator.free(our_index);
                return Err(NodeError::TransportError(format!(
                    "transport {transport_id} disappeared before first handshake send"
                )));
            }
        }

        Ok(())
    }
945
    /// Drains transport-level discovery results and auto-connects to
    /// discovered peers, under global and per-peer budgets.
    ///
    /// Two-phase: first collect candidates (so `self` borrows stay short),
    /// then connect. Non-auto-connect transports still have `discover()`
    /// called (presumably to drain their internal queues — confirm against
    /// the transport trait). Active peers only get alternate-path attempts
    /// when their current candidate set isn't fresh enough to skip.
    pub(super) async fn poll_transport_discovery(&mut self) {
        let mut to_connect = Vec::new();
        // Per-peer count of candidates queued this tick, so the per-peer
        // budget accounts for attempts not yet started.
        let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
        let mut connect_budget = self.discovery_connect_budget();
        let mut skipped_budget = 0usize;

        for transport in self.transports.values() {
            if !transport.is_operational() {
                continue;
            }
            if !transport.auto_connect() {
                // Still drain discovery results; result intentionally ignored.
                let _ = transport.discover();
                continue;
            }
            let discovered = match transport.discover() {
                Ok(peers) => peers,
                Err(_) => continue,
            };
            for peer in discovered {
                let discovered_transport_id = peer.transport_id;
                // Without a pubkey hint we cannot identify the peer.
                let pubkey = match peer.pubkey_hint {
                    Some(pk) => pk,
                    None => continue,
                };
                let identity = PeerIdentity::from_pubkey(pubkey);
                let node_addr = *identity.node_addr();

                // Never connect to ourselves.
                if node_addr == *self.identity.node_addr() {
                    continue;
                }

                let Some((candidate_transport_id, remote_addr, transport_name)) =
                    self.transport_discovery_candidate(discovered_transport_id, peer.addr)
                else {
                    continue;
                };

                if self.peers.contains_key(&node_addr) {
                    // Active peer: only probe an alternate path when the
                    // current one isn't fresh enough.
                    let candidate = PeerAddress::new(transport_name, remote_addr.to_string());
                    if self.active_peer_candidate_is_fresh_enough_to_skip(
                        &node_addr,
                        std::slice::from_ref(&candidate),
                    ) {
                        continue;
                    }
                    if self.is_connecting_to_peer_on_path(
                        &node_addr,
                        candidate_transport_id,
                        &remote_addr,
                    ) {
                        continue;
                    }
                    let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
                    if connect_budget == 0
                        || self
                            .path_candidate_attempt_budget(&node_addr)
                            .saturating_sub(queued_for_peer)
                            == 0
                    {
                        skipped_budget = skipped_budget.saturating_add(1);
                        continue;
                    }
                    // `true` marks this as an active-peer refresh.
                    to_connect.push((candidate_transport_id, remote_addr, identity, true));
                    *queued_per_peer.entry(node_addr).or_default() += 1;
                    connect_budget = connect_budget.saturating_sub(1);
                    continue;
                }

                // New peer path: same dedup + budget checks, no freshness gate.
                if self.is_connecting_to_peer_on_path(
                    &node_addr,
                    candidate_transport_id,
                    &remote_addr,
                ) {
                    continue;
                }

                let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
                if connect_budget == 0
                    || self
                        .path_candidate_attempt_budget(&node_addr)
                        .saturating_sub(queued_for_peer)
                        == 0
                {
                    skipped_budget = skipped_budget.saturating_add(1);
                    continue;
                }
                to_connect.push((candidate_transport_id, remote_addr, identity, false));
                *queued_per_peer.entry(node_addr).or_default() += 1;
                connect_budget = connect_budget.saturating_sub(1);
            }
        }

        if skipped_budget > 0 {
            debug!(
                skipped = skipped_budget,
                queued = to_connect.len(),
                "Transport discovery connect budget exhausted"
            );
        }

        // Phase 2: start the queued connection attempts.
        for (transport_id, remote_addr, identity, active_refresh) in to_connect {
            info!(
                peer = %self.peer_display_name(identity.node_addr()),
                transport_id = %transport_id,
                remote_addr = %remote_addr,
                active_refresh,
                "Auto-connecting to discovered peer"
            );
            if let Err(e) = self
                .initiate_connection(transport_id, remote_addr, identity)
                .await
            {
                warn!(error = %e, "Failed to auto-connect to discovered peer");
            }
        }
    }
1070
    /// Drives the Nostr bootstrap layer: refreshes our overlay advert,
    /// processes traversal events, and queues open-discovery work.
    ///
    /// `Established` events adopt the traversal socket (scheduling a retry
    /// on adoption failure). `Failed` events are ignored for peers that are
    /// already connected or mid-handshake; otherwise the failure is
    /// recorded (with rate-limited warning), a stale-advert check may be
    /// requested, a direct address fallback is attempted, and finally a
    /// retry is scheduled honoring any cooldown.
    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            // Adoption failed: fall back to the retry path.
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    // A failure for a peer we already reach is harmless.
                    if self.peers.contains_key(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal for already-connected peer"
                        );
                        continue;
                    }
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    let now_ms = Self::now_ms();
                    // Record the failure; the decision tells us whether to
                    // warn (rate-limited), whether the failure threshold was
                    // crossed, and any cooldown to honor.
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    // Repeated failures suggest the peer's advert is stale.
                    if decision.crossed_threshold {
                        bootstrap
                            .request_advert_stale_check(peer_config.npub.clone())
                            .await;
                    }

                    // Direct addresses may still work even when traversal
                    // failed; success here means no retry is needed.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    // Push the retry past any traversal cooldown window.
                    if let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
    }
1177
1178 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1184 if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1185 let scope = scope.trim();
1186 if !scope.is_empty() {
1187 return Some(scope.to_string());
1188 }
1189 }
1190
1191 let app = self.config.node.discovery.nostr.app.trim();
1192 if app.is_empty() {
1193 return None;
1194 }
1195 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1196 let scope = rest.trim();
1197 if scope.is_empty() {
1198 None
1199 } else {
1200 Some(scope.to_string())
1201 }
1202 } else {
1203 Some(app.to_string())
1204 }
1205 }
1206
    /// Drains pending LAN mDNS discovery events and attempts handshakes.
    ///
    /// For each discovered peer with a compatible UDP transport:
    /// - already-connected peers may get an alternate-path handshake
    ///   (skipped when a fresh-enough candidate already exists);
    /// - unknown peers get a first handshake attempt.
    /// Both paths respect the per-poll connect budget and the per-peer
    /// path-candidate budget; skips due to budget exhaustion are counted
    /// and summarized at debug level.
    pub(super) async fn poll_lan_discovery(&mut self) {
        let Some(runtime) = self.lan_discovery.clone() else {
            return;
        };
        let events = runtime.drain_events().await;
        if events.is_empty() {
            return;
        }
        // Cap how many new connection attempts this poll may start.
        let mut connect_budget = self.discovery_connect_budget();
        let mut skipped_budget = 0usize;
        for event in events {
            // LanEvent currently has a single variant; irrefutable pattern.
            let crate::discovery::lan::LanEvent::Discovered(peer) = event;
            let Some((transport_id, local_addr)) =
                self.find_udp_transport_for_remote_addr(peer.addr)
            else {
                debug!(
                    addr = %peer.addr,
                    "lan: skip discovered peer with no compatible UDP transport"
                );
                continue;
            };
            let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
                Ok(id) => id,
                Err(err) => {
                    debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
                    continue;
                }
            };
            let peer_node_addr = *identity.node_addr();
            let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
            if self.peers.contains_key(&peer_node_addr) {
                // Active peer: only race an alternate path when the
                // advertised candidate is not already fresh.
                let candidate = PeerAddress::new("udp", peer.addr.to_string());
                if self.active_peer_candidate_is_fresh_enough_to_skip(
                    &peer_node_addr,
                    std::slice::from_ref(&candidate),
                ) {
                    continue;
                }
                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                    continue;
                }
                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
                    skipped_budget = skipped_budget.saturating_add(1);
                    continue;
                }
                info!(
                    npub = %identity.short_npub(),
                    addr = %peer.addr,
                    local_addr = %local_addr,
                    "lan: initiating alternate-path handshake to active peer"
                );
                if let Err(err) = self
                    .initiate_connection(transport_id, remote_addr, identity)
                    .await
                {
                    debug!(
                        npub = %peer.npub,
                        error = %err,
                        "lan: failed to initiate active peer alternate-path handshake"
                    );
                }
                // Budget is charged even on failure: the attempt was made.
                connect_budget = connect_budget.saturating_sub(1);
                continue;
            }
            // New (not yet connected) peer path.
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                continue;
            }
            if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
                skipped_budget = skipped_budget.saturating_add(1);
                continue;
            }
            info!(
                npub = %identity.short_npub(),
                addr = %peer.addr,
                local_addr = %local_addr,
                "lan: initiating handshake to discovered peer"
            );
            if let Err(err) = self
                .initiate_connection(transport_id, remote_addr, identity)
                .await
            {
                debug!(
                    npub = %peer.npub,
                    error = %err,
                    "lan: failed to initiate connection to discovered peer"
                );
            }
            connect_budget = connect_budget.saturating_sub(1);
        }
        if skipped_budget > 0 {
            debug!(
                skipped = skipped_budget,
                "lan: discovery connect budget exhausted"
            );
        }
    }
1309
    /// Polls every in-flight transport connect and resolves completed ones.
    ///
    /// Connected attempts get their link marked connected and a handshake
    /// started; failed attempts (including `None` state and removed
    /// transports) tear down the link and schedule a peer retry. Entries
    /// still `Connecting` are left in place for the next poll.
    pub(super) async fn poll_pending_connects(&mut self) {
        if self.pending_connects.is_empty() {
            return;
        }

        // (index, success, failure reason) for every resolved attempt.
        let mut completed = Vec::new();

        for (i, pending) in self.pending_connects.iter().enumerate() {
            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
                transport.connection_state(&pending.remote_addr)
            } else {
                crate::transport::ConnectionState::Failed("transport removed".into())
            };

            match state {
                crate::transport::ConnectionState::Connected => {
                    completed.push((i, true, None));
                }
                crate::transport::ConnectionState::Failed(reason) => {
                    completed.push((i, false, Some(reason)));
                }
                crate::transport::ConnectionState::Connecting => {
                    // Still in progress; check again on the next poll.
                }
                crate::transport::ConnectionState::None => {
                    completed.push((i, false, Some("no connection attempt found".into())));
                }
            }
        }

        // Remove in descending index order so earlier indices stay valid.
        for (i, success, reason) in completed.into_iter().rev() {
            let pending = self.pending_connects.remove(i);

            if success {
                if let Some(link) = self.links.get_mut(&pending.link_id) {
                    link.set_connected();
                }

                debug!(
                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
                    transport_id = %pending.transport_id,
                    remote_addr = %pending.remote_addr,
                    link_id = %pending.link_id,
                    "Transport connected, starting handshake"
                );

                if let Err(e) = self
                    .start_handshake(
                        pending.link_id,
                        pending.transport_id,
                        pending.remote_addr.clone(),
                        pending.peer_identity,
                    )
                    .await
                {
                    warn!(
                        link_id = %pending.link_id,
                        error = %e,
                        "Failed to start handshake after transport connect"
                    );
                    self.remove_link(&pending.link_id);
                }
            } else {
                let reason = reason.unwrap_or_default();
                warn!(
                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
                    transport_id = %pending.transport_id,
                    remote_addr = %pending.remote_addr,
                    link_id = %pending.link_id,
                    reason = %reason,
                    "Transport connect failed"
                );

                self.remove_link(&pending.link_id);
                // NOTE(review): if remove_link already evicts from
                // self.links (as the success path suggests), this extra
                // remove is redundant — confirm against remove_link.
                self.links.remove(&pending.link_id);
                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
            }
        }
    }
1401
    /// Starts the node: packet channel, transports, optional worker pools,
    /// Nostr and LAN discovery, initial peer connections, the TUN device,
    /// and the DNS responder.
    ///
    /// Subsystem failures (a transport, a discovery runtime, TUN, DNS) are
    /// logged and skipped rather than fatal; the node still transitions to
    /// `NodeState::Running`.
    ///
    /// # Errors
    /// Returns `NodeError::AlreadyStarted` when the node cannot start from
    /// its current state, and `NodeError::Tun` on macOS if the TUN
    /// shutdown pipe cannot be created. TUN writer creation errors also
    /// propagate via `?`.
    pub async fn start(&mut self) -> Result<(), NodeError> {
        node_start_debug_log("Node::start begin");
        if !self.state.can_start() {
            return Err(NodeError::AlreadyStarted);
        }
        self.state = NodeState::Starting;
        node_start_debug_log("Node::start state set to starting");

        // Shared channel all transports use to deliver received packets.
        let packet_buffer_size = self.config.node.buffers.packet_channel;
        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
        self.packet_tx = Some(packet_tx.clone());
        self.packet_rx = Some(packet_rx);
        node_start_debug_log("Node::start packet channel created");

        node_start_debug_log("Node::start create transports begin");
        let transport_handles = self.create_transports(&packet_tx).await;
        node_start_debug_log(format!(
            "Node::start create transports complete count={}",
            transport_handles.len()
        ));

        // Start each transport; failures are logged and the handle dropped.
        for mut handle in transport_handles {
            let transport_id = handle.transport_id();
            let transport_type = handle.transport_type().name;
            let name = handle.name().map(|s| s.to_string());

            node_start_debug_log(format!(
                "Node::start transport start begin id={} type={} name={:?}",
                transport_id, transport_type, name
            ));
            match handle.start().await {
                Ok(()) => {
                    node_start_debug_log(format!(
                        "Node::start transport start ok id={} type={}",
                        transport_id, transport_type
                    ));
                    self.transports.insert(transport_id, handle);
                }
                Err(e) => {
                    node_start_debug_log(format!(
                        "Node::start transport start error id={} type={} error={}",
                        transport_id, transport_type, e
                    ));
                    if let Some(ref n) = name {
                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
                    } else {
                        warn!(transport_type, error = %e, "Transport failed to start");
                    }
                }
            }
        }

        if !self.transports.is_empty() {
            info!(count = self.transports.len(), "Transports initialized");
        }

        // Optional crypto worker pools (Unix only). Worker counts come from
        // FIPS_ENCRYPT_WORKERS / FIPS_DECRYPT_WORKERS, defaulting to the
        // available CPU parallelism.
        #[cfg(unix)]
        {
            if self.config.node.worker_pools_enabled {
                node_start_debug_log("Node::start worker pools begin");
                let cpu_default = std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(1)
                    .max(1);
                let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
                    .ok()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(cpu_default)
                    .max(1);
                self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
                    encrypt_worker_count,
                ));
                info!(
                    workers = encrypt_worker_count,
                    "Spawned FMP-encrypt worker pool"
                );

                // Unlike the encrypt count, 0 is allowed here and selects
                // in-line decryption on the rx loop instead of a pool.
                let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
                    .ok()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(cpu_default);
                if decrypt_worker_count == 0 {
                    info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
                } else {
                    self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
                        decrypt_worker_count,
                    ));
                    info!(
                        workers = decrypt_worker_count,
                        "Spawned FMP+FSP-decrypt worker pool"
                    );
                }
                node_start_debug_log("Node::start worker pools complete");
            } else {
                node_start_debug_log("Node::start worker pools disabled");
                info!("FIPS worker pools disabled; using in-line crypto/send path");
            }
        }

        // Nostr overlay discovery: publish our advert, keep the runtime,
        // and remember when it started (used by the startup sweep delay).
        if self.config.node.discovery.nostr.enabled {
            node_start_debug_log("Node::start nostr discovery start begin");
            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
                .await
            {
                Ok(runtime) => {
                    node_start_debug_log("Node::start nostr discovery runtime created");
                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
                    }
                    node_start_debug_log("Node::start nostr overlay advert refreshed");
                    self.nostr_discovery = Some(runtime);
                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
                    info!("Nostr overlay discovery enabled");
                }
                Err(err) => {
                    node_start_debug_log(format!(
                        "Node::start nostr discovery start error error={}",
                        err
                    ));
                    warn!(error = %err, "Failed to start Nostr overlay discovery");
                }
            }
        }

        // LAN mDNS discovery: advertise the first operational UDP
        // transport's local port (0 when none is available).
        if self.config.node.discovery.lan.enabled {
            node_start_debug_log("Node::start lan discovery start begin");
            let advertised_udp_port = self
                .transports
                .values()
                .filter(|h| h.is_operational())
                .filter(|h| h.transport_type().name == "udp")
                .find_map(|h| h.local_addr().map(|addr| addr.port()))
                .unwrap_or(0);
            let scope = self.lan_discovery_scope();
            match crate::discovery::lan::LanDiscovery::start(
                &self.identity,
                scope,
                advertised_udp_port,
                self.config.node.discovery.lan.clone(),
            )
            .await
            {
                Ok(runtime) => {
                    node_start_debug_log("Node::start lan discovery start ok");
                    self.lan_discovery = Some(runtime);
                    info!("LAN mDNS discovery enabled");
                }
                Err(err) => {
                    node_start_debug_log(format!(
                        "Node::start lan discovery start error error={}",
                        err
                    ));
                    debug!(error = %err, "LAN mDNS discovery not started");
                }
            }
        }

        node_start_debug_log("Node::start initiate peer connections begin");
        self.initiate_peer_connections().await;
        node_start_debug_log("Node::start initiate peer connections complete");

        // TUN device: reader/writer run on dedicated OS threads so blocking
        // device I/O stays off the async runtime.
        if self.config.tun.enabled {
            node_start_debug_log("Node::start tun init begin");
            let address = *self.identity.address();
            match TunDevice::create(&self.config.tun, address).await {
                Ok(device) => {
                    let mtu = device.mtu();
                    let name = device.name().to_string();
                    let our_addr = *device.address();

                    info!("TUN device active:");
                    info!(" name: {}", name);
                    info!(" address: {}", device.address());
                    info!(" mtu: {}", mtu);

                    let effective_mtu = self.effective_ipv6_mtu();
                    // MSS clamp: effective MTU minus IPv6 (40) + TCP (20) headers.
                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); info!("effective MTU: {} bytes", effective_mtu);
                    debug!(" max TCP MSS: {} bytes", max_mss);

                    // macOS: the reader blocks in a read; a pipe lets stop()
                    // wake it so the thread can exit.
                    #[cfg(target_os = "macos")]
                    let (shutdown_read_fd, shutdown_write_fd) = {
                        let mut fds = [0i32; 2];
                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
                                "failed to create shutdown pipe".into(),
                            )));
                        }
                        (fds[0], fds[1])
                    };

                    let (writer, tun_tx) =
                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;

                    let writer_handle = thread::spawn(move || {
                        writer.run();
                    });

                    let reader_tun_tx = tun_tx.clone();

                    let tun_channel_size = self.config.node.buffers.tun_channel;
                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);

                    let transport_mtu = self.transport_mtu();
                    let path_mtu_lookup = self.path_mtu_lookup.clone();
                    #[cfg(target_os = "macos")]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                            shutdown_read_fd,
                        );
                    });
                    #[cfg(not(target_os = "macos"))]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                        );
                    });

                    self.tun_state = TunState::Active;
                    self.tun_name = Some(name);
                    self.tun_tx = Some(tun_tx);
                    self.tun_outbound_rx = Some(outbound_rx);
                    self.tun_reader_handle = Some(reader_handle);
                    self.tun_writer_handle = Some(writer_handle);
                    #[cfg(target_os = "macos")]
                    {
                        self.tun_shutdown_fd = Some(shutdown_write_fd);
                    }
                }
                Err(e) => {
                    // Degrade gracefully: the node runs without a TUN.
                    self.tun_state = TunState::Failed;
                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
                }
            }
            node_start_debug_log("Node::start tun init complete");
        }

        // DNS responder for the .fips domain (optional).
        if self.config.dns.enabled {
            node_start_debug_log("Node::start dns init begin");
            let addr_str = self.config.dns.bind_addr();
            match addr_str.parse::<std::net::IpAddr>() {
                Ok(ip) => {
                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
                    match Self::bind_dns_socket(bind) {
                        Ok(socket) => {
                            let dns_channel_size = self.config.node.buffers.dns_channel;
                            let (identity_tx, identity_rx) =
                                tokio::sync::mpsc::channel(dns_channel_size);
                            let dns_ttl = self.config.dns.ttl();
                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
                                self.config.peers(),
                            );
                            // With system files enabled the hosts map also
                            // auto-reloads from the OS hosts file.
                            let reloader = if self.config.node.system_files_enabled {
                                let hosts_path = std::path::PathBuf::from(
                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
                                );
                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
                            } else {
                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
                            };
                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
                            info!(
                                bind = %bind,
                                hosts = reloader.hosts().len(),
                                mesh_ifindex = ?mesh_ifindex,
                                "DNS responder started for .fips domain (auto-reload enabled)"
                            );
                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
                                socket,
                                identity_tx,
                                dns_ttl,
                                reloader,
                                mesh_ifindex,
                            ));
                            self.dns_identity_rx = Some(identity_rx);
                            self.dns_task = Some(handle);
                        }
                        Err(e) => {
                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
                        }
                    }
                }
                Err(e) => {
                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
                }
            }
            node_start_debug_log("Node::start dns init complete");
        }

        self.state = NodeState::Running;
        node_start_debug_log("Node::start running");
        info!("Node started:");
        info!(" state: {}", self.state);
        info!(" transports: {}", self.transports.len());
        info!(" connections: {}", self.connections.len());
        Ok(())
    }
1784
1785 fn bind_dns_socket(
1798 addr: std::net::SocketAddr,
1799 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
1800 use socket2::{Domain, Protocol, Socket, Type};
1801 let domain = if addr.is_ipv4() {
1802 Domain::IPV4
1803 } else {
1804 Domain::IPV6
1805 };
1806 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
1807 if addr.is_ipv6() {
1808 sock.set_only_v6(false)?;
1809 #[cfg(unix)]
1810 Self::set_recv_pktinfo_v6(&sock)?;
1811 }
1812 sock.set_nonblocking(true)?;
1813 sock.bind(&addr.into())?;
1814 tokio::net::UdpSocket::from_std(sock.into())
1815 }
1816
    /// Enables `IPV6_RECVPKTINFO` on the socket so recvmsg reports the
    /// destination address/interface of each incoming packet (Unix only).
    ///
    /// # Errors
    /// Returns the OS error when `setsockopt` fails.
    #[cfg(unix)]
    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
        use std::os::fd::AsRawFd;
        let enable: libc::c_int = 1;
        // SAFETY: `sock` owns a valid open fd for the duration of the call,
        // and the option value pointer/length describe a live c_int.
        let ret = unsafe {
            libc::setsockopt(
                sock.as_raw_fd(),
                libc::IPPROTO_IPV6,
                libc::IPV6_RECVPKTINFO,
                &enable as *const _ as *const libc::c_void,
                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
            )
        };
        if ret < 0 {
            return Err(std::io::Error::last_os_error());
        }
        Ok(())
    }
1840
1841 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
1848 #[cfg(unix)]
1849 {
1850 let c_name = std::ffi::CString::new(name).ok()?;
1851 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
1852 if idx == 0 { None } else { Some(idx) }
1853 }
1854 #[cfg(not(unix))]
1855 {
1856 let _ = name;
1857 None
1858 }
1859 }
1860
    /// Stops the node, tearing subsystems down roughly in reverse order of
    /// startup: DNS task, peer disconnect notifications, Nostr and LAN
    /// discovery, transports, the packet channel, and finally the TUN
    /// interface (including joining its reader/writer threads).
    ///
    /// # Errors
    /// Returns `NodeError::NotStarted` when the node is not in a stoppable
    /// state; individual subsystem shutdown failures are logged, not fatal.
    pub async fn stop(&mut self) -> Result<(), NodeError> {
        if !self.state.can_stop() {
            return Err(NodeError::NotStarted);
        }
        self.state = NodeState::Stopping;
        info!(state = %self.state, "Node stopping");

        if let Some(handle) = self.dns_task.take() {
            handle.abort();
            debug!("DNS responder stopped");
        }

        // Notify peers while transports are still running.
        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
            .await;

        if let Some(bootstrap) = self.nostr_discovery.take()
            && let Err(e) = bootstrap.shutdown().await
        {
            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
        }

        if let Some(lan) = self.lan_discovery.take() {
            lan.shutdown().await;
        }

        // Collect ids first so we can remove handles while iterating.
        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
        for transport_id in transport_ids {
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let transport_type = handle.transport_type().name;
                match handle.stop().await {
                    Ok(()) => {
                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
                    }
                    Err(e) => {
                        warn!(
                            transport_id = %transport_id,
                            transport_type,
                            error = %e,
                            "Transport stop failed"
                        );
                    }
                }
            }
        }

        // Drop both channel ends so any remaining tasks observe closure.
        self.packet_tx.take();
        self.packet_rx.take();

        if let Some(name) = self.tun_name.take() {
            info!(name = %name, "Shutting down TUN interface");

            // Dropping the sender lets the writer thread drain and exit.
            self.tun_tx.take();

            if let Err(e) = shutdown_tun_interface(&name).await {
                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
            }

            // macOS: wake the blocking reader thread via the shutdown pipe.
            #[cfg(target_os = "macos")]
            if let Some(fd) = self.tun_shutdown_fd.take() {
                unsafe {
                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                    libc::close(fd);
                }
            }

            if let Some(handle) = self.tun_reader_handle.take() {
                let _ = handle.join();
            }
            if let Some(handle) = self.tun_writer_handle.take() {
                let _ = handle.join();
            }

            self.tun_state = TunState::Disabled;
        }

        self.state = NodeState::Stopped;
        info!(state = %self.state, "Node stopped");
        Ok(())
    }
1958
1959 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
1964 let disconnect = Disconnect::new(reason);
1965 let plaintext = disconnect.encode();
1966
1967 let peer_addrs: Vec<NodeAddr> = self
1969 .peers
1970 .iter()
1971 .filter(|(_, peer)| peer.can_send() && peer.has_session())
1972 .map(|(addr, _)| *addr)
1973 .collect();
1974
1975 if peer_addrs.is_empty() {
1976 debug!(
1977 total_peers = self.peers.len(),
1978 "No sendable peers for disconnect notification"
1979 );
1980 return;
1981 }
1982
1983 let mut sent = 0usize;
1984 for node_addr in &peer_addrs {
1985 match self
1986 .send_encrypted_link_message(node_addr, &plaintext)
1987 .await
1988 {
1989 Ok(()) => sent += 1,
1990 Err(e) => {
1991 debug!(
1992 peer = %self.peer_display_name(node_addr),
1993 error = %e,
1994 "Failed to send disconnect (transport may be down)"
1995 );
1996 }
1997 }
1998 }
1999
2000 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2001 }
2002
2003 fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2004 peer_config
2005 .addresses_by_priority()
2006 .into_iter()
2007 .cloned()
2008 .collect()
2009 }
2010
2011 async fn nostr_peer_fallback_addresses(
2012 &self,
2013 peer_config: &PeerConfig,
2014 existing: &[PeerAddress],
2015 ) -> Vec<PeerAddress> {
2016 if !self.config.node.discovery.nostr.enabled
2017 || self.config.node.discovery.nostr.policy
2018 == crate::config::NostrDiscoveryPolicy::Disabled
2019 {
2020 return Vec::new();
2021 }
2022
2023 let Some(bootstrap) = self.nostr_discovery.clone() else {
2024 return Vec::new();
2025 };
2026 let endpoints = match bootstrap
2027 .cached_advert_endpoints_for_peer(&peer_config.npub)
2028 .await
2029 {
2030 Some(endpoints) => endpoints,
2031 None => {
2032 debug!(
2033 npub = %peer_config.npub,
2034 "No cached Nostr advert endpoints for configured peer"
2035 );
2036 return Vec::new();
2037 }
2038 };
2039
2040 let mut fallback = Vec::new();
2041 let mut next_priority = existing
2042 .iter()
2043 .map(|addr| addr.priority)
2044 .max()
2045 .unwrap_or(100)
2046 .saturating_add(1);
2047 let seen_at_ms = Self::now_ms();
2052 for endpoint in endpoints {
2053 let Some(candidate) =
2054 Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2055 else {
2056 continue;
2057 };
2058 if existing
2059 .iter()
2060 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2061 || fallback.iter().any(|addr: &PeerAddress| {
2062 addr.transport == candidate.transport && addr.addr == candidate.addr
2063 })
2064 {
2065 continue;
2066 }
2067 fallback.push(candidate);
2068 next_priority = next_priority.saturating_add(1);
2069 }
2070 fallback
2071 }
2072
2073 async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2074 if !self.config.node.discovery.nostr.enabled
2075 || self.config.node.discovery.nostr.policy
2076 == crate::config::NostrDiscoveryPolicy::Disabled
2077 {
2078 return false;
2079 }
2080 let Some(bootstrap) = self.nostr_discovery.clone() else {
2081 return false;
2082 };
2083 bootstrap.request_connect(peer_config.clone()).await;
2084 info!(npub = %peer_config.npub, "Started background Nostr UDP NAT traversal attempt");
2085 true
2086 }
2087
2088 fn overlay_endpoint_to_peer_address(
2089 endpoint: &OverlayEndpointAdvert,
2090 priority: u8,
2091 seen_at_ms: u64,
2092 ) -> Option<PeerAddress> {
2093 let transport = match endpoint.transport {
2094 OverlayTransportKind::Udp => "udp",
2095 OverlayTransportKind::Tcp => "tcp",
2096 OverlayTransportKind::Tor => "tor",
2097 };
2098 Some(
2099 PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2100 .with_seen_at_ms(seen_at_ms),
2101 )
2102 }
2103
    /// Tries to initiate connections to a peer across its candidate
    /// address list, racing up to the per-peer path-candidate budget.
    ///
    /// Special address forms:
    /// - `udp:nat` triggers a background Nostr NAT traversal (only when
    ///   `allow_bootstrap_nat` is set);
    /// - `ethernet:`/`ble:` addresses are resolved via transport-specific
    ///   helpers (BLE only on builds with `bluer_available`);
    /// - other UDP addresses are matched to a family-compatible transport.
    ///
    /// # Errors
    /// Propagates `NodeError::AccessDenied` immediately; returns
    /// `NodeError::NoTransportForType` when no address produced an attempt.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        let mut attempted = false;
        let peer_node_addr = *peer_identity.node_addr();
        // Remaining concrete (non-NAT) connection attempts for this peer.
        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

        for addr in addresses {
            // "udp:nat" is a sentinel, not a concrete address.
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if !allow_bootstrap_nat {
                    continue;
                }
                if self.request_nostr_bootstrap(peer_config).await {
                    attempted = true;
                    continue;
                }
                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                continue;
            }

            // Resolve the address to a (transport, remote addr) pair.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                // For parseable UDP socket addresses pick a transport whose
                // local address family is compatible; otherwise fall back to
                // any transport of the right type.
                let tid = if addr.transport == "udp"
                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
                {
                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                        Some((id, _)) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No compatible operational UDP transport for address"
                            );
                            continue;
                        }
                    }
                } else {
                    match self.find_transport_for_type(&addr.transport) {
                        Some(id) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No operational transport for address type"
                            );
                            continue;
                        }
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            // An in-flight attempt on the same path counts as attempted.
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                attempted = true;
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    remote_addr = %remote_addr,
                    "Skipping duplicate in-flight candidate path"
                );
                continue;
            }

            if concrete_budget == 0 {
                debug!(
                    npub = %peer_config.npub,
                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                    "Path candidate race budget exhausted"
                );
                break;
            }

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => {
                    attempted = true;
                    concrete_budget = concrete_budget.saturating_sub(1);
                }
                // ACL denial is terminal for this peer — stop immediately.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        if attempted {
            return Ok(());
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
2242
2243 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
2244 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
2245 .await;
2246 }
2247
    /// Scans cached open-discovery adverts and queues connection retries
    /// for eligible, not-yet-known peers.
    ///
    /// Only runs when Nostr discovery is enabled with the `Open` policy.
    /// Candidates are skipped (and counted per reason) when: too old
    /// (`max_age_secs`), configured explicitly, self, already connected,
    /// already pending retry, in bootstrap cooldown, already connecting,
    /// with an invalid npub, or yielding no usable endpoints. For
    /// configured peers with a fresh advert, any pending retry is
    /// expedited instead. A summary is logged for the `"startup"` caller
    /// or whenever something was enqueued.
    pub(in crate::node) async fn run_open_discovery_sweep(
        &mut self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
        max_age_secs: Option<u64>,
        caller: &'static str,
    ) {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
        {
            return;
        }

        let configured_npubs = self
            .config
            .peers()
            .iter()
            .map(|peer| peer.npub.clone())
            .collect::<HashSet<_>>();
        let now_ms = Self::now_ms();
        let now_secs = now_ms / 1000;
        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
        if enqueue_budget == 0 {
            debug!(
                caller = %caller,
                "open-discovery sweep: enqueue budget is 0, skipping"
            );
            return;
        }

        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
        let cached_count = candidates.len();
        // Per-reason skip counters, reported in the summary log below.
        let mut enqueued = 0usize;
        let mut skipped_age = 0usize;
        let mut skipped_configured = 0usize;
        let mut skipped_self = 0usize;
        let mut skipped_connected = 0usize;
        let mut skipped_retry_pending = 0usize;
        let mut skipped_connecting = 0usize;
        let mut skipped_no_endpoints = 0usize;
        let mut skipped_invalid_npub = 0usize;
        let mut skipped_cooldown = 0usize;

        for (npub, endpoints, created_at_secs) in candidates {
            if enqueue_budget == 0 {
                break;
            }

            if let Some(max_age) = max_age_secs
                && now_secs.saturating_sub(created_at_secs) > max_age
            {
                skipped_age = skipped_age.saturating_add(1);
                continue;
            }

            if configured_npubs.contains(&npub) {
                // Configured peers are handled by the normal retry path;
                // a fresh advert just expedites any scheduled retry.
                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
                    let configured_addr = *identity.node_addr();
                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
                        && state.retry_after_ms > now_ms
                    {
                        state.retry_after_ms = now_ms;
                        debug!(
                            caller = %caller,
                            peer = %self.peer_display_name(&configured_addr),
                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
                            "Expediting configured-peer retry after fresh overlay advert"
                        );
                    }
                }
                skipped_configured = skipped_configured.saturating_add(1);
                continue;
            }

            let peer_identity = match PeerIdentity::from_npub(&npub) {
                Ok(identity) => identity,
                Err(_) => {
                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                    continue;
                }
            };
            let node_addr = *peer_identity.node_addr();
            if node_addr == *self.identity.node_addr() {
                skipped_self = skipped_self.saturating_add(1);
                continue;
            }
            if self.peers.contains_key(&node_addr) {
                skipped_connected = skipped_connected.saturating_add(1);
                continue;
            }
            if self.retry_pending.contains_key(&node_addr) {
                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
                continue;
            }
            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                skipped_cooldown = skipped_cooldown.saturating_add(1);
                continue;
            }
            let connecting = self.connections.values().any(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == &node_addr)
                    .unwrap_or(false)
            });
            if connecting {
                skipped_connecting = skipped_connecting.saturating_add(1);
                continue;
            }

            // Convert advert endpoints to deduplicated connect candidates;
            // priorities start at 120 so static addresses rank first.
            let mut addresses = Vec::new();
            let mut priority = 120u8;
            let seen_at_ms = Self::now_ms();
            for endpoint in endpoints {
                let Some(candidate) =
                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
                else {
                    continue;
                };
                if addresses.iter().any(|existing: &PeerAddress| {
                    existing.transport == candidate.transport && existing.addr == candidate.addr
                }) {
                    continue;
                }
                addresses.push(candidate);
                priority = priority.saturating_add(1);
            }
            if addresses.is_empty() {
                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
                continue;
            }

            self.peer_aliases
                .entry(node_addr)
                .or_insert_with(|| peer_identity.short_npub());
            self.register_identity(node_addr, peer_identity.pubkey_full());

            // Queue a one-shot (non-reconnecting) retry that expires with
            // the advert TTL window.
            let mut state = super::retry::RetryState::new(PeerConfig {
                npub: npub.clone(),
                alias: None,
                addresses,
                connect_policy: ConnectPolicy::AutoConnect,
                auto_reconnect: true,
            });
            state.reconnect = false;
            state.retry_after_ms = now_ms;
            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
            self.retry_pending.insert(node_addr, state);
            info!(
                caller = %caller,
                peer = %peer_identity.short_npub(),
                advert_age_secs = now_secs.saturating_sub(created_at_secs),
                "open-discovery sweep: queued retry for cached advert"
            );
            enqueue_budget = enqueue_budget.saturating_sub(1);
            enqueued = enqueued.saturating_add(1);
        }

        let total_skipped = skipped_age
            + skipped_configured
            + skipped_self
            + skipped_connected
            + skipped_retry_pending
            + skipped_connecting
            + skipped_no_endpoints
            + skipped_invalid_npub
            + skipped_cooldown;
        let should_summarize = caller == "startup" || enqueued > 0;
        if should_summarize {
            info!(
                caller = %caller,
                cached = cached_count,
                queued = enqueued,
                skipped_age = skipped_age,
                skipped_configured = skipped_configured,
                skipped_self = skipped_self,
                skipped_connected = skipped_connected,
                skipped_retry_pending = skipped_retry_pending,
                skipped_connecting = skipped_connecting,
                skipped_no_endpoints = skipped_no_endpoints,
                skipped_invalid_npub = skipped_invalid_npub,
                skipped_cooldown = skipped_cooldown,
                skipped_total = total_skipped,
                "open-discovery sweep complete"
            );
        }
    }
2464
2465 async fn maybe_run_startup_open_discovery_sweep(
2473 &mut self,
2474 bootstrap: &std::sync::Arc<NostrDiscovery>,
2475 ) {
2476 if self.startup_open_discovery_sweep_done {
2477 return;
2478 }
2479 if !self.config.node.discovery.nostr.enabled
2480 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2481 {
2482 self.startup_open_discovery_sweep_done = true;
2484 return;
2485 }
2486 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
2487 return;
2488 };
2489 let now_ms = Self::now_ms();
2490 let delay_ms = self
2491 .config
2492 .node
2493 .discovery
2494 .nostr
2495 .startup_sweep_delay_secs
2496 .saturating_mul(1000);
2497 if now_ms < started_at_ms.saturating_add(delay_ms) {
2498 return;
2499 }
2500
2501 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
2502 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
2503 .await;
2504 self.startup_open_discovery_sweep_done = true;
2505 }
2506
2507 fn available_outbound_slots(&self) -> usize {
2508 let connection_used = self
2509 .connections
2510 .len()
2511 .saturating_add(self.pending_connects.len());
2512 let connection_slots = if self.max_connections == 0 {
2513 usize::MAX
2514 } else {
2515 self.max_connections.saturating_sub(connection_used)
2516 };
2517
2518 let peer_slots = if self.max_peers == 0 {
2519 usize::MAX
2520 } else {
2521 self.max_peers.saturating_sub(self.peers.len())
2522 };
2523
2524 connection_slots.min(peer_slots)
2525 }
2526
2527 fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
2528 let current_open_discovery_pending = self
2529 .retry_pending
2530 .values()
2531 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
2532 .count();
2533
2534 let cap_remaining = self
2535 .config
2536 .node
2537 .discovery
2538 .nostr
2539 .open_discovery_max_pending
2540 .saturating_sub(current_open_discovery_pending);
2541
2542 cap_remaining.min(self.available_outbound_slots())
2543 }
2544
2545 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
2546 now_ms.saturating_add(
2547 self.config
2548 .node
2549 .discovery
2550 .nostr
2551 .advert_ttl_secs
2552 .saturating_mul(1000)
2553 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
2554 )
2555 }
2556
    /// Builds the overlay advert describing this node's reachable endpoints,
    /// for publication over nostr.
    ///
    /// Walks every operational transport and, per transport kind, decides
    /// which address (if any) is safe to advertise publicly. Returns `None`
    /// when nostr discovery is disabled or no endpoint qualifies.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // Set when at least one UDP transport sits behind NAT; gates whether
        // signal relays / STUN servers are included in the advert below.
        let mut has_udp_nat = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    // Skip transports with no matching config or with
                    // advertising disabled.
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // Precedence: explicit external address > routable
                        // bind address > STUN-observed public address.
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                // Bind address is concrete and publicly
                                // routable: advertise it directly.
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                // Wildcard/private bind: fall back to the
                                // STUN-learned public address, if any.
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                             or private and STUN observation failed; \
                                             advertising no UDP endpoint. Either set \
                                             transports.udp.external_addr, bind to a \
                                             specific *public* IP, or ensure \
                                             node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                None => {}
                            }
                        }
                    } else {
                        // Not public: advertise the "nat" placeholder so
                        // peers know to initiate NAT traversal instead.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Precedence: explicit external address > routable bind
                    // address. Unlike UDP there is no STUN fallback for TCP.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                     or private IP and no transports.tcp.external_addr set; \
                                     advertising no TCP endpoint. Either set external_addr \
                                     to the public IP (recommended for cloud 1:1-NAT setups) \
                                     or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Onion addresses are inherently reachable; advertise
                    // whenever the hidden service address is known.
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            // Relay/STUN hints are only useful to peers when we have a
            // NAT-bound UDP transport that needs traversal assistance.
            signal_relays: has_udp_nat.then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: has_udp_nat
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
2720
2721 async fn refresh_overlay_advert(
2722 &self,
2723 bootstrap: &std::sync::Arc<NostrDiscovery>,
2724 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
2725 let advert = self.build_overlay_advert(bootstrap).await;
2726 bootstrap.update_local_advert(advert).await
2727 }
2728
2729 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
2730 match (&self.config.transports.udp, transport_name) {
2731 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2732 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2733 _ => None,
2734 }
2735 }
2736
2737 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
2738 match (&self.config.transports.tcp, transport_name) {
2739 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2740 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2741 _ => None,
2742 }
2743 }
2744
2745 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
2746 match (&self.config.transports.tor, transport_name) {
2747 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2748 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2749 _ => None,
2750 }
2751 }
2752
2753 pub(in crate::node) async fn try_peer_addresses(
2754 &mut self,
2755 peer_config: &PeerConfig,
2756 peer_identity: PeerIdentity,
2757 allow_bootstrap_nat: bool,
2758 ) -> Result<(), NodeError> {
2759 let peer_node_addr = *peer_identity.node_addr();
2760 if self.peers.contains_key(&peer_node_addr) {
2761 debug!(
2762 npub = %peer_config.npub,
2763 "Peer already exists, skipping address attempts"
2764 );
2765 return Ok(());
2766 }
2767
2768 let candidates = self.peer_address_candidates(peer_config).await;
2769
2770 if candidates.is_empty() {
2771 if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
2772 return Ok(());
2773 }
2774 return Err(NodeError::NoTransportForType(format!(
2775 "no addresses known for {}",
2776 peer_config.npub
2777 )));
2778 }
2779
2780 if self
2781 .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
2782 .await
2783 .is_ok()
2784 {
2785 return Ok(());
2786 }
2787
2788 Err(NodeError::NoTransportForType(format!(
2789 "no operational transport for any of {}'s addresses",
2790 peer_config.npub
2791 )))
2792 }
2793
2794 async fn try_active_peer_alternative_addresses(
2795 &mut self,
2796 peer_config: &PeerConfig,
2797 peer_identity: PeerIdentity,
2798 ) -> Result<bool, NodeError> {
2799 let peer_node_addr = *peer_identity.node_addr();
2800 let candidates = self.peer_address_candidates(peer_config).await;
2801
2802 if candidates.is_empty() {
2803 return Err(NodeError::NoTransportForType(format!(
2804 "no addresses known for {}",
2805 peer_config.npub
2806 )));
2807 }
2808
2809 let alternatives: Vec<_> = candidates
2810 .into_iter()
2811 .filter(|addr| !(addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat")))
2812 .filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
2813 .collect();
2814
2815 if alternatives.is_empty() {
2816 return Err(NodeError::NoTransportForType(format!(
2817 "no concrete alternate addresses known for {}",
2818 peer_config.npub
2819 )));
2820 }
2821
2822 self.attempt_peer_address_list(peer_config, peer_identity, false, &alternatives)
2823 .await?;
2824 Ok(true)
2825 }
2826
2827 async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2828 let static_addresses = self.static_peer_addresses(peer_config);
2838 let overlay_addresses = self
2839 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
2840 .await;
2841
2842 let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
2843 for addr in overlay_addresses.into_iter().chain(static_addresses) {
2844 if !candidates.iter().any(|existing: &PeerAddress| {
2845 existing.transport == addr.transport && existing.addr == addr.addr
2846 }) {
2847 candidates.push(addr);
2848 }
2849 }
2850
2851 candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
2856 (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
2857 (Some(_), None) => std::cmp::Ordering::Less,
2858 (None, Some(_)) => std::cmp::Ordering::Greater,
2859 (None, None) => std::cmp::Ordering::Equal,
2860 });
2861
2862 candidates
2863 }
2864
2865 fn active_peer_matches_any_candidate(
2866 &self,
2867 peer_node_addr: &NodeAddr,
2868 candidates: &[PeerAddress],
2869 ) -> bool {
2870 candidates
2871 .iter()
2872 .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
2873 }
2874
2875 pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
2876 &self,
2877 peer_node_addr: &NodeAddr,
2878 candidates: &[PeerAddress],
2879 ) -> bool {
2880 if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
2881 return false;
2882 }
2883 !self.active_peer_needs_same_path_refresh(peer_node_addr)
2884 }
2885
2886 fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
2887 let Some(peer) = self.peers.get(peer_node_addr) else {
2888 return false;
2889 };
2890 let stale_after_ms = self
2891 .config
2892 .node
2893 .heartbeat_interval_secs
2894 .saturating_mul(1000)
2895 .max(1000);
2896 peer.idle_time(Self::now_ms()) > stale_after_ms
2897 }
2898
2899 fn active_peer_matches_candidate(
2900 &self,
2901 peer_node_addr: &NodeAddr,
2902 candidate: &PeerAddress,
2903 ) -> bool {
2904 let Some(peer) = self.peers.get(peer_node_addr) else {
2905 return false;
2906 };
2907 let Some(current_addr) = peer.current_addr() else {
2908 return false;
2909 };
2910 if peer
2911 .transport_id()
2912 .map(|id| self.bootstrap_transports.contains(&id))
2913 .unwrap_or(false)
2914 {
2915 return false;
2916 }
2917 let current_addr = current_addr.to_string();
2918 let current_transport = peer
2919 .transport_id()
2920 .and_then(|id| self.transports.get(&id))
2921 .map(|transport| transport.transport_type().name);
2922
2923 candidate.addr == current_addr
2924 && current_transport
2925 .map(|transport| transport == candidate.transport)
2926 .unwrap_or(true)
2927 }
2928
2929 pub(crate) async fn api_connect(
2937 &mut self,
2938 npub: &str,
2939 address: &str,
2940 transport: &str,
2941 ) -> Result<serde_json::Value, String> {
2942 let peer_config = PeerConfig {
2943 npub: npub.to_string(),
2944 alias: None,
2945 addresses: vec![PeerAddress::new(transport, address)],
2946 connect_policy: ConnectPolicy::Manual,
2947 auto_reconnect: false,
2948 };
2949
2950 if let Ok(identity) = PeerIdentity::from_npub(npub) {
2952 self.peer_aliases
2953 .insert(*identity.node_addr(), identity.short_npub());
2954 self.register_identity(*identity.node_addr(), identity.pubkey_full());
2955 }
2956
2957 self.initiate_peer_connection(&peer_config)
2958 .await
2959 .map(|()| {
2960 info!(
2961 npub = %npub,
2962 address = %address,
2963 transport = %transport,
2964 "API connect initiated"
2965 );
2966 serde_json::json!({
2967 "npub": npub,
2968 "address": address,
2969 "transport": transport,
2970 })
2971 })
2972 .map_err(|e| e.to_string())
2973 }
2974
2975 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
2979 let peer_identity =
2980 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
2981 let node_addr = *peer_identity.node_addr();
2982
2983 if !self.peers.contains_key(&node_addr) {
2984 return Err(format!("peer not found: {npub}"));
2985 }
2986
2987 self.remove_active_peer(&node_addr);
2989
2990 self.retry_pending.remove(&node_addr);
2992
2993 info!(npub = %npub, "API disconnect completed");
2994
2995 Ok(serde_json::json!({
2996 "npub": npub,
2997 "disconnected": true,
2998 }))
2999 }
3000
    /// Adopts a UDP socket produced by a completed NAT traversal: wraps it in
    /// a dedicated bootstrap UDP transport, registers the transport, and
    /// initiates the handshake toward the traversal peer.
    ///
    /// On handshake-initiation failure, all registration side effects are
    /// rolled back and the transport is stopped before the error is returned.
    ///
    /// # Errors
    /// - [`NodeError::NotStarted`] when the node is not operational.
    /// - [`NodeError::InvalidPeerNpub`] for an unparsable peer npub.
    /// - [`NodeError::BootstrapHandoff`] when the socket cannot be adopted or
    ///   yields no local address.
    pub async fn adopt_established_traversal(
        &mut self,
        traversal: EstablishedTraversal,
    ) -> Result<BootstrapHandoffResult, NodeError> {
        debug!(
            peer_npub = %traversal.peer_npub,
            session_id = %traversal.session_id,
            remote_addr = %traversal.remote_addr,
            "adopting established traversal socket"
        );

        if !self.state.is_operational() {
            return Err(NodeError::NotStarted);
        }

        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
            NodeError::InvalidPeerNpub {
                npub: traversal.peer_npub.clone(),
                reason: e.to_string(),
            }
        })?;
        let peer_node_addr = *peer_identity.node_addr();
        // An already-connected peer is not an error: the adopted socket just
        // becomes an alternate path.
        if self.peers.contains_key(&peer_node_addr) {
            debug!(
                peer_npub = %traversal.peer_npub,
                "Adopting NAT traversal handoff as alternate path for already-connected peer"
            );
        }

        self.peer_aliases
            .insert(peer_node_addr, peer_identity.short_npub());
        self.register_identity(peer_node_addr, peer_identity.pubkey_full());

        let transport_id = self.allocate_transport_id();
        // Prefer the config carried by the traversal; otherwise inherit from
        // the named (or single) UDP transport config, clearing the bind and
        // external addresses since the adopted socket already fixes them.
        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
            let mut cfg = self
                .lookup_udp_config(traversal.transport_name.as_deref())
                .or_else(|| self.lookup_udp_config(None))
                .cloned()
                .unwrap_or_default();
            cfg.bind_addr = None;
            cfg.external_addr = None;
            cfg
        });
        let mut transport = crate::transport::udp::UdpTransport::new(
            transport_id,
            traversal.transport_name.clone(),
            inherited_config,
            packet_tx,
        );

        transport
            .adopt_socket_async(traversal.socket)
            .await
            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;

        let local_addr = transport.local_addr().ok_or_else(|| {
            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
        })?;

        self.transports.insert(
            transport_id,
            crate::transport::TransportHandle::Udp(transport),
        );
        // Track this transport as a bootstrap transport tied to the peer's
        // npub so path-matching logic can treat it specially.
        self.bootstrap_transports.insert(transport_id);
        self.bootstrap_transport_npubs
            .insert(transport_id, traversal.peer_npub.clone());

        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
        if let Err(err) = self
            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
            .await
        {
            // Roll back every registration above and stop the transport so a
            // failed handshake leaves no orphaned state behind.
            self.bootstrap_transports.remove(&transport_id);
            self.bootstrap_transport_npubs.remove(&transport_id);
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let _ = handle.stop().await;
            }
            return Err(err);
        }

        info!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            local_addr = %local_addr,
            remote_addr = %traversal.remote_addr,
            session_id = %traversal.session_id,
            "adopted NAT traversal socket; handshake initiated"
        );

        Ok(BootstrapHandoffResult {
            transport_id,
            local_addr,
            remote_addr: traversal.remote_addr,
            peer_node_addr,
            session_id: traversal.session_id,
        })
    }
3125}