1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
7 OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use std::collections::HashSet;
18use std::net::IpAddr;
19use std::thread;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
/// Returns `true` when `ip` falls in a range this module treats as not
/// reachable across the public internet, and therefore unsuitable for an
/// overlay advert endpoint.
///
/// IPv4: RFC 1918 private ranges, loopback, link-local, the unspecified
/// address, multicast, the limited broadcast address, the documentation
/// ranges, and the carrier-grade NAT shared space `100.64.0.0/10`.
///
/// IPv6: loopback, unspecified, unique-local (`fc00::/7`), multicast,
/// link-local (`fe80::/10`) and the documentation prefix `2001:db8::/32`.
/// An IPv4-mapped address (`::ffff:a.b.c.d`) is judged by the IPv4 rules
/// applied to the address it wraps.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [first, second, ..] = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // Carrier-grade NAT shared address space, 100.64.0.0/10 (RFC 6598):
                // first octet 100, top two bits of the second octet are `01`.
                || (first == 100 && (second & 0xc0) == 64)
        }
        IpAddr::V6(v6) => {
            // `::ffff:10.0.0.1` is exactly as unreachable as `10.0.0.1`.
            if let Some(v4) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(v4));
            }
            let [head, next, ..] = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // Link-local unicast, fe80::/10.
                || (head & 0xffc0) == 0xfe80
                // Documentation prefix, 2001:db8::/32 (RFC 3849), mirroring the
                // IPv4 documentation check above.
                || (head == 0x2001 && next == 0x0db8)
        }
    }
}

/// Multiplier used by the open-discovery retry logic.
/// NOTE(review): its consumer is not visible in this chunk; the name suggests
/// it scales a retry/advert lifetime — confirm at the use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
56
57impl Node {
58 pub(super) async fn update_peers(
69 &mut self,
70 new_peers: Vec<crate::config::PeerConfig>,
71 ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
72 use std::collections::{HashMap, HashSet};
73
74 let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
75 HashMap::with_capacity(new_peers.len());
76 for peer in new_peers {
77 let identity = match PeerIdentity::from_npub(&peer.npub) {
78 Ok(id) => id,
79 Err(e) => {
80 return Err(crate::node::NodeError::InvalidPeerNpub {
81 npub: peer.npub.clone(),
82 reason: e.to_string(),
83 });
84 }
85 };
86 new_by_addr.insert(*identity.node_addr(), peer);
90 }
91
92 let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
93 .config
94 .peers()
95 .iter()
96 .filter_map(|pc| {
97 PeerIdentity::from_npub(&pc.npub)
98 .ok()
99 .map(|id| (*id.node_addr(), pc.clone()))
100 })
101 .collect();
102
103 let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
104 let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
105
106 let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
107 let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
108 let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
109
110 let mut outcome = crate::node::UpdatePeersOutcome::default();
111
112 for node_addr in &removed {
113 if self.retry_pending.remove(node_addr).is_some() {
114 debug!(
115 peer = %self.peer_display_name(node_addr),
116 "Dropping retry entry for peer removed from runtime peer list"
117 );
118 }
119 self.peer_aliases.remove(node_addr);
120 outcome.removed += 1;
121 }
122
123 for node_addr in &kept {
124 let new_pc = &new_by_addr[node_addr];
125 let current_pc = ¤t_by_addr[node_addr];
126 if new_pc.addresses != current_pc.addresses
127 || new_pc.alias != current_pc.alias
128 || new_pc.connect_policy != current_pc.connect_policy
129 || new_pc.auto_reconnect != current_pc.auto_reconnect
130 {
131 outcome.updated += 1;
132 if let Some(state) = self.retry_pending.get_mut(node_addr) {
133 state.peer_config = new_pc.clone();
134 }
135 if let Some(alias) = new_pc.alias.clone() {
136 self.peer_aliases.insert(*node_addr, alias);
137 }
138 } else {
139 outcome.unchanged += 1;
140 }
141 }
142
143 let added_configs: Vec<crate::config::PeerConfig> =
144 added.iter().map(|addr| new_by_addr[addr].clone()).collect();
145
146 self.config.peers = new_by_addr.into_values().collect();
150
151 for peer_config in added_configs {
152 outcome.added += 1;
153 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
154 continue;
155 };
156 let name = peer_config
157 .alias
158 .clone()
159 .unwrap_or_else(|| identity.short_npub());
160 self.peer_aliases.insert(*identity.node_addr(), name);
161 self.register_identity(*identity.node_addr(), identity.pubkey_full());
162
163 if !peer_config.is_auto_connect() {
164 continue;
165 }
166
167 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
168 warn!(
169 npub = %peer_config.npub,
170 error = %e,
171 "Failed to initiate connection for newly added peer"
172 );
173 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
174 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
175 }
176 if matches!(e, crate::node::NodeError::NoTransportForType(_))
177 && let Some(bootstrap) = self.nostr_discovery.clone()
178 {
179 let npub = peer_config.npub.clone();
180 tokio::spawn(async move {
181 let _ = bootstrap.refetch_advert_for_stale_check(&npub).await;
182 });
183 }
184 }
185 }
186
187 Ok(outcome)
188 }
189
190 pub(super) async fn initiate_peer_connections(&mut self) {
191 let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
197 .config
198 .peers()
199 .iter()
200 .filter_map(|pc| {
201 PeerIdentity::from_npub(&pc.npub)
202 .ok()
203 .map(|id| (id, pc.alias.clone()))
204 })
205 .collect();
206
207 for (identity, alias) in peer_identities {
208 let name = alias.unwrap_or_else(|| identity.short_npub());
209 self.peer_aliases.insert(*identity.node_addr(), name);
210 self.register_identity(*identity.node_addr(), identity.pubkey_full());
214 }
215
216 let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
218
219 if peer_configs.is_empty() {
220 debug!("No static peers configured");
221 return;
222 }
223
224 debug!(
225 count = peer_configs.len(),
226 "Initiating static peer connections"
227 );
228
229 for peer_config in peer_configs {
230 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
231 warn!(
232 npub = %peer_config.npub,
233 alias = ?peer_config.alias,
234 error = %e,
235 "Failed to initiate peer connection"
236 );
237 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
241 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
242 }
243 if matches!(e, crate::node::NodeError::NoTransportForType(_))
249 && let Some(bootstrap) = self.nostr_discovery.clone()
250 {
251 let npub = peer_config.npub.clone();
252 tokio::spawn(async move {
253 let _ = bootstrap.refetch_advert_for_stale_check(&npub).await;
254 });
255 }
256 }
257 }
258 }
259
260 pub(super) async fn initiate_peer_connection(
264 &mut self,
265 peer_config: &crate::config::PeerConfig,
266 ) -> Result<(), NodeError> {
267 self.initiate_peer_connection_inner(peer_config).await
268 }
269
270 pub(super) async fn initiate_peer_retry_connection(
276 &mut self,
277 peer_config: &crate::config::PeerConfig,
278 ) -> Result<(), NodeError> {
279 self.initiate_peer_connection_inner(peer_config).await
280 }
281
282 async fn initiate_peer_connection_inner(
283 &mut self,
284 peer_config: &crate::config::PeerConfig,
285 ) -> Result<(), NodeError> {
286 let peer_identity =
288 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
289 npub: peer_config.npub.clone(),
290 reason: e.to_string(),
291 })?;
292
293 let peer_node_addr = *peer_identity.node_addr();
294
295 if self.peers.contains_key(&peer_node_addr) {
297 debug!(
298 npub = %peer_config.npub,
299 "Peer already exists, skipping"
300 );
301 return Ok(());
302 }
303
304 let already_connecting = self.connections.values().any(|conn| {
306 conn.expected_identity()
307 .map(|id| id.node_addr() == &peer_node_addr)
308 .unwrap_or(false)
309 });
310 if already_connecting {
311 debug!(
312 npub = %peer_config.npub,
313 "Connection already in progress, skipping"
314 );
315 return Ok(());
316 }
317
318 self.try_peer_addresses(peer_config, peer_identity, true)
319 .await
320 }
321
322 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
323 self.connections.values().any(|conn| {
324 conn.expected_identity()
325 .map(|id| id.node_addr() == peer_node_addr)
326 .unwrap_or(false)
327 })
328 }
329
330 pub(super) async fn initiate_connection(
341 &mut self,
342 transport_id: TransportId,
343 remote_addr: TransportAddr,
344 peer_identity: PeerIdentity,
345 ) -> Result<(), NodeError> {
346 let peer_node_addr = *peer_identity.node_addr();
347
348 self.authorize_peer(
349 &peer_identity,
350 PeerAclContext::OutboundConnect,
351 transport_id,
352 &remote_addr,
353 )?;
354
355 let is_connection_oriented = self
356 .transports
357 .get(&transport_id)
358 .map(|t| t.transport_type().connection_oriented)
359 .unwrap_or(false);
360
361 let link_id = self.allocate_link_id();
363
364 let link = if is_connection_oriented {
365 Link::new(
366 link_id,
367 transport_id,
368 remote_addr.clone(),
369 LinkDirection::Outbound,
370 Duration::from_millis(self.config.node.base_rtt_ms),
371 )
372 } else {
373 Link::connectionless(
374 link_id,
375 transport_id,
376 remote_addr.clone(),
377 LinkDirection::Outbound,
378 Duration::from_millis(self.config.node.base_rtt_ms),
379 )
380 };
381
382 self.links.insert(link_id, link);
383
384 self.addr_to_link
386 .insert((transport_id, remote_addr.clone()), link_id);
387
388 if is_connection_oriented {
389 if let Some(transport) = self.transports.get(&transport_id) {
391 match transport.connect(&remote_addr).await {
392 Ok(()) => {
393 debug!(
394 peer = %self.peer_display_name(&peer_node_addr),
395 transport_id = %transport_id,
396 remote_addr = %remote_addr,
397 link_id = %link_id,
398 "Transport connect initiated (non-blocking)"
399 );
400 self.pending_connects.push(super::PendingConnect {
401 link_id,
402 transport_id,
403 remote_addr,
404 peer_identity,
405 });
406 }
407 Err(e) => {
408 self.links.remove(&link_id);
410 self.addr_to_link.remove(&(transport_id, remote_addr));
411 return Err(NodeError::TransportError(e.to_string()));
412 }
413 }
414 }
415 Ok(())
416 } else {
417 self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
419 .await
420 }
421 }
422
423 pub(super) async fn start_handshake(
428 &mut self,
429 link_id: LinkId,
430 transport_id: TransportId,
431 remote_addr: TransportAddr,
432 peer_identity: PeerIdentity,
433 ) -> Result<(), NodeError> {
434 let peer_node_addr = *peer_identity.node_addr();
435
436 let current_time_ms = Self::now_ms();
438 let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);
439
440 let our_index = match self.index_allocator.allocate() {
442 Ok(idx) => idx,
443 Err(e) => {
444 self.links.remove(&link_id);
446 self.addr_to_link.remove(&(transport_id, remote_addr));
447 return Err(NodeError::IndexAllocationFailed(e.to_string()));
448 }
449 };
450
451 let our_keypair = self.identity.keypair();
453 let noise_msg1 =
454 match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
455 Ok(msg) => msg,
456 Err(e) => {
457 let _ = self.index_allocator.free(our_index);
459 self.links.remove(&link_id);
460 self.addr_to_link.remove(&(transport_id, remote_addr));
461 return Err(NodeError::HandshakeFailed(e.to_string()));
462 }
463 };
464
465 connection.set_our_index(our_index);
467 connection.set_transport_id(transport_id);
468 connection.set_source_addr(remote_addr.clone());
469
470 let wire_msg1 = build_msg1(our_index, &noise_msg1);
472
473 debug!(
474 peer = %self.peer_display_name(&peer_node_addr),
475 transport_id = %transport_id,
476 remote_addr = %remote_addr,
477 link_id = %link_id,
478 our_index = %our_index,
479 "Connection initiated"
480 );
481
482 let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
484 connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);
485
486 self.pending_outbound
488 .insert((transport_id, our_index.as_u32()), link_id);
489 self.connections.insert(link_id, connection);
490
491 let send_result = match self.transports.get(&transport_id) {
493 Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
494 None => None,
495 };
496 if let Some(send_result) = send_result {
497 self.note_local_send_outcome(&send_result);
498 match send_result {
499 Ok(bytes) => {
500 debug!(
501 link_id = %link_id,
502 our_index = %our_index,
503 bytes,
504 "Sent Noise handshake message 1 (wire format)"
505 );
506 }
507 Err(e) => {
508 warn!(
509 link_id = %link_id,
510 transport_id = %transport_id,
511 remote_addr = %remote_addr,
512 our_index = %our_index,
513 error = %e,
514 "Failed to send handshake message"
515 );
516 if let Some(conn) = self.connections.get_mut(&link_id) {
519 conn.mark_failed();
520 }
521 }
522 }
523 }
524
525 Ok(())
526 }
527
528 pub(super) async fn poll_transport_discovery(&mut self) {
534 let mut to_connect = Vec::new();
536
537 for (transport_id, transport) in &self.transports {
538 if !transport.is_operational() {
539 continue;
540 }
541 if !transport.auto_connect() {
542 let _ = transport.discover();
544 continue;
545 }
546 let discovered = match transport.discover() {
547 Ok(peers) => peers,
548 Err(_) => continue,
549 };
550 for peer in discovered {
551 let pubkey = match peer.pubkey_hint {
552 Some(pk) => pk,
553 None => continue,
554 };
555 let identity = PeerIdentity::from_pubkey(pubkey);
556 let node_addr = *identity.node_addr();
557
558 if node_addr == *self.identity.node_addr() {
560 continue;
561 }
562 if self.peers.contains_key(&node_addr) {
564 continue;
565 }
566 let connecting = self.connections.values().any(|c| {
568 c.expected_identity()
569 .map(|id| id.node_addr() == &node_addr)
570 .unwrap_or(false)
571 });
572 if connecting {
573 continue;
574 }
575
576 to_connect.push((*transport_id, peer.addr, identity));
577 }
578 }
579
580 for (transport_id, remote_addr, identity) in to_connect {
581 info!(
582 peer = %self.peer_display_name(identity.node_addr()),
583 transport_id = %transport_id,
584 remote_addr = %remote_addr,
585 "Auto-connecting to discovered peer"
586 );
587 if let Err(e) = self
588 .initiate_connection(transport_id, remote_addr, identity)
589 .await
590 {
591 warn!(error = %e, "Failed to auto-connect to discovered peer");
592 }
593 }
594 }
595
596 pub(super) async fn poll_nostr_discovery(&mut self) {
597 let Some(bootstrap) = self.nostr_discovery.clone() else {
598 return;
599 };
600
601 if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
602 debug!(error = %err, "Failed to refresh local Nostr overlay advert");
603 }
604
605 for event in bootstrap.drain_events().await {
606 match event {
607 BootstrapEvent::Established { traversal } => {
608 let peer_npub = traversal.peer_npub.clone();
609 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
610 let peer_addr = *peer_identity.node_addr();
611 if self.peers.contains_key(&peer_addr) {
612 debug!(
613 peer_npub = %peer_npub,
614 "Ignoring established NAT traversal for already-connected peer"
615 );
616 continue;
617 }
618 if self.is_connecting_to_peer(&peer_addr) {
619 debug!(
620 peer_npub = %peer_npub,
621 "Ignoring established NAT traversal while peer handshake is already in progress"
622 );
623 continue;
624 }
625 }
626 match self.adopt_established_traversal(traversal).await {
627 Ok(_) => {
628 info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
629 }
630 Err(err) => {
631 warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
632 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
633 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
634 }
635 }
636 }
637 }
638 BootstrapEvent::Failed {
639 peer_config,
640 reason,
641 } => {
642 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
643 Ok(identity) => identity,
644 Err(_) => continue,
645 };
646 let node_addr = *peer_identity.node_addr();
647 if self.peers.contains_key(&node_addr) {
648 debug!(
649 npub = %peer_config.npub,
650 error = %reason,
651 "Ignoring failed NAT traversal for already-connected peer"
652 );
653 continue;
654 }
655 if self.is_connecting_to_peer(&node_addr) {
656 debug!(
657 npub = %peer_config.npub,
658 error = %reason,
659 "Ignoring failed NAT traversal while peer handshake is already in progress"
660 );
661 continue;
662 }
663
664 let now_ms = Self::now_ms();
665 let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
666 if decision.should_warn {
667 warn!(
668 npub = %peer_config.npub,
669 error = %reason,
670 consecutive_failures = decision.consecutive_failures,
671 cooldown_secs = decision
672 .cooldown_until_ms
673 .map(|t| t.saturating_sub(now_ms) / 1000),
674 "NAT traversal failed"
675 );
676 } else {
677 debug!(
678 npub = %peer_config.npub,
679 error = %reason,
680 consecutive_failures = decision.consecutive_failures,
681 "NAT traversal failed (suppressed by warn-rate-limit)"
682 );
683 }
684
685 if decision.crossed_threshold {
689 let bootstrap = bootstrap.clone();
690 let npub = peer_config.npub.clone();
691 tokio::spawn(async move {
692 let outcome = bootstrap.refetch_advert_for_stale_check(&npub).await;
693 match outcome {
694 crate::discovery::nostr::NostrRefetchOutcome::Evicted => info!(
695 npub = %npub,
696 "stale-advert sweep: peer evicted from advert cache"
697 ),
698 crate::discovery::nostr::NostrRefetchOutcome::Refreshed => info!(
699 npub = %npub,
700 "stale-advert sweep: peer republished, cache refreshed and streak reset"
701 ),
702 crate::discovery::nostr::NostrRefetchOutcome::SameAdvert => debug!(
703 npub = %npub,
704 "stale-advert sweep: advert unchanged, cooldown stands"
705 ),
706 crate::discovery::nostr::NostrRefetchOutcome::Skipped => debug!(
707 npub = %npub,
708 "stale-advert sweep: skipped (relay error or no advert_relays)"
709 ),
710 }
711 });
712 }
713
714 if self
715 .try_peer_addresses(&peer_config, peer_identity, false)
716 .await
717 .is_ok()
718 {
719 continue;
720 }
721
722 self.schedule_retry(node_addr, now_ms);
723 if let Some(cooldown_until_ms) = decision.cooldown_until_ms
724 && let Some(state) = self.retry_pending.get_mut(&node_addr)
725 {
726 state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
730 }
731 }
732 }
733 }
734
735 self.maybe_run_startup_open_discovery_sweep(&bootstrap)
736 .await;
737 self.queue_open_discovery_retries(&bootstrap).await;
738 }
739
740 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
745 let app = self.config.node.discovery.nostr.app.trim();
746 if app.is_empty() {
747 return None;
748 }
749 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
750 let scope = rest.trim();
751 if scope.is_empty() {
752 None
753 } else {
754 Some(scope.to_string())
755 }
756 } else {
757 Some(app.to_string())
758 }
759 }
760
761 pub(super) async fn poll_lan_discovery(&mut self) {
767 let Some(runtime) = self.lan_discovery.clone() else {
768 return;
769 };
770 let events = runtime.drain_events().await;
771 if events.is_empty() {
772 return;
773 }
774 let udp_transport_id = self.find_transport_for_type("udp");
777 let Some(transport_id) = udp_transport_id else {
778 debug!("lan: no operational UDP transport, skipping discovered peers");
779 return;
780 };
781 for event in events {
782 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
783 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
784 Ok(id) => id,
785 Err(err) => {
786 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
787 continue;
788 }
789 };
790 let peer_node_addr = *identity.node_addr();
791 if self.peers.contains_key(&peer_node_addr) {
792 continue;
793 }
794 let already_connecting = self.connections.values().any(|conn| {
795 conn.expected_identity()
796 .map(|id| id.node_addr() == &peer_node_addr)
797 .unwrap_or(false)
798 });
799 if already_connecting {
800 continue;
801 }
802 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
803 info!(
804 npub = %identity.short_npub(),
805 addr = %peer.addr,
806 "lan: initiating handshake to discovered peer"
807 );
808 if let Err(err) = self
809 .initiate_connection(transport_id, remote_addr, identity)
810 .await
811 {
812 debug!(
813 npub = %peer.npub,
814 error = %err,
815 "lan: failed to initiate connection to discovered peer"
816 );
817 }
818 }
819 }
820
821 pub(super) async fn poll_pending_connects(&mut self) {
828 if self.pending_connects.is_empty() {
829 return;
830 }
831
832 let mut completed = Vec::new();
833
834 for (i, pending) in self.pending_connects.iter().enumerate() {
835 let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
836 transport.connection_state(&pending.remote_addr)
837 } else {
838 crate::transport::ConnectionState::Failed("transport removed".into())
839 };
840
841 match state {
842 crate::transport::ConnectionState::Connected => {
843 completed.push((i, true, None));
844 }
845 crate::transport::ConnectionState::Failed(reason) => {
846 completed.push((i, false, Some(reason)));
847 }
848 crate::transport::ConnectionState::Connecting => {
849 }
851 crate::transport::ConnectionState::None => {
852 completed.push((i, false, Some("no connection attempt found".into())));
854 }
855 }
856 }
857
858 for (i, success, reason) in completed.into_iter().rev() {
860 let pending = self.pending_connects.remove(i);
861
862 if success {
863 if let Some(link) = self.links.get_mut(&pending.link_id) {
865 link.set_connected();
866 }
867
868 debug!(
869 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
870 transport_id = %pending.transport_id,
871 remote_addr = %pending.remote_addr,
872 link_id = %pending.link_id,
873 "Transport connected, starting handshake"
874 );
875
876 if let Err(e) = self
878 .start_handshake(
879 pending.link_id,
880 pending.transport_id,
881 pending.remote_addr.clone(),
882 pending.peer_identity,
883 )
884 .await
885 {
886 warn!(
887 link_id = %pending.link_id,
888 error = %e,
889 "Failed to start handshake after transport connect"
890 );
891 self.remove_link(&pending.link_id);
893 }
894 } else {
895 let reason = reason.unwrap_or_default();
896 warn!(
897 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
898 transport_id = %pending.transport_id,
899 remote_addr = %pending.remote_addr,
900 link_id = %pending.link_id,
901 reason = %reason,
902 "Transport connect failed"
903 );
904
905 self.remove_link(&pending.link_id);
907 self.links.remove(&pending.link_id);
908 self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
909 }
910 }
911 }
912
913 pub async fn start(&mut self) -> Result<(), NodeError> {
920 if !self.state.can_start() {
921 return Err(NodeError::AlreadyStarted);
922 }
923 self.state = NodeState::Starting;
924
925 let packet_buffer_size = self.config.node.buffers.packet_channel;
927 let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
928 self.packet_tx = Some(packet_tx.clone());
929 self.packet_rx = Some(packet_rx);
930
931 let transport_handles = self.create_transports(&packet_tx).await;
933
934 for mut handle in transport_handles {
935 let transport_id = handle.transport_id();
936 let transport_type = handle.transport_type().name;
937 let name = handle.name().map(|s| s.to_string());
938
939 match handle.start().await {
940 Ok(()) => {
941 self.transports.insert(transport_id, handle);
942 }
943 Err(e) => {
944 if let Some(ref n) = name {
945 warn!(transport_type, name = %n, error = %e, "Transport failed to start");
946 } else {
947 warn!(transport_type, error = %e, "Transport failed to start");
948 }
949 }
950 }
951 }
952
953 if !self.transports.is_empty() {
954 info!(count = self.transports.len(), "Transports initialized");
955 }
956
957 #[cfg(unix)]
973 {
974 let cpu_default = std::thread::available_parallelism()
975 .map(|n| n.get())
976 .unwrap_or(1)
977 .max(1);
978 let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
979 .ok()
980 .and_then(|s| s.parse().ok())
981 .unwrap_or(cpu_default)
982 .max(1);
983 self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
984 encrypt_worker_count,
985 ));
986 info!(
987 workers = encrypt_worker_count,
988 "Spawned FMP-encrypt worker pool"
989 );
990
991 let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
1000 .ok()
1001 .and_then(|s| s.parse().ok())
1002 .unwrap_or(cpu_default);
1003 if decrypt_worker_count == 0 {
1004 info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
1005 } else {
1006 self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
1007 decrypt_worker_count,
1008 ));
1009 info!(
1010 workers = decrypt_worker_count,
1011 "Spawned FMP+FSP-decrypt worker pool"
1012 );
1013 }
1014 }
1015
1016 if self.config.node.discovery.nostr.enabled {
1017 match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
1018 .await
1019 {
1020 Ok(runtime) => {
1021 if let Err(err) = self.refresh_overlay_advert(&runtime).await {
1022 warn!(error = %err, "Failed to publish initial Nostr overlay advert");
1023 }
1024 self.nostr_discovery = Some(runtime);
1025 self.nostr_discovery_started_at_ms = Some(Self::now_ms());
1026 info!("Nostr overlay discovery enabled");
1027 }
1028 Err(err) => {
1029 warn!(error = %err, "Failed to start Nostr overlay discovery");
1030 }
1031 }
1032 }
1033
1034 if self.config.node.discovery.lan.enabled {
1038 let advertised_udp_port = self
1039 .transports
1040 .values()
1041 .filter(|h| h.is_operational())
1042 .filter(|h| h.transport_type().name == "udp")
1043 .find_map(|h| h.local_addr().map(|addr| addr.port()))
1044 .unwrap_or(0);
1045 let scope = self.lan_discovery_scope();
1046 match crate::discovery::lan::LanDiscovery::start(
1047 &self.identity,
1048 scope,
1049 advertised_udp_port,
1050 self.config.node.discovery.lan.clone(),
1051 )
1052 .await
1053 {
1054 Ok(runtime) => {
1055 self.lan_discovery = Some(runtime);
1056 info!("LAN mDNS discovery enabled");
1057 }
1058 Err(err) => {
1059 debug!(error = %err, "LAN mDNS discovery not started");
1060 }
1061 }
1062 }
1063
1064 self.initiate_peer_connections().await;
1067
1068 if self.config.tun.enabled {
1070 let address = *self.identity.address();
1071 match TunDevice::create(&self.config.tun, address).await {
1072 Ok(device) => {
1073 let mtu = device.mtu();
1074 let name = device.name().to_string();
1075 let our_addr = *device.address();
1076
1077 info!("TUN device active:");
1078 info!(" name: {}", name);
1079 info!(" address: {}", device.address());
1080 info!(" mtu: {}", mtu);
1081
1082 let effective_mtu = self.effective_ipv6_mtu();
1084 let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); info!("effective MTU: {} bytes", effective_mtu);
1087 debug!(" max TCP MSS: {} bytes", max_mss);
1088
1089 #[cfg(target_os = "macos")]
1093 let (shutdown_read_fd, shutdown_write_fd) = {
1094 let mut fds = [0i32; 2];
1095 if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
1096 return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
1097 "failed to create shutdown pipe".into(),
1098 )));
1099 }
1100 (fds[0], fds[1])
1101 };
1102
1103 let (writer, tun_tx) =
1107 device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
1108
1109 let writer_handle = thread::spawn(move || {
1111 writer.run();
1112 });
1113
1114 let reader_tun_tx = tun_tx.clone();
1116
1117 let tun_channel_size = self.config.node.buffers.tun_channel;
1119 let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
1120
1121 let transport_mtu = self.transport_mtu();
1123 let path_mtu_lookup = self.path_mtu_lookup.clone();
1124 #[cfg(target_os = "macos")]
1125 let reader_handle = thread::spawn(move || {
1126 run_tun_reader(
1127 device,
1128 mtu,
1129 our_addr,
1130 reader_tun_tx,
1131 outbound_tx,
1132 transport_mtu,
1133 path_mtu_lookup,
1134 shutdown_read_fd,
1135 );
1136 });
1137 #[cfg(not(target_os = "macos"))]
1138 let reader_handle = thread::spawn(move || {
1139 run_tun_reader(
1140 device,
1141 mtu,
1142 our_addr,
1143 reader_tun_tx,
1144 outbound_tx,
1145 transport_mtu,
1146 path_mtu_lookup,
1147 );
1148 });
1149
1150 self.tun_state = TunState::Active;
1151 self.tun_name = Some(name);
1152 self.tun_tx = Some(tun_tx);
1153 self.tun_outbound_rx = Some(outbound_rx);
1154 self.tun_reader_handle = Some(reader_handle);
1155 self.tun_writer_handle = Some(writer_handle);
1156 #[cfg(target_os = "macos")]
1157 {
1158 self.tun_shutdown_fd = Some(shutdown_write_fd);
1159 }
1160 }
1161 Err(e) => {
1162 self.tun_state = TunState::Failed;
1163 warn!(error = %e, "Failed to initialize TUN, continuing without it");
1164 }
1165 }
1166 }
1167
1168 if self.config.dns.enabled {
1185 let addr_str = self.config.dns.bind_addr();
1186 match addr_str.parse::<std::net::IpAddr>() {
1187 Ok(ip) => {
1188 let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
1189 match Self::bind_dns_socket(bind) {
1190 Ok(socket) => {
1191 let dns_channel_size = self.config.node.buffers.dns_channel;
1192 let (identity_tx, identity_rx) =
1193 tokio::sync::mpsc::channel(dns_channel_size);
1194 let dns_ttl = self.config.dns.ttl();
1195 let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
1196 self.config.peers(),
1197 );
1198 let reloader = if self.config.node.system_files_enabled {
1199 let hosts_path = std::path::PathBuf::from(
1200 crate::upper::hosts::DEFAULT_HOSTS_PATH,
1201 );
1202 crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
1203 } else {
1204 crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
1205 };
1206 let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
1214 info!(
1215 bind = %bind,
1216 hosts = reloader.hosts().len(),
1217 mesh_ifindex = ?mesh_ifindex,
1218 "DNS responder started for .fips domain (auto-reload enabled)"
1219 );
1220 let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
1221 socket,
1222 identity_tx,
1223 dns_ttl,
1224 reloader,
1225 mesh_ifindex,
1226 ));
1227 self.dns_identity_rx = Some(identity_rx);
1228 self.dns_task = Some(handle);
1229 }
1230 Err(e) => {
1231 warn!(bind = %bind, error = %e, "Failed to start DNS responder");
1232 }
1233 }
1234 }
1235 Err(e) => {
1236 warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
1237 }
1238 }
1239 }
1240
1241 self.state = NodeState::Running;
1242 info!("Node started:");
1243 info!(" state: {}", self.state);
1244 info!(" transports: {}", self.transports.len());
1245 info!(" connections: {}", self.connections.len());
1246 Ok(())
1247 }
1248
1249 fn bind_dns_socket(
1262 addr: std::net::SocketAddr,
1263 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
1264 use socket2::{Domain, Protocol, Socket, Type};
1265 let domain = if addr.is_ipv4() {
1266 Domain::IPV4
1267 } else {
1268 Domain::IPV6
1269 };
1270 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
1271 if addr.is_ipv6() {
1272 sock.set_only_v6(false)?;
1273 #[cfg(unix)]
1274 Self::set_recv_pktinfo_v6(&sock)?;
1275 }
1276 sock.set_nonblocking(true)?;
1277 sock.bind(&addr.into())?;
1278 tokio::net::UdpSocket::from_std(sock.into())
1279 }
1280
1281 #[cfg(unix)]
1287 fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
1288 use std::os::fd::AsRawFd;
1289 let enable: libc::c_int = 1;
1290 let ret = unsafe {
1291 libc::setsockopt(
1292 sock.as_raw_fd(),
1293 libc::IPPROTO_IPV6,
1294 libc::IPV6_RECVPKTINFO,
1295 &enable as *const _ as *const libc::c_void,
1296 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
1297 )
1298 };
1299 if ret < 0 {
1300 return Err(std::io::Error::last_os_error());
1301 }
1302 Ok(())
1303 }
1304
1305 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
1312 #[cfg(unix)]
1313 {
1314 let c_name = std::ffi::CString::new(name).ok()?;
1315 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
1316 if idx == 0 { None } else { Some(idx) }
1317 }
1318 #[cfg(not(unix))]
1319 {
1320 let _ = name;
1321 None
1322 }
1323 }
1324
    /// Stops the node, tearing subsystems down in a fixed order.
    ///
    /// The order is:
    /// 1. Abort the DNS responder task.
    /// 2. Send `Disconnect(Shutdown)` to every peer with a session, while the
    ///    transports are still up.
    /// 3. Shut down Nostr and LAN discovery.
    /// 4. Stop every transport.
    /// 5. Drop the packet channel.
    /// 6. If a TUN interface exists, shut it down and join its reader and
    ///    writer threads.
    ///
    /// Transport and discovery shutdown failures are logged and do not abort
    /// the sequence.
    ///
    /// # Errors
    /// Returns [`NodeError::NotStarted`] when `NodeState::can_stop` is false.
    pub async fn stop(&mut self) -> Result<(), NodeError> {
        if !self.state.can_stop() {
            return Err(NodeError::NotStarted);
        }
        self.state = NodeState::Stopping;
        info!(state = %self.state, "Node stopping");

        if let Some(handle) = self.dns_task.take() {
            handle.abort();
            debug!("DNS responder stopped");
        }

        // Notify peers first: once the transports below are stopped, the
        // encrypted disconnect message would have no way out.
        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
            .await;

        if let Some(bootstrap) = self.nostr_discovery.take()
            && let Err(e) = bootstrap.shutdown().await
        {
            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
        }

        if let Some(lan) = self.lan_discovery.take() {
            lan.shutdown().await;
        }

        // Snapshot the ids first, because entries are removed from the map
        // inside the loop.
        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
        for transport_id in transport_ids {
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let transport_type = handle.transport_type().name;
                match handle.stop().await {
                    Ok(()) => {
                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
                    }
                    Err(e) => {
                        warn!(
                            transport_id = %transport_id,
                            transport_type,
                            error = %e,
                            "Transport stop failed"
                        );
                    }
                }
            }
        }

        // Drop both ends of the shared packet channel.
        self.packet_tx.take();
        self.packet_rx.take();

        if let Some(name) = self.tun_name.take() {
            info!(name = %name, "Shutting down TUN interface");

            // Drop the TUN sender. NOTE(review): presumably this lets the
            // writer thread observe a closed channel and exit before the join
            // below; confirm.
            self.tun_tx.take();

            if let Err(e) = shutdown_tun_interface(&name).await {
                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
            }

            // macOS: write one byte to the shutdown fd, then close it.
            // NOTE(review): presumably this wakes the TUN reader thread blocked
            // on the device; confirm against run_tun_reader.
            #[cfg(target_os = "macos")]
            if let Some(fd) = self.tun_shutdown_fd.take() {
                // SAFETY: `fd` is taken out of `self`, so it is written and
                // closed at most once here. The 1-byte buffer is a static
                // literal. Return values are ignored (best effort). Assumes
                // `fd` is an open descriptor owned by this node; verify where it
                // is created.
                unsafe {
                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                    libc::close(fd);
                }
            }

            // Synchronous joins (not awaited), so this blocks the current task
            // until each thread exits. A thread's panic result is ignored.
            if let Some(handle) = self.tun_reader_handle.take() {
                let _ = handle.join();
            }
            if let Some(handle) = self.tun_writer_handle.take() {
                let _ = handle.join();
            }

            self.tun_state = TunState::Disabled;
        }

        self.state = NodeState::Stopped;
        info!(state = %self.state, "Node stopped");
        Ok(())
    }
1422
1423 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
1428 let disconnect = Disconnect::new(reason);
1429 let plaintext = disconnect.encode();
1430
1431 let peer_addrs: Vec<NodeAddr> = self
1433 .peers
1434 .iter()
1435 .filter(|(_, peer)| peer.can_send() && peer.has_session())
1436 .map(|(addr, _)| *addr)
1437 .collect();
1438
1439 if peer_addrs.is_empty() {
1440 debug!(
1441 total_peers = self.peers.len(),
1442 "No sendable peers for disconnect notification"
1443 );
1444 return;
1445 }
1446
1447 let mut sent = 0usize;
1448 for node_addr in &peer_addrs {
1449 match self
1450 .send_encrypted_link_message(node_addr, &plaintext)
1451 .await
1452 {
1453 Ok(()) => sent += 1,
1454 Err(e) => {
1455 debug!(
1456 peer = %self.peer_display_name(node_addr),
1457 error = %e,
1458 "Failed to send disconnect (transport may be down)"
1459 );
1460 }
1461 }
1462 }
1463
1464 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
1465 }
1466
1467 fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
1468 peer_config
1469 .addresses_by_priority()
1470 .into_iter()
1471 .cloned()
1472 .collect()
1473 }
1474
1475 async fn nostr_peer_fallback_addresses(
1476 &self,
1477 peer_config: &PeerConfig,
1478 existing: &[PeerAddress],
1479 ) -> Vec<PeerAddress> {
1480 if !self.config.node.discovery.nostr.enabled
1481 || self.config.node.discovery.nostr.policy
1482 == crate::config::NostrDiscoveryPolicy::Disabled
1483 {
1484 return Vec::new();
1485 }
1486
1487 let Some(bootstrap) = self.nostr_discovery.clone() else {
1488 return Vec::new();
1489 };
1490 let endpoints = match bootstrap.advert_endpoints_for_peer(&peer_config.npub).await {
1491 Ok(endpoints) => endpoints,
1492 Err(err) => {
1493 debug!(
1494 npub = %peer_config.npub,
1495 error = %err,
1496 "Failed to resolve Nostr advert endpoints for configured peer"
1497 );
1498 return Vec::new();
1499 }
1500 };
1501
1502 let mut fallback = Vec::new();
1503 let mut next_priority = existing
1504 .iter()
1505 .map(|addr| addr.priority)
1506 .max()
1507 .unwrap_or(100)
1508 .saturating_add(1);
1509 let seen_at_ms = Self::now_ms();
1514 for endpoint in endpoints {
1515 let Some(candidate) = Self::overlay_endpoint_to_peer_address(
1516 &endpoint,
1517 next_priority,
1518 seen_at_ms,
1519 ) else {
1520 continue;
1521 };
1522 if existing
1523 .iter()
1524 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
1525 || fallback.iter().any(|addr: &PeerAddress| {
1526 addr.transport == candidate.transport && addr.addr == candidate.addr
1527 })
1528 {
1529 continue;
1530 }
1531 fallback.push(candidate);
1532 next_priority = next_priority.saturating_add(1);
1533 }
1534 fallback
1535 }
1536
1537 fn overlay_endpoint_to_peer_address(
1538 endpoint: &OverlayEndpointAdvert,
1539 priority: u8,
1540 seen_at_ms: u64,
1541 ) -> Option<PeerAddress> {
1542 let transport = match endpoint.transport {
1543 OverlayTransportKind::Udp => "udp",
1544 OverlayTransportKind::Tcp => "tcp",
1545 OverlayTransportKind::Tor => "tor",
1546 };
1547 Some(
1548 PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
1549 .with_seen_at_ms(seen_at_ms),
1550 )
1551 }
1552
    /// Walks `addresses` in order and stops at the first one for which a
    /// connection attempt is successfully initiated.
    ///
    /// How each address is handled:
    /// - `udp` with address `nat` (case-insensitive): skipped unless
    ///   `allow_bootstrap_nat` is set. When allowed and a Nostr discovery runtime
    ///   exists, the peer is handed to it via `request_connect` and `Ok(())` is
    ///   returned immediately, so later addresses are not tried.
    /// - `ethernet`: resolved with `resolve_ethernet_addr`.
    /// - `ble`: resolved with `resolve_ble_addr`, only on builds with
    ///   `bluer_available`.
    /// - Any other transport: mapped to a transport id with
    ///   `find_transport_for_type`.
    ///
    /// Resolution failures and non-ACL connection errors are logged at debug
    /// level, and the next address is tried.
    ///
    /// # Errors
    /// - `NodeError::AccessDenied`: returned as soon as any attempt is denied,
    ///   without trying further addresses.
    /// - `NodeError::NoTransportForType`: every address was skipped or failed.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        for addr in addresses {
            // `udp:nat` is a placeholder meaning "reach me via Nostr NAT traversal".
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if !allow_bootstrap_nat {
                    continue;
                }
                let Some(bootstrap) = self.nostr_discovery.clone() else {
                    debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                    continue;
                };
                bootstrap.request_connect(peer_config.clone()).await;
                info!(npub = %peer_config.npub, "Started Nostr UDP NAT traversal attempt");
                return Ok(());
            }

            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                // Exactly one of the two cfg blocks below is compiled; it is
                // the tail expression of this branch.
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                let tid = match self.find_transport_for_type(&addr.transport) {
                    Some(id) => id,
                    None => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            "No operational transport for address type"
                        );
                        continue;
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => return Ok(()),
                // An ACL denial applies to the peer, not the address, so
                // retrying other addresses is pointless.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
1645
1646 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1647 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
1648 .await;
1649 }
1650
    /// Queues outbound retry attempts for unconfigured peers found in cached
    /// Nostr overlay adverts.
    ///
    /// Does nothing unless Nostr discovery is enabled with policy `Open`. At
    /// most `open_discovery_enqueue_budget` new retries are queued. Candidates
    /// are checked in this order, and the first matching reason is counted:
    /// 1. Advert older than `max_age_secs` (only when given).
    /// 2. Configured peer.
    /// 3. Unparseable npub.
    /// 4. This node itself.
    /// 5. Already a peer.
    /// 6. Already has a pending retry.
    /// 7. `bootstrap` reports a cooldown for it.
    /// 8. A connection already expects its identity.
    /// 9. Advertises no usable endpoint.
    ///
    /// Queued retries use `ConnectPolicy::AutoConnect`, are due immediately,
    /// and expire at `open_discovery_retry_expires_at_ms(now)`.
    ///
    /// `caller` labels log lines. A summary is logged when `caller` is
    /// `"startup"` or at least one retry was queued.
    pub(in crate::node) async fn run_open_discovery_sweep(
        &mut self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
        max_age_secs: Option<u64>,
        caller: &'static str,
    ) {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
        {
            return;
        }

        let configured_npubs = self
            .config
            .peers()
            .iter()
            .map(|peer| peer.npub.clone())
            .collect::<HashSet<_>>();
        let now_ms = Self::now_ms();
        let now_secs = now_ms / 1000;
        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
        if enqueue_budget == 0 {
            debug!(
                caller = %caller,
                "open-discovery sweep: enqueue budget is 0, skipping"
            );
            return;
        }

        // NOTE(review): 64 presumably caps how many cached candidates are
        // returned; confirm against cached_open_discovery_candidates.
        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
        let cached_count = candidates.len();
        let mut enqueued = 0usize;
        let mut skipped_age = 0usize;
        let mut skipped_configured = 0usize;
        let mut skipped_self = 0usize;
        let mut skipped_connected = 0usize;
        let mut skipped_retry_pending = 0usize;
        let mut skipped_connecting = 0usize;
        let mut skipped_no_endpoints = 0usize;
        let mut skipped_invalid_npub = 0usize;
        let mut skipped_cooldown = 0usize;

        for (npub, endpoints, created_at_secs) in candidates {
            if enqueue_budget == 0 {
                break;
            }

            if let Some(max_age) = max_age_secs
                && now_secs.saturating_sub(created_at_secs) > max_age
            {
                skipped_age = skipped_age.saturating_add(1);
                continue;
            }

            // Configured peers are retried by the normal configured-peer path.
            if configured_npubs.contains(&npub) {
                skipped_configured = skipped_configured.saturating_add(1);
                continue;
            }

            let peer_identity = match PeerIdentity::from_npub(&npub) {
                Ok(identity) => identity,
                Err(_) => {
                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                    continue;
                }
            };
            let node_addr = *peer_identity.node_addr();
            if node_addr == *self.identity.node_addr() {
                skipped_self = skipped_self.saturating_add(1);
                continue;
            }
            if self.peers.contains_key(&node_addr) {
                skipped_connected = skipped_connected.saturating_add(1);
                continue;
            }
            if self.retry_pending.contains_key(&node_addr) {
                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
                continue;
            }
            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                skipped_cooldown = skipped_cooldown.saturating_add(1);
                continue;
            }
            let connecting = self.connections.values().any(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == &node_addr)
                    .unwrap_or(false)
            });
            if connecting {
                skipped_connecting = skipped_connecting.saturating_add(1);
                continue;
            }

            // Advertised endpoints become addresses starting at priority 120,
            // deduplicated by (transport, addr).
            let mut addresses = Vec::new();
            let mut priority = 120u8;
            let seen_at_ms = Self::now_ms();
            for endpoint in endpoints {
                let Some(candidate) = Self::overlay_endpoint_to_peer_address(
                    &endpoint,
                    priority,
                    seen_at_ms,
                ) else {
                    continue;
                };
                if addresses.iter().any(|existing: &PeerAddress| {
                    existing.transport == candidate.transport && existing.addr == candidate.addr
                }) {
                    continue;
                }
                addresses.push(candidate);
                priority = priority.saturating_add(1);
            }
            if addresses.is_empty() {
                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
                continue;
            }

            // Keep any alias that already exists; only fill in a short npub.
            self.peer_aliases
                .entry(node_addr)
                .or_insert_with(|| peer_identity.short_npub());
            self.register_identity(node_addr, peer_identity.pubkey_full());

            let mut state = super::retry::RetryState::new(PeerConfig {
                npub: npub.clone(),
                alias: None,
                addresses,
                connect_policy: ConnectPolicy::AutoConnect,
                auto_reconnect: true,
            });
            state.reconnect = false;
            // Due immediately and bounded by an expiry derived from the advert TTL.
            state.retry_after_ms = now_ms;
            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
            self.retry_pending.insert(node_addr, state);
            info!(
                caller = %caller,
                peer = %peer_identity.short_npub(),
                advert_age_secs = now_secs.saturating_sub(created_at_secs),
                "open-discovery sweep: queued retry for cached advert"
            );
            enqueue_budget = enqueue_budget.saturating_sub(1);
            enqueued = enqueued.saturating_add(1);
        }

        let total_skipped = skipped_age
            + skipped_configured
            + skipped_self
            + skipped_connected
            + skipped_retry_pending
            + skipped_connecting
            + skipped_no_endpoints
            + skipped_invalid_npub
            + skipped_cooldown;
        // Keep per-tick sweeps that queue nothing out of the info log.
        let should_summarize = caller == "startup" || enqueued > 0;
        if should_summarize {
            info!(
                caller = %caller,
                cached = cached_count,
                queued = enqueued,
                skipped_age = skipped_age,
                skipped_configured = skipped_configured,
                skipped_self = skipped_self,
                skipped_connected = skipped_connected,
                skipped_retry_pending = skipped_retry_pending,
                skipped_connecting = skipped_connecting,
                skipped_no_endpoints = skipped_no_endpoints,
                skipped_invalid_npub = skipped_invalid_npub,
                skipped_cooldown = skipped_cooldown,
                skipped_total = total_skipped,
                "open-discovery sweep complete"
            );
        }
    }
1836
1837 async fn maybe_run_startup_open_discovery_sweep(
1845 &mut self,
1846 bootstrap: &std::sync::Arc<NostrDiscovery>,
1847 ) {
1848 if self.startup_open_discovery_sweep_done {
1849 return;
1850 }
1851 if !self.config.node.discovery.nostr.enabled
1852 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
1853 {
1854 self.startup_open_discovery_sweep_done = true;
1856 return;
1857 }
1858 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
1859 return;
1860 };
1861 let now_ms = Self::now_ms();
1862 let delay_ms = self
1863 .config
1864 .node
1865 .discovery
1866 .nostr
1867 .startup_sweep_delay_secs
1868 .saturating_mul(1000);
1869 if now_ms < started_at_ms.saturating_add(delay_ms) {
1870 return;
1871 }
1872
1873 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
1874 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
1875 .await;
1876 self.startup_open_discovery_sweep_done = true;
1877 }
1878
1879 fn available_outbound_slots(&self) -> usize {
1880 let connection_used = self
1881 .connections
1882 .len()
1883 .saturating_add(self.pending_connects.len());
1884 let connection_slots = if self.max_connections == 0 {
1885 usize::MAX
1886 } else {
1887 self.max_connections.saturating_sub(connection_used)
1888 };
1889
1890 let peer_slots = if self.max_peers == 0 {
1891 usize::MAX
1892 } else {
1893 self.max_peers.saturating_sub(self.peers.len())
1894 };
1895
1896 connection_slots.min(peer_slots)
1897 }
1898
1899 fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
1900 let current_open_discovery_pending = self
1901 .retry_pending
1902 .values()
1903 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
1904 .count();
1905
1906 let cap_remaining = self
1907 .config
1908 .node
1909 .discovery
1910 .nostr
1911 .open_discovery_max_pending
1912 .saturating_sub(current_open_discovery_pending);
1913
1914 cap_remaining.min(self.available_outbound_slots())
1915 }
1916
1917 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
1918 now_ms.saturating_add(
1919 self.config
1920 .node
1921 .discovery
1922 .nostr
1923 .advert_ttl_secs
1924 .saturating_mul(1000)
1925 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
1926 )
1927 }
1928
    /// Builds the overlay advert describing how this node can be reached.
    ///
    /// Returns `None` when Nostr discovery is disabled or no endpoint could be
    /// produced. Only operational transports whose config has
    /// `advertise_on_nostr()` set are considered:
    /// - UDP with `is_public()`: the explicit external advert address if set.
    ///   Otherwise the bound address, when it is a specific routable IP.
    ///   Otherwise a public address learned via `bootstrap.learn_public_udp_addr`
    ///   (STUN, per the warning text). If that also yields nothing, a warning is
    ///   logged and no UDP endpoint is added.
    /// - UDP without `is_public()`: the placeholder endpoint `nat`. This also
    ///   makes the advert carry the configured DM relays and STUN servers.
    /// - TCP: the explicit external advert address if set. Otherwise the bound
    ///   address when it is a specific routable IP; if not, a warning is logged.
    /// - Tor: the onion address with the configured advertised port, when the
    ///   transport reports an onion address.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        let mut has_udp_nat = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                // Bound to a specific routable IP: advertise it as-is.
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                // Wildcard or private bind: try to learn the
                                // public mapping for this transport's port.
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                             or private and STUN observation failed; \
                                             advertising no UDP endpoint. Either set \
                                             transports.udp.external_addr, bind to a \
                                             specific *public* IP, or ensure \
                                             node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                None => {}
                            }
                        }
                    } else {
                        // Not directly reachable: advertise NAT traversal instead.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            // TCP has no STUN fallback; just warn.
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                     or private IP and no transports.tcp.external_addr set; \
                                     advertising no TCP endpoint. Either set external_addr \
                                     to the public IP (recommended for cloud 1:1-NAT setups) \
                                     or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        // Signalling relays and STUN servers are only published when a peer may
        // need them, i.e. when a `nat` endpoint is advertised.
        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: has_udp_nat.then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: has_udp_nat
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
2092
2093 async fn refresh_overlay_advert(
2094 &self,
2095 bootstrap: &std::sync::Arc<NostrDiscovery>,
2096 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
2097 let advert = self.build_overlay_advert(bootstrap).await;
2098 bootstrap.update_local_advert(advert).await
2099 }
2100
2101 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
2102 match (&self.config.transports.udp, transport_name) {
2103 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2104 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2105 _ => None,
2106 }
2107 }
2108
2109 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
2110 match (&self.config.transports.tcp, transport_name) {
2111 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2112 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2113 _ => None,
2114 }
2115 }
2116
2117 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
2118 match (&self.config.transports.tor, transport_name) {
2119 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2120 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2121 _ => None,
2122 }
2123 }
2124
2125 pub(in crate::node) async fn try_peer_addresses(
2126 &mut self,
2127 peer_config: &PeerConfig,
2128 peer_identity: PeerIdentity,
2129 allow_bootstrap_nat: bool,
2130 ) -> Result<(), NodeError> {
2131 let peer_node_addr = *peer_identity.node_addr();
2132 if self.peers.contains_key(&peer_node_addr) {
2133 debug!(
2134 npub = %peer_config.npub,
2135 "Peer already exists, skipping address attempts"
2136 );
2137 return Ok(());
2138 }
2139 if self.is_connecting_to_peer(&peer_node_addr) {
2140 debug!(
2141 npub = %peer_config.npub,
2142 "Connection already in progress, skipping address attempts"
2143 );
2144 return Ok(());
2145 }
2146
2147 let static_addresses = self.static_peer_addresses(peer_config);
2155 let overlay_addresses = self
2156 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
2157 .await;
2158
2159 let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
2160 for addr in overlay_addresses.into_iter().chain(static_addresses) {
2161 if !candidates.iter().any(|existing: &PeerAddress| {
2162 existing.transport == addr.transport && existing.addr == addr.addr
2163 }) {
2164 candidates.push(addr);
2165 }
2166 }
2167
2168 candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
2173 (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
2174 (Some(_), None) => std::cmp::Ordering::Less,
2175 (None, Some(_)) => std::cmp::Ordering::Greater,
2176 (None, None) => std::cmp::Ordering::Equal,
2177 });
2178
2179 if candidates.is_empty() {
2180 return Err(NodeError::NoTransportForType(format!(
2181 "no addresses known for {}",
2182 peer_config.npub
2183 )));
2184 }
2185
2186 if self
2187 .attempt_peer_address_list(
2188 peer_config,
2189 peer_identity,
2190 allow_bootstrap_nat,
2191 &candidates,
2192 )
2193 .await
2194 .is_ok()
2195 {
2196 return Ok(());
2197 }
2198
2199 Err(NodeError::NoTransportForType(format!(
2200 "no operational transport for any of {}'s addresses",
2201 peer_config.npub
2202 )))
2203 }
2204
2205 pub(crate) async fn api_connect(
2213 &mut self,
2214 npub: &str,
2215 address: &str,
2216 transport: &str,
2217 ) -> Result<serde_json::Value, String> {
2218 let peer_config = PeerConfig {
2219 npub: npub.to_string(),
2220 alias: None,
2221 addresses: vec![PeerAddress::new(transport, address)],
2222 connect_policy: ConnectPolicy::Manual,
2223 auto_reconnect: false,
2224 };
2225
2226 if let Ok(identity) = PeerIdentity::from_npub(npub) {
2228 self.peer_aliases
2229 .insert(*identity.node_addr(), identity.short_npub());
2230 self.register_identity(*identity.node_addr(), identity.pubkey_full());
2231 }
2232
2233 self.initiate_peer_connection(&peer_config)
2234 .await
2235 .map(|()| {
2236 info!(
2237 npub = %npub,
2238 address = %address,
2239 transport = %transport,
2240 "API connect initiated"
2241 );
2242 serde_json::json!({
2243 "npub": npub,
2244 "address": address,
2245 "transport": transport,
2246 })
2247 })
2248 .map_err(|e| e.to_string())
2249 }
2250
2251 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
2255 let peer_identity =
2256 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
2257 let node_addr = *peer_identity.node_addr();
2258
2259 if !self.peers.contains_key(&node_addr) {
2260 return Err(format!("peer not found: {npub}"));
2261 }
2262
2263 self.remove_active_peer(&node_addr);
2265
2266 self.retry_pending.remove(&node_addr);
2268
2269 info!(npub = %npub, "API disconnect completed");
2270
2271 Ok(serde_json::json!({
2272 "npub": npub,
2273 "disconnected": true,
2274 }))
2275 }
2276
2277 pub async fn adopt_established_traversal(
2284 &mut self,
2285 traversal: EstablishedTraversal,
2286 ) -> Result<BootstrapHandoffResult, NodeError> {
2287 debug!(
2288 peer_npub = %traversal.peer_npub,
2289 session_id = %traversal.session_id,
2290 remote_addr = %traversal.remote_addr,
2291 "adopting established traversal socket"
2292 );
2293
2294 if !self.state.is_operational() {
2295 return Err(NodeError::NotStarted);
2296 }
2297
2298 let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
2299 let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
2300 NodeError::InvalidPeerNpub {
2301 npub: traversal.peer_npub.clone(),
2302 reason: e.to_string(),
2303 }
2304 })?;
2305 let peer_node_addr = *peer_identity.node_addr();
2306 if self.peers.contains_key(&peer_node_addr) {
2307 debug!(
2308 peer_npub = %traversal.peer_npub,
2309 "Ignoring NAT traversal handoff for already-connected peer"
2310 );
2311 return Err(NodeError::PeerAlreadyExists(peer_node_addr));
2312 }
2313 if self.is_connecting_to_peer(&peer_node_addr) {
2314 debug!(
2315 peer_npub = %traversal.peer_npub,
2316 "Ignoring NAT traversal handoff while peer handshake is already in progress"
2317 );
2318 return Err(NodeError::PeerAlreadyExists(peer_node_addr));
2319 }
2320
2321 self.peer_aliases
2322 .insert(peer_node_addr, peer_identity.short_npub());
2323 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
2324
2325 let transport_id = self.allocate_transport_id();
2326 let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
2346 let mut cfg = self
2347 .lookup_udp_config(traversal.transport_name.as_deref())
2348 .or_else(|| self.lookup_udp_config(None))
2349 .cloned()
2350 .unwrap_or_default();
2351 cfg.bind_addr = None;
2352 cfg.external_addr = None;
2353 cfg
2354 });
2355 let mut transport = crate::transport::udp::UdpTransport::new(
2356 transport_id,
2357 traversal.transport_name.clone(),
2358 inherited_config,
2359 packet_tx,
2360 );
2361
2362 transport
2363 .adopt_socket_async(traversal.socket)
2364 .await
2365 .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
2366
2367 let local_addr = transport.local_addr().ok_or_else(|| {
2368 NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
2369 })?;
2370
2371 self.transports.insert(
2372 transport_id,
2373 crate::transport::TransportHandle::Udp(transport),
2374 );
2375 self.bootstrap_transports.insert(transport_id);
2376 self.bootstrap_transport_npubs
2377 .insert(transport_id, traversal.peer_npub.clone());
2378
2379 let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
2380 if let Err(err) = self
2381 .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
2382 .await
2383 {
2384 self.bootstrap_transports.remove(&transport_id);
2385 self.bootstrap_transport_npubs.remove(&transport_id);
2386 if let Some(mut handle) = self.transports.remove(&transport_id) {
2387 let _ = handle.stop().await;
2388 }
2389 return Err(err);
2390 }
2391
2392 info!(
2393 peer = %self.peer_display_name(&peer_node_addr),
2394 transport_id = %transport_id,
2395 local_addr = %local_addr,
2396 remote_addr = %traversal.remote_addr,
2397 session_id = %traversal.session_id,
2398 "adopted NAT traversal socket; handshake initiated"
2399 );
2400
2401 Ok(BootstrapHandoffResult {
2402 transport_id,
2403 local_addr,
2404 remote_addr: traversal.remote_addr,
2405 peer_node_addr,
2406 session_id: traversal.session_id,
2407 })
2408 }
2409}