1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
7 OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use std::collections::HashSet;
18use std::net::IpAddr;
19use std::thread;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
/// Returns `true` when `ip` is not reachable from the public internet and
/// therefore must not be dialed from (or published into) an overlay advert.
///
/// IPv4: RFC 1918 private, loopback, link-local, unspecified, multicast,
/// broadcast, documentation ranges, and carrier-grade NAT (100.64.0.0/10,
/// RFC 6598). IPv6: loopback, unspecified, unique-local (fc00::/7),
/// multicast, and link-local unicast (fe80::/10).
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let octets = v4.octets();
            // 100.64.0.0/10 — CGNAT space (RFC 6598); no std helper used here.
            let is_cgnat = octets[0] == 100 && (octets[1] & 0xc0) == 64;
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                || is_cgnat
        }
        IpAddr::V6(v6) => {
            // fe80::/10 — link-local unicast, checked via the top ten bits.
            let is_link_local = (v6.segments()[0] & 0xffc0) == 0xfe80;
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                || is_link_local
        }
    }
}
54
// Multiplier applied to an advert lifetime for open-discovery retries.
// NOTE(review): the use site is not in this chunk — presumably it bounds how
// long open-discovery peers keep being retried; confirm at the consumer.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
56
57impl Node {
58 pub(super) async fn update_peers(
69 &mut self,
70 new_peers: Vec<crate::config::PeerConfig>,
71 ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
72 use std::collections::{HashMap, HashSet};
73
74 let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
75 HashMap::with_capacity(new_peers.len());
76 for peer in new_peers {
77 let identity = match PeerIdentity::from_npub(&peer.npub) {
78 Ok(id) => id,
79 Err(e) => {
80 return Err(crate::node::NodeError::InvalidPeerNpub {
81 npub: peer.npub.clone(),
82 reason: e.to_string(),
83 });
84 }
85 };
86 new_by_addr.insert(*identity.node_addr(), peer);
90 }
91
92 let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
93 .config
94 .peers()
95 .iter()
96 .filter_map(|pc| {
97 PeerIdentity::from_npub(&pc.npub)
98 .ok()
99 .map(|id| (*id.node_addr(), pc.clone()))
100 })
101 .collect();
102
103 let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
104 let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
105
106 let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
107 let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
108 let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
109
110 let mut outcome = crate::node::UpdatePeersOutcome::default();
111
112 for node_addr in &removed {
113 if self.retry_pending.remove(node_addr).is_some() {
114 debug!(
115 peer = %self.peer_display_name(node_addr),
116 "Dropping retry entry for peer removed from runtime peer list"
117 );
118 }
119 self.peer_aliases.remove(node_addr);
120 outcome.removed += 1;
121 }
122
123 for node_addr in &kept {
124 let new_pc = &new_by_addr[node_addr];
125 let current_pc = ¤t_by_addr[node_addr];
126 if new_pc.addresses != current_pc.addresses
127 || new_pc.alias != current_pc.alias
128 || new_pc.connect_policy != current_pc.connect_policy
129 || new_pc.auto_reconnect != current_pc.auto_reconnect
130 {
131 outcome.updated += 1;
132 if let Some(state) = self.retry_pending.get_mut(node_addr) {
133 state.peer_config = new_pc.clone();
134 }
135 if let Some(alias) = new_pc.alias.clone() {
136 self.peer_aliases.insert(*node_addr, alias);
137 }
138 } else {
139 outcome.unchanged += 1;
140 }
141 }
142
143 let added_configs: Vec<crate::config::PeerConfig> =
144 added.iter().map(|addr| new_by_addr[addr].clone()).collect();
145
146 self.config.peers = new_by_addr.into_values().collect();
150
151 for peer_config in added_configs {
152 outcome.added += 1;
153 let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
154 continue;
155 };
156 let name = peer_config
157 .alias
158 .clone()
159 .unwrap_or_else(|| identity.short_npub());
160 self.peer_aliases.insert(*identity.node_addr(), name);
161 self.register_identity(*identity.node_addr(), identity.pubkey_full());
162
163 if !peer_config.is_auto_connect() {
164 continue;
165 }
166
167 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
168 warn!(
169 npub = %peer_config.npub,
170 error = %e,
171 "Failed to initiate connection for newly added peer"
172 );
173 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
174 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
175 }
176 if matches!(e, crate::node::NodeError::NoTransportForType(_))
177 && let Some(bootstrap) = self.nostr_discovery.clone()
178 {
179 let npub = peer_config.npub.clone();
180 tokio::spawn(async move {
181 let _ = bootstrap.refetch_advert_for_stale_check(&npub).await;
182 });
183 }
184 }
185 }
186
187 Ok(outcome)
188 }
189
190 pub(super) async fn initiate_peer_connections(&mut self) {
191 let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
197 .config
198 .peers()
199 .iter()
200 .filter_map(|pc| {
201 PeerIdentity::from_npub(&pc.npub)
202 .ok()
203 .map(|id| (id, pc.alias.clone()))
204 })
205 .collect();
206
207 for (identity, alias) in peer_identities {
208 let name = alias.unwrap_or_else(|| identity.short_npub());
209 self.peer_aliases.insert(*identity.node_addr(), name);
210 self.register_identity(*identity.node_addr(), identity.pubkey_full());
214 }
215
216 let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
218
219 if peer_configs.is_empty() {
220 debug!("No static peers configured");
221 return;
222 }
223
224 debug!(
225 count = peer_configs.len(),
226 "Initiating static peer connections"
227 );
228
229 for peer_config in peer_configs {
230 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
231 warn!(
232 npub = %peer_config.npub,
233 alias = ?peer_config.alias,
234 error = %e,
235 "Failed to initiate peer connection"
236 );
237 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
241 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
242 }
243 if matches!(e, crate::node::NodeError::NoTransportForType(_))
249 && let Some(bootstrap) = self.nostr_discovery.clone()
250 {
251 let npub = peer_config.npub.clone();
252 tokio::spawn(async move {
253 let _ = bootstrap.refetch_advert_for_stale_check(&npub).await;
254 });
255 }
256 }
257 }
258 }
259
    /// Dials a configured peer for the first time.
    ///
    /// Thin wrapper around `initiate_peer_connection_inner`; kept separate
    /// from the retry entry point so call sites document their intent.
    pub(super) async fn initiate_peer_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }
269
    /// Re-dials a peer from the retry machinery.
    ///
    /// Currently identical to `initiate_peer_connection`; the separate name
    /// distinguishes retry-driven dials at the call site.
    pub(super) async fn initiate_peer_retry_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }
281
282 async fn initiate_peer_connection_inner(
283 &mut self,
284 peer_config: &crate::config::PeerConfig,
285 ) -> Result<(), NodeError> {
286 let peer_identity =
288 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
289 npub: peer_config.npub.clone(),
290 reason: e.to_string(),
291 })?;
292
293 let peer_node_addr = *peer_identity.node_addr();
294
295 if self.peers.contains_key(&peer_node_addr) {
297 debug!(
298 npub = %peer_config.npub,
299 "Peer already exists, skipping"
300 );
301 return Ok(());
302 }
303
304 let already_connecting = self.connections.values().any(|conn| {
306 conn.expected_identity()
307 .map(|id| id.node_addr() == &peer_node_addr)
308 .unwrap_or(false)
309 });
310 if already_connecting {
311 debug!(
312 npub = %peer_config.npub,
313 "Connection already in progress, skipping"
314 );
315 return Ok(());
316 }
317
318 self.try_peer_addresses(peer_config, peer_identity, true)
319 .await
320 }
321
322 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
323 self.connections.values().any(|conn| {
324 conn.expected_identity()
325 .map(|id| id.node_addr() == peer_node_addr)
326 .unwrap_or(false)
327 })
328 }
329
    /// Creates a link to `remote_addr` on `transport_id` and begins
    /// connecting to the identified peer.
    ///
    /// For connection-oriented transports the connect is non-blocking: the
    /// attempt is queued on `pending_connects` and finalized later by
    /// `poll_pending_connects`. Connectionless transports go straight to the
    /// Noise handshake.
    ///
    /// # Errors
    /// ACL rejection from `authorize_peer`, a transport connect error, or a
    /// handshake setup error.
    pub(super) async fn initiate_connection(
        &mut self,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        // Enforce the peer ACL before allocating any link state.
        self.authorize_peer(
            &peer_identity,
            PeerAclContext::OutboundConnect,
            transport_id,
            &remote_addr,
        )?;

        // Unknown transports are treated as connectionless.
        let is_connection_oriented = self
            .transports
            .get(&transport_id)
            .map(|t| t.transport_type().connection_oriented)
            .unwrap_or(false);

        let link_id = self.allocate_link_id();

        let link = if is_connection_oriented {
            Link::new(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        } else {
            Link::connectionless(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        };

        self.links.insert(link_id, link);

        self.addr_to_link
            .insert((transport_id, remote_addr.clone()), link_id);

        if is_connection_oriented {
            if let Some(transport) = self.transports.get(&transport_id) {
                match transport.connect(&remote_addr).await {
                    Ok(()) => {
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            link_id = %link_id,
                            "Transport connect initiated (non-blocking)"
                        );
                        // Completion is observed by poll_pending_connects.
                        self.pending_connects.push(super::PendingConnect {
                            link_id,
                            transport_id,
                            remote_addr,
                            peer_identity,
                        });
                    }
                    Err(e) => {
                        // Roll back the link state created above.
                        self.links.remove(&link_id);
                        self.addr_to_link.remove(&(transport_id, remote_addr));
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            Ok(())
        } else {
            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
                .await
        }
    }
422
    /// Starts an outbound Noise handshake on `link_id`, sending message 1 to
    /// `remote_addr` over `transport_id`.
    ///
    /// Allocates a local session index, builds msg1 (recorded for resends),
    /// registers the pending-outbound mapping, then transmits. Allocation or
    /// handshake-construction failures roll back the link and address-map
    /// entries created by the caller; a send failure only marks the
    /// connection failed and lets the retry machinery clean up.
    ///
    /// # Errors
    /// [`NodeError::IndexAllocationFailed`] or [`NodeError::HandshakeFailed`].
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                // Roll back the link state the caller set up.
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    // Return the index and roll back link state.
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Keep msg1 around so it can be resent until a response arrives.
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        // Send outside the map lookup so `self` is free for the bookkeeping
        // below.
        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        if let Some(send_result) = send_result {
            self.note_local_send_outcome(&send_result);
            match send_result {
                Ok(bytes) => {
                    debug!(
                        link_id = %link_id,
                        our_index = %our_index,
                        bytes,
                        "Sent Noise handshake message 1 (wire format)"
                    );
                }
                Err(e) => {
                    warn!(
                        link_id = %link_id,
                        transport_id = %transport_id,
                        remote_addr = %remote_addr,
                        our_index = %our_index,
                        error = %e,
                        "Failed to send handshake message"
                    );
                    // Mark failed rather than tearing down immediately; the
                    // retry/cleanup path handles removal.
                    if let Some(conn) = self.connections.get_mut(&link_id) {
                        conn.mark_failed();
                    }
                }
            }
        }

        Ok(())
    }
527
528 pub(super) async fn poll_transport_discovery(&mut self) {
534 let mut to_connect = Vec::new();
536
537 for (transport_id, transport) in &self.transports {
538 if !transport.is_operational() {
539 continue;
540 }
541 if !transport.auto_connect() {
542 let _ = transport.discover();
544 continue;
545 }
546 let discovered = match transport.discover() {
547 Ok(peers) => peers,
548 Err(_) => continue,
549 };
550 for peer in discovered {
551 let pubkey = match peer.pubkey_hint {
552 Some(pk) => pk,
553 None => continue,
554 };
555 let identity = PeerIdentity::from_pubkey(pubkey);
556 let node_addr = *identity.node_addr();
557
558 if node_addr == *self.identity.node_addr() {
560 continue;
561 }
562 if self.peers.contains_key(&node_addr) {
564 continue;
565 }
566 let connecting = self.connections.values().any(|c| {
568 c.expected_identity()
569 .map(|id| id.node_addr() == &node_addr)
570 .unwrap_or(false)
571 });
572 if connecting {
573 continue;
574 }
575
576 to_connect.push((*transport_id, peer.addr, identity));
577 }
578 }
579
580 for (transport_id, remote_addr, identity) in to_connect {
581 info!(
582 peer = %self.peer_display_name(identity.node_addr()),
583 transport_id = %transport_id,
584 remote_addr = %remote_addr,
585 "Auto-connecting to discovered peer"
586 );
587 if let Err(e) = self
588 .initiate_connection(transport_id, remote_addr, identity)
589 .await
590 {
591 warn!(error = %e, "Failed to auto-connect to discovered peer");
592 }
593 }
594 }
595
    /// Drives the Nostr bootstrap / NAT-traversal state machine for one tick.
    ///
    /// Refreshes the local overlay advert, then drains bootstrap events:
    /// - `Established`: adopt the traversed socket unless the peer is already
    ///   connected or mid-handshake; on adoption failure schedule a retry.
    /// - `Failed`: record the failure (warn rate-limited), optionally kick a
    ///   background stale-advert refetch when the failure threshold is
    ///   crossed, fall back to the peer's static addresses, and otherwise
    ///   schedule a retry honoring any traversal cooldown.
    ///
    /// Finally runs the startup open-discovery sweep and queues
    /// open-discovery retries.
    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    let peer_npub = traversal.peer_npub.clone();
                    if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                        let peer_addr = *peer_identity.node_addr();
                        if self.peers.contains_key(&peer_addr) {
                            debug!(
                                peer_npub = %peer_npub,
                                "Ignoring established NAT traversal for already-connected peer"
                            );
                            continue;
                        }
                        if self.is_connecting_to_peer(&peer_addr) {
                            debug!(
                                peer_npub = %peer_npub,
                                "Ignoring established NAT traversal while peer handshake is already in progress"
                            );
                            continue;
                        }
                    }
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            // Adoption failed — fall back to the normal retry path.
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    if self.peers.contains_key(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal for already-connected peer"
                        );
                        continue;
                    }
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    let now_ms = Self::now_ms();
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    if decision.crossed_threshold {
                        // Fire-and-forget stale-advert sweep: decide whether
                        // the cached advert should be evicted or refreshed.
                        let bootstrap = bootstrap.clone();
                        let npub = peer_config.npub.clone();
                        tokio::spawn(async move {
                            let outcome = bootstrap.refetch_advert_for_stale_check(&npub).await;
                            match outcome {
                                crate::discovery::nostr::NostrRefetchOutcome::Evicted => info!(
                                    npub = %npub,
                                    "stale-advert sweep: peer evicted from advert cache"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::Refreshed => info!(
                                    npub = %npub,
                                    "stale-advert sweep: peer republished, cache refreshed and streak reset"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::SameAdvert => debug!(
                                    npub = %npub,
                                    "stale-advert sweep: advert unchanged, cooldown stands"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::Skipped => debug!(
                                    npub = %npub,
                                    "stale-advert sweep: skipped (relay error or no advert_relays)"
                                ),
                            }
                        });
                    }

                    // Try the peer's static addresses before scheduling a retry.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    // Never retry earlier than the traversal cooldown allows.
                    if let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
    }
739
740 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
745 let app = self.config.node.discovery.nostr.app.trim();
746 if app.is_empty() {
747 return None;
748 }
749 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
750 let scope = rest.trim();
751 if scope.is_empty() {
752 None
753 } else {
754 Some(scope.to_string())
755 }
756 } else {
757 Some(app.to_string())
758 }
759 }
760
761 pub(super) async fn poll_lan_discovery(&mut self) {
767 let Some(runtime) = self.lan_discovery.clone() else {
768 return;
769 };
770 let events = runtime.drain_events().await;
771 if events.is_empty() {
772 return;
773 }
774 let udp_transport_id = self.find_transport_for_type("udp");
777 let Some(transport_id) = udp_transport_id else {
778 debug!("lan: no operational UDP transport, skipping discovered peers");
779 return;
780 };
781 for event in events {
782 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
783 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
784 Ok(id) => id,
785 Err(err) => {
786 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
787 continue;
788 }
789 };
790 let peer_node_addr = *identity.node_addr();
791 if self.peers.contains_key(&peer_node_addr) {
792 continue;
793 }
794 let already_connecting = self.connections.values().any(|conn| {
795 conn.expected_identity()
796 .map(|id| id.node_addr() == &peer_node_addr)
797 .unwrap_or(false)
798 });
799 if already_connecting {
800 continue;
801 }
802 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
803 info!(
804 npub = %identity.short_npub(),
805 addr = %peer.addr,
806 "lan: initiating handshake to discovered peer"
807 );
808 if let Err(err) = self
809 .initiate_connection(transport_id, remote_addr, identity)
810 .await
811 {
812 debug!(
813 npub = %peer.npub,
814 error = %err,
815 "lan: failed to initiate connection to discovered peer"
816 );
817 }
818 }
819 }
820
    /// Polls in-flight connection-oriented transport connects and finalizes
    /// each one that has resolved.
    ///
    /// Connected entries get their link marked connected and a Noise
    /// handshake started; failed entries tear the link down and schedule a
    /// retry. Entries still `Connecting` are left for the next poll.
    pub(super) async fn poll_pending_connects(&mut self) {
        if self.pending_connects.is_empty() {
            return;
        }

        let mut completed = Vec::new();

        for (i, pending) in self.pending_connects.iter().enumerate() {
            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
                transport.connection_state(&pending.remote_addr)
            } else {
                crate::transport::ConnectionState::Failed("transport removed".into())
            };

            match state {
                crate::transport::ConnectionState::Connected => {
                    completed.push((i, true, None));
                }
                crate::transport::ConnectionState::Failed(reason) => {
                    completed.push((i, false, Some(reason)));
                }
                crate::transport::ConnectionState::Connecting => {
                    // Still in progress — check again on the next poll.
                }
                crate::transport::ConnectionState::None => {
                    completed.push((i, false, Some("no connection attempt found".into())));
                }
            }
        }

        // Process in descending index order so earlier indices stay valid
        // across `remove`.
        for (i, success, reason) in completed.into_iter().rev() {
            let pending = self.pending_connects.remove(i);

            if success {
                if let Some(link) = self.links.get_mut(&pending.link_id) {
                    link.set_connected();
                }

                debug!(
                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
                    transport_id = %pending.transport_id,
                    remote_addr = %pending.remote_addr,
                    link_id = %pending.link_id,
                    "Transport connected, starting handshake"
                );

                if let Err(e) = self
                    .start_handshake(
                        pending.link_id,
                        pending.transport_id,
                        pending.remote_addr.clone(),
                        pending.peer_identity,
                    )
                    .await
                {
                    warn!(
                        link_id = %pending.link_id,
                        error = %e,
                        "Failed to start handshake after transport connect"
                    );
                    self.remove_link(&pending.link_id);
                }
            } else {
                let reason = reason.unwrap_or_default();
                warn!(
                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
                    transport_id = %pending.transport_id,
                    remote_addr = %pending.remote_addr,
                    link_id = %pending.link_id,
                    reason = %reason,
                    "Transport connect failed"
                );

                self.remove_link(&pending.link_id);
                // NOTE(review): this `links.remove` looks redundant if
                // `remove_link` already drops the entry (the success path
                // above relies on `remove_link` alone) — confirm against
                // `remove_link`'s implementation.
                self.links.remove(&pending.link_id);
                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
            }
        }
    }
912
    /// Starts the node: transports, crypto worker pools, discovery
    /// (Nostr + LAN), static peer dialing, the TUN device, and the DNS
    /// responder.
    ///
    /// Individual subsystems that fail to start are logged and skipped so the
    /// node still comes up degraded.
    ///
    /// # Errors
    /// [`NodeError::AlreadyStarted`] when the node is not in a startable
    /// state; on macOS, failure to create the TUN shutdown pipe.
    pub async fn start(&mut self) -> Result<(), NodeError> {
        if !self.state.can_start() {
            return Err(NodeError::AlreadyStarted);
        }
        self.state = NodeState::Starting;

        // Shared packet channel feeding all transports into the node loop.
        let packet_buffer_size = self.config.node.buffers.packet_channel;
        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
        self.packet_tx = Some(packet_tx.clone());
        self.packet_rx = Some(packet_rx);

        let transport_handles = self.create_transports(&packet_tx).await;

        for mut handle in transport_handles {
            let transport_id = handle.transport_id();
            let transport_type = handle.transport_type().name;
            let name = handle.name().map(|s| s.to_string());

            match handle.start().await {
                Ok(()) => {
                    self.transports.insert(transport_id, handle);
                }
                Err(e) => {
                    // A failed transport is dropped, not retried here.
                    if let Some(ref n) = name {
                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
                    } else {
                        warn!(transport_type, error = %e, "Transport failed to start");
                    }
                }
            }
        }

        if !self.transports.is_empty() {
            info!(count = self.transports.len(), "Transports initialized");
        }

        // Crypto worker pools (unix only). Counts default to the CPU count
        // and can be overridden via environment variables.
        #[cfg(unix)]
        {
            let cpu_default = std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1)
                .max(1);
            let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(cpu_default)
                .max(1);
            self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
                encrypt_worker_count,
            ));
            info!(
                workers = encrypt_worker_count,
                "Spawned FMP-encrypt worker pool"
            );

            // Unlike encrypt, decrypt may be set to 0 to run in-line in the
            // rx loop instead of on a pool.
            let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(cpu_default);
            if decrypt_worker_count == 0 {
                info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
            } else {
                self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
                    decrypt_worker_count,
                ));
                info!(
                    workers = decrypt_worker_count,
                    "Spawned FMP+FSP-decrypt worker pool"
                );
            }
        }

        if self.config.node.discovery.nostr.enabled {
            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
                .await
            {
                Ok(runtime) => {
                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
                    }
                    self.nostr_discovery = Some(runtime);
                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
                    info!("Nostr overlay discovery enabled");
                }
                Err(err) => {
                    warn!(error = %err, "Failed to start Nostr overlay discovery");
                }
            }
        }

        if self.config.node.discovery.lan.enabled {
            // Advertise the first operational UDP transport's port (0 if none).
            let advertised_udp_port = self
                .transports
                .values()
                .filter(|h| h.is_operational())
                .filter(|h| h.transport_type().name == "udp")
                .find_map(|h| h.local_addr().map(|addr| addr.port()))
                .unwrap_or(0);
            let scope = self.lan_discovery_scope();
            match crate::discovery::lan::LanDiscovery::start(
                &self.identity,
                scope,
                advertised_udp_port,
                self.config.node.discovery.lan.clone(),
            )
            .await
            {
                Ok(runtime) => {
                    self.lan_discovery = Some(runtime);
                    info!("LAN mDNS discovery enabled");
                }
                Err(err) => {
                    debug!(error = %err, "LAN mDNS discovery not started");
                }
            }
        }

        self.initiate_peer_connections().await;

        if self.config.tun.enabled {
            let address = *self.identity.address();
            match TunDevice::create(&self.config.tun, address).await {
                Ok(device) => {
                    let mtu = device.mtu();
                    let name = device.name().to_string();
                    let our_addr = *device.address();

                    info!("TUN device active:");
                    info!(" name: {}", name);
                    info!(" address: {}", device.address());
                    info!(" mtu: {}", mtu);

                    // MSS clamp: effective MTU minus IPv6 (40) and TCP (20) headers.
                    let effective_mtu = self.effective_ipv6_mtu();
                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20);
                    info!("effective MTU: {} bytes", effective_mtu);
                    debug!(" max TCP MSS: {} bytes", max_mss);

                    // macOS: the TUN reader blocks in a thread; a pipe lets
                    // stop() wake it for shutdown.
                    #[cfg(target_os = "macos")]
                    let (shutdown_read_fd, shutdown_write_fd) = {
                        let mut fds = [0i32; 2];
                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
                                "failed to create shutdown pipe".into(),
                            )));
                        }
                        (fds[0], fds[1])
                    };

                    let (writer, tun_tx) =
                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;

                    // Writer runs on a dedicated OS thread.
                    let writer_handle = thread::spawn(move || {
                        writer.run();
                    });

                    let reader_tun_tx = tun_tx.clone();

                    let tun_channel_size = self.config.node.buffers.tun_channel;
                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);

                    let transport_mtu = self.transport_mtu();
                    let path_mtu_lookup = self.path_mtu_lookup.clone();
                    // Reader thread; the macOS variant additionally takes the
                    // shutdown pipe's read end.
                    #[cfg(target_os = "macos")]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                            shutdown_read_fd,
                        );
                    });
                    #[cfg(not(target_os = "macos"))]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                        );
                    });

                    self.tun_state = TunState::Active;
                    self.tun_name = Some(name);
                    self.tun_tx = Some(tun_tx);
                    self.tun_outbound_rx = Some(outbound_rx);
                    self.tun_reader_handle = Some(reader_handle);
                    self.tun_writer_handle = Some(writer_handle);
                    #[cfg(target_os = "macos")]
                    {
                        self.tun_shutdown_fd = Some(shutdown_write_fd);
                    }
                }
                Err(e) => {
                    // TUN is optional: keep running without it.
                    self.tun_state = TunState::Failed;
                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
                }
            }
        }

        if self.config.dns.enabled {
            let addr_str = self.config.dns.bind_addr();
            match addr_str.parse::<std::net::IpAddr>() {
                Ok(ip) => {
                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
                    match Self::bind_dns_socket(bind) {
                        Ok(socket) => {
                            let dns_channel_size = self.config.node.buffers.dns_channel;
                            let (identity_tx, identity_rx) =
                                tokio::sync::mpsc::channel(dns_channel_size);
                            let dns_ttl = self.config.dns.ttl();
                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
                                self.config.peers(),
                            );
                            // With system files enabled, the host map reloads
                            // from the hosts file; otherwise it's memory-only.
                            let reloader = if self.config.node.system_files_enabled {
                                let hosts_path = std::path::PathBuf::from(
                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
                                );
                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
                            } else {
                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
                            };
                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
                            info!(
                                bind = %bind,
                                hosts = reloader.hosts().len(),
                                mesh_ifindex = ?mesh_ifindex,
                                "DNS responder started for .fips domain (auto-reload enabled)"
                            );
                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
                                socket,
                                identity_tx,
                                dns_ttl,
                                reloader,
                                mesh_ifindex,
                            ));
                            self.dns_identity_rx = Some(identity_rx);
                            self.dns_task = Some(handle);
                        }
                        Err(e) => {
                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
                        }
                    }
                }
                Err(e) => {
                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
                }
            }
        }

        self.state = NodeState::Running;
        info!("Node started:");
        info!(" state: {}", self.state);
        info!(" transports: {}", self.transports.len());
        info!(" connections: {}", self.connections.len());
        Ok(())
    }
1248
1249 fn bind_dns_socket(
1262 addr: std::net::SocketAddr,
1263 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
1264 use socket2::{Domain, Protocol, Socket, Type};
1265 let domain = if addr.is_ipv4() {
1266 Domain::IPV4
1267 } else {
1268 Domain::IPV6
1269 };
1270 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
1271 if addr.is_ipv6() {
1272 sock.set_only_v6(false)?;
1273 #[cfg(unix)]
1274 Self::set_recv_pktinfo_v6(&sock)?;
1275 }
1276 sock.set_nonblocking(true)?;
1277 sock.bind(&addr.into())?;
1278 tokio::net::UdpSocket::from_std(sock.into())
1279 }
1280
    /// Enables `IPV6_RECVPKTINFO` on `sock` so each received datagram carries
    /// packet info (destination address / interface) in ancillary data.
    ///
    /// # Errors
    /// The OS error from `setsockopt` on failure.
    #[cfg(unix)]
    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
        use std::os::fd::AsRawFd;
        let enable: libc::c_int = 1;
        // SAFETY: `sock` owns a valid open fd for the duration of the call,
        // and the option value points to a live c_int whose size matches the
        // length argument.
        let ret = unsafe {
            libc::setsockopt(
                sock.as_raw_fd(),
                libc::IPPROTO_IPV6,
                libc::IPV6_RECVPKTINFO,
                &enable as *const _ as *const libc::c_void,
                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
            )
        };
        if ret < 0 {
            return Err(std::io::Error::last_os_error());
        }
        Ok(())
    }
1304
1305 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
1312 #[cfg(unix)]
1313 {
1314 let c_name = std::ffi::CString::new(name).ok()?;
1315 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
1316 if idx == 0 { None } else { Some(idx) }
1317 }
1318 #[cfg(not(unix))]
1319 {
1320 let _ = name;
1321 None
1322 }
1323 }
1324
    /// Gracefully stop the node: notify peers, shut down discovery, stop
    /// every transport, and finally tear down the TUN interface.
    ///
    /// The order matters: disconnect notifications go out while transports
    /// are still up, discovery runtimes stop before transports so nothing
    /// new arrives mid-teardown, and TUN cleanup runs last. Individual
    /// teardown failures are logged but do not abort the sequence.
    ///
    /// # Errors
    /// Returns `NodeError::NotStarted` when the node is not in a state that
    /// permits stopping.
    pub async fn stop(&mut self) -> Result<(), NodeError> {
        if !self.state.can_stop() {
            return Err(NodeError::NotStarted);
        }
        self.state = NodeState::Stopping;
        info!(state = %self.state, "Node stopping");

        // The DNS responder is a plain tokio task; abort is sufficient.
        if let Some(handle) = self.dns_task.take() {
            handle.abort();
            debug!("DNS responder stopped");
        }

        // Best-effort courtesy notification while links are still usable.
        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
            .await;

        if let Some(bootstrap) = self.nostr_discovery.take()
            && let Err(e) = bootstrap.shutdown().await
        {
            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
        }

        if let Some(lan) = self.lan_discovery.take() {
            lan.shutdown().await;
        }

        // Collect ids first so handles can be removed while iterating.
        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
        for transport_id in transport_ids {
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let transport_type = handle.transport_type().name;
                match handle.stop().await {
                    Ok(()) => {
                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
                    }
                    Err(e) => {
                        warn!(
                            transport_id = %transport_id,
                            transport_type,
                            error = %e,
                            "Transport stop failed"
                        );
                    }
                }
            }
        }

        // Drop both ends of the packet channel.
        self.packet_tx.take();
        self.packet_rx.take();

        if let Some(name) = self.tun_name.take() {
            info!(name = %name, "Shutting down TUN interface");

            // NOTE(review): dropping the sender presumably makes the TUN
            // writer loop observe a closed channel and exit — confirm.
            self.tun_tx.take();

            if let Err(e) = shutdown_tun_interface(&name).await {
                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
            }

            // NOTE(review): looks like a self-pipe wake for the blocking
            // reader thread (write one byte, then close our end) — confirm.
            #[cfg(target_os = "macos")]
            if let Some(fd) = self.tun_shutdown_fd.take() {
                unsafe {
                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                    libc::close(fd);
                }
            }

            // Join the blocking reader/writer threads; join errors are
            // ignored since we are tearing down regardless.
            if let Some(handle) = self.tun_reader_handle.take() {
                let _ = handle.join();
            }
            if let Some(handle) = self.tun_writer_handle.take() {
                let _ = handle.join();
            }

            self.tun_state = TunState::Disabled;
        }

        self.state = NodeState::Stopped;
        info!(state = %self.state, "Node stopped");
        Ok(())
    }
1422
1423 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
1428 let disconnect = Disconnect::new(reason);
1429 let plaintext = disconnect.encode();
1430
1431 let peer_addrs: Vec<NodeAddr> = self
1433 .peers
1434 .iter()
1435 .filter(|(_, peer)| peer.can_send() && peer.has_session())
1436 .map(|(addr, _)| *addr)
1437 .collect();
1438
1439 if peer_addrs.is_empty() {
1440 debug!(
1441 total_peers = self.peers.len(),
1442 "No sendable peers for disconnect notification"
1443 );
1444 return;
1445 }
1446
1447 let mut sent = 0usize;
1448 for node_addr in &peer_addrs {
1449 match self
1450 .send_encrypted_link_message(node_addr, &plaintext)
1451 .await
1452 {
1453 Ok(()) => sent += 1,
1454 Err(e) => {
1455 debug!(
1456 peer = %self.peer_display_name(node_addr),
1457 error = %e,
1458 "Failed to send disconnect (transport may be down)"
1459 );
1460 }
1461 }
1462 }
1463
1464 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
1465 }
1466
1467 fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
1468 peer_config
1469 .addresses_by_priority()
1470 .into_iter()
1471 .cloned()
1472 .collect()
1473 }
1474
1475 async fn nostr_peer_fallback_addresses(
1476 &self,
1477 peer_config: &PeerConfig,
1478 existing: &[PeerAddress],
1479 ) -> Vec<PeerAddress> {
1480 if !self.config.node.discovery.nostr.enabled
1481 || self.config.node.discovery.nostr.policy
1482 == crate::config::NostrDiscoveryPolicy::Disabled
1483 {
1484 return Vec::new();
1485 }
1486
1487 let Some(bootstrap) = self.nostr_discovery.clone() else {
1488 return Vec::new();
1489 };
1490 let endpoints = match bootstrap.advert_endpoints_for_peer(&peer_config.npub).await {
1491 Ok(endpoints) => endpoints,
1492 Err(err) => {
1493 debug!(
1494 npub = %peer_config.npub,
1495 error = %err,
1496 "Failed to resolve Nostr advert endpoints for configured peer"
1497 );
1498 return Vec::new();
1499 }
1500 };
1501
1502 let mut fallback = Vec::new();
1503 let mut next_priority = existing
1504 .iter()
1505 .map(|addr| addr.priority)
1506 .max()
1507 .unwrap_or(100)
1508 .saturating_add(1);
1509 let seen_at_ms = Self::now_ms();
1514 for endpoint in endpoints {
1515 let Some(candidate) =
1516 Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
1517 else {
1518 continue;
1519 };
1520 if existing
1521 .iter()
1522 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
1523 || fallback.iter().any(|addr: &PeerAddress| {
1524 addr.transport == candidate.transport && addr.addr == candidate.addr
1525 })
1526 {
1527 continue;
1528 }
1529 fallback.push(candidate);
1530 next_priority = next_priority.saturating_add(1);
1531 }
1532 fallback
1533 }
1534
1535 fn overlay_endpoint_to_peer_address(
1536 endpoint: &OverlayEndpointAdvert,
1537 priority: u8,
1538 seen_at_ms: u64,
1539 ) -> Option<PeerAddress> {
1540 let transport = match endpoint.transport {
1541 OverlayTransportKind::Udp => "udp",
1542 OverlayTransportKind::Tcp => "tcp",
1543 OverlayTransportKind::Tor => "tor",
1544 };
1545 Some(
1546 PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
1547 .with_seen_at_ms(seen_at_ms),
1548 )
1549 }
1550
    /// Try each address in `addresses` in order until one connection
    /// attempt is successfully initiated.
    ///
    /// Address handling:
    /// - `udp` with the literal address `"nat"`: hands off to the Nostr
    ///   overlay runtime for NAT traversal (skipped unless
    ///   `allow_bootstrap_nat` is set).
    /// - `ethernet` / `ble`: resolved through dedicated helpers; BLE is
    ///   compiled out on builds without `bluer_available`.
    /// - anything else: dialled via the first operational transport whose
    ///   type matches.
    ///
    /// Returns `Ok(())` as soon as one attempt is initiated, propagates
    /// `AccessDenied` immediately (the ACL verdict applies to the peer, not
    /// a single address), and returns `NoTransportForType` when every
    /// address fails.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        for addr in addresses {
            // Sentinel "nat" address: request UDP hole punching instead of
            // dialling directly.
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if !allow_bootstrap_nat {
                    continue;
                }
                let Some(bootstrap) = self.nostr_discovery.clone() else {
                    debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                    continue;
                };
                bootstrap.request_connect(peer_config.clone()).await;
                info!(npub = %peer_config.npub, "Started Nostr UDP NAT traversal attempt");
                return Ok(());
            }

            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                let tid = match self.find_transport_for_type(&addr.transport) {
                    Some(id) => id,
                    None => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            "No operational transport for address type"
                        );
                        continue;
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => return Ok(()),
                // ACL rejection is peer-wide: trying further addresses
                // cannot succeed, so surface it to the caller at once.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
1643
    /// Periodic ("per-tick") open-discovery pass: queue retries for cached
    /// overlay adverts with no advert-age limit.
    async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
        self.run_open_discovery_sweep(bootstrap, None, "per-tick")
            .await;
    }
1648
    /// Scan cached open-discovery overlay adverts and queue connection
    /// retries for previously-unknown peers.
    ///
    /// Runs only when Nostr discovery is enabled with the `Open` policy.
    /// `max_age_secs`, when given, drops adverts older than that many
    /// seconds; `caller` tags the log lines (e.g. "startup", "per-tick").
    /// The number of retries queued per sweep is capped by
    /// `open_discovery_enqueue_budget`.
    ///
    /// Configured peers are never queued here; instead, a fresh advert for
    /// a configured peer that is currently backing off expedites its
    /// pending retry. Every skip reason is counted and summarized.
    pub(in crate::node) async fn run_open_discovery_sweep(
        &mut self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
        max_age_secs: Option<u64>,
        caller: &'static str,
    ) {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
        {
            return;
        }

        let configured_npubs = self
            .config
            .peers()
            .iter()
            .map(|peer| peer.npub.clone())
            .collect::<HashSet<_>>();
        let now_ms = Self::now_ms();
        let now_secs = now_ms / 1000;
        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
        if enqueue_budget == 0 {
            debug!(
                caller = %caller,
                "open-discovery sweep: enqueue budget is 0, skipping"
            );
            return;
        }

        // Pull up to 64 cached (npub, endpoints, created_at) candidates.
        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
        let cached_count = candidates.len();
        let mut enqueued = 0usize;
        let mut skipped_age = 0usize;
        let mut skipped_configured = 0usize;
        let mut skipped_self = 0usize;
        let mut skipped_connected = 0usize;
        let mut skipped_retry_pending = 0usize;
        let mut skipped_connecting = 0usize;
        let mut skipped_no_endpoints = 0usize;
        let mut skipped_invalid_npub = 0usize;
        let mut skipped_cooldown = 0usize;

        for (npub, endpoints, created_at_secs) in candidates {
            if enqueue_budget == 0 {
                break;
            }

            if let Some(max_age) = max_age_secs
                && now_secs.saturating_sub(created_at_secs) > max_age
            {
                skipped_age = skipped_age.saturating_add(1);
                continue;
            }

            // Configured peers are handled by the normal retry machinery;
            // a fresh advert only expedites an already-pending retry.
            if configured_npubs.contains(&npub) {
                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
                    let configured_addr = *identity.node_addr();
                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
                        && state.retry_after_ms > now_ms
                    {
                        state.retry_after_ms = now_ms;
                        debug!(
                            caller = %caller,
                            peer = %self.peer_display_name(&configured_addr),
                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
                            "Expediting configured-peer retry after fresh overlay advert"
                        );
                    }
                }
                skipped_configured = skipped_configured.saturating_add(1);
                continue;
            }

            let peer_identity = match PeerIdentity::from_npub(&npub) {
                Ok(identity) => identity,
                Err(_) => {
                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                    continue;
                }
            };
            let node_addr = *peer_identity.node_addr();
            // Skip our own advert and any peer we already track in some form.
            if node_addr == *self.identity.node_addr() {
                skipped_self = skipped_self.saturating_add(1);
                continue;
            }
            if self.peers.contains_key(&node_addr) {
                skipped_connected = skipped_connected.saturating_add(1);
                continue;
            }
            if self.retry_pending.contains_key(&node_addr) {
                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
                continue;
            }
            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                skipped_cooldown = skipped_cooldown.saturating_add(1);
                continue;
            }
            let connecting = self.connections.values().any(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == &node_addr)
                    .unwrap_or(false)
            });
            if connecting {
                skipped_connecting = skipped_connecting.saturating_add(1);
                continue;
            }

            // Convert advert endpoints to peer addresses, deduplicated by
            // (transport, address); priorities start at 120 so these sort
            // after any statically-configured addresses.
            let mut addresses = Vec::new();
            let mut priority = 120u8;
            let seen_at_ms = Self::now_ms();
            for endpoint in endpoints {
                let Some(candidate) =
                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
                else {
                    continue;
                };
                if addresses.iter().any(|existing: &PeerAddress| {
                    existing.transport == candidate.transport && existing.addr == candidate.addr
                }) {
                    continue;
                }
                addresses.push(candidate);
                priority = priority.saturating_add(1);
            }
            if addresses.is_empty() {
                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
                continue;
            }

            self.peer_aliases
                .entry(node_addr)
                .or_insert_with(|| peer_identity.short_npub());
            self.register_identity(node_addr, peer_identity.pubkey_full());

            // Queue a one-shot (non-reconnecting) retry that fires now and
            // expires after the open-discovery retry lifetime.
            let mut state = super::retry::RetryState::new(PeerConfig {
                npub: npub.clone(),
                alias: None,
                addresses,
                connect_policy: ConnectPolicy::AutoConnect,
                auto_reconnect: true,
            });
            state.reconnect = false;
            state.retry_after_ms = now_ms;
            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
            self.retry_pending.insert(node_addr, state);
            info!(
                caller = %caller,
                peer = %peer_identity.short_npub(),
                advert_age_secs = now_secs.saturating_sub(created_at_secs),
                "open-discovery sweep: queued retry for cached advert"
            );
            enqueue_budget = enqueue_budget.saturating_sub(1);
            enqueued = enqueued.saturating_add(1);
        }

        let total_skipped = skipped_age
            + skipped_configured
            + skipped_self
            + skipped_connected
            + skipped_retry_pending
            + skipped_connecting
            + skipped_no_endpoints
            + skipped_invalid_npub
            + skipped_cooldown;
        // Summarize at info level only for the startup sweep or when work
        // was actually done, to keep the per-tick path quiet.
        let should_summarize = caller == "startup" || enqueued > 0;
        if should_summarize {
            info!(
                caller = %caller,
                cached = cached_count,
                queued = enqueued,
                skipped_age = skipped_age,
                skipped_configured = skipped_configured,
                skipped_self = skipped_self,
                skipped_connected = skipped_connected,
                skipped_retry_pending = skipped_retry_pending,
                skipped_connecting = skipped_connecting,
                skipped_no_endpoints = skipped_no_endpoints,
                skipped_invalid_npub = skipped_invalid_npub,
                skipped_cooldown = skipped_cooldown,
                skipped_total = total_skipped,
                "open-discovery sweep complete"
            );
        }
    }
1865
1866 async fn maybe_run_startup_open_discovery_sweep(
1874 &mut self,
1875 bootstrap: &std::sync::Arc<NostrDiscovery>,
1876 ) {
1877 if self.startup_open_discovery_sweep_done {
1878 return;
1879 }
1880 if !self.config.node.discovery.nostr.enabled
1881 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
1882 {
1883 self.startup_open_discovery_sweep_done = true;
1885 return;
1886 }
1887 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
1888 return;
1889 };
1890 let now_ms = Self::now_ms();
1891 let delay_ms = self
1892 .config
1893 .node
1894 .discovery
1895 .nostr
1896 .startup_sweep_delay_secs
1897 .saturating_mul(1000);
1898 if now_ms < started_at_ms.saturating_add(delay_ms) {
1899 return;
1900 }
1901
1902 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
1903 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
1904 .await;
1905 self.startup_open_discovery_sweep_done = true;
1906 }
1907
1908 fn available_outbound_slots(&self) -> usize {
1909 let connection_used = self
1910 .connections
1911 .len()
1912 .saturating_add(self.pending_connects.len());
1913 let connection_slots = if self.max_connections == 0 {
1914 usize::MAX
1915 } else {
1916 self.max_connections.saturating_sub(connection_used)
1917 };
1918
1919 let peer_slots = if self.max_peers == 0 {
1920 usize::MAX
1921 } else {
1922 self.max_peers.saturating_sub(self.peers.len())
1923 };
1924
1925 connection_slots.min(peer_slots)
1926 }
1927
1928 fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
1929 let current_open_discovery_pending = self
1930 .retry_pending
1931 .values()
1932 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
1933 .count();
1934
1935 let cap_remaining = self
1936 .config
1937 .node
1938 .discovery
1939 .nostr
1940 .open_discovery_max_pending
1941 .saturating_sub(current_open_discovery_pending);
1942
1943 cap_remaining.min(self.available_outbound_slots())
1944 }
1945
1946 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
1947 now_ms.saturating_add(
1948 self.config
1949 .node
1950 .discovery
1951 .nostr
1952 .advert_ttl_secs
1953 .saturating_mul(1000)
1954 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
1955 )
1956 }
1957
    /// Assemble the local overlay advert from all operational transports.
    ///
    /// Per transport kind:
    /// - `udp`: an explicit `external_addr` wins; otherwise a routable bound
    ///   address; otherwise (public but wildcard/private bind) a
    ///   STUN-observed address via the bootstrap runtime. Non-public UDP
    ///   advertises the literal `"nat"` endpoint for NAT traversal.
    /// - `tcp`: explicit `external_addr` or a routable bound address only —
    ///   there is no STUN fallback for TCP.
    /// - `tor`: the onion address joined with the configured advertised
    ///   port.
    ///
    /// Returns `None` when Nostr discovery is disabled or no endpoint could
    /// be advertised. Signal relays and STUN servers are attached only when
    /// a `"nat"` endpoint is present (they are only needed for traversal).
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        let mut has_udp_nat = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                // Bound to a concrete, globally-routable IP:
                                // advertise it directly.
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                // Wildcard/private bind: fall back to the
                                // STUN-learned public address, keyed by
                                // transport id and local port.
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                             or private and STUN observation failed; \
                                             advertising no UDP endpoint. Either set \
                                             transports.udp.external_addr, bind to a \
                                             specific *public* IP, or ensure \
                                             node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                None => {}
                            }
                        }
                    } else {
                        // Non-public UDP: advertise the NAT sentinel so
                        // peers initiate traversal instead of dialling.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                     or private IP and no transports.tcp.external_addr set; \
                                     advertising no TCP endpoint. Either set external_addr \
                                     to the public IP (recommended for cloud 1:1-NAT setups) \
                                     or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: has_udp_nat.then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: has_udp_nat
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
2121
2122 async fn refresh_overlay_advert(
2123 &self,
2124 bootstrap: &std::sync::Arc<NostrDiscovery>,
2125 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
2126 let advert = self.build_overlay_advert(bootstrap).await;
2127 bootstrap.update_local_advert(advert).await
2128 }
2129
2130 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
2131 match (&self.config.transports.udp, transport_name) {
2132 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2133 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2134 _ => None,
2135 }
2136 }
2137
2138 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
2139 match (&self.config.transports.tcp, transport_name) {
2140 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2141 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2142 _ => None,
2143 }
2144 }
2145
2146 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
2147 match (&self.config.transports.tor, transport_name) {
2148 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2149 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2150 _ => None,
2151 }
2152 }
2153
2154 pub(in crate::node) async fn try_peer_addresses(
2155 &mut self,
2156 peer_config: &PeerConfig,
2157 peer_identity: PeerIdentity,
2158 allow_bootstrap_nat: bool,
2159 ) -> Result<(), NodeError> {
2160 let peer_node_addr = *peer_identity.node_addr();
2161 if self.peers.contains_key(&peer_node_addr) {
2162 debug!(
2163 npub = %peer_config.npub,
2164 "Peer already exists, skipping address attempts"
2165 );
2166 return Ok(());
2167 }
2168 if self.is_connecting_to_peer(&peer_node_addr) {
2169 debug!(
2170 npub = %peer_config.npub,
2171 "Connection already in progress, skipping address attempts"
2172 );
2173 return Ok(());
2174 }
2175
2176 let static_addresses = self.static_peer_addresses(peer_config);
2184 let overlay_addresses = self
2185 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
2186 .await;
2187
2188 let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
2189 for addr in overlay_addresses.into_iter().chain(static_addresses) {
2190 if !candidates.iter().any(|existing: &PeerAddress| {
2191 existing.transport == addr.transport && existing.addr == addr.addr
2192 }) {
2193 candidates.push(addr);
2194 }
2195 }
2196
2197 candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
2202 (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
2203 (Some(_), None) => std::cmp::Ordering::Less,
2204 (None, Some(_)) => std::cmp::Ordering::Greater,
2205 (None, None) => std::cmp::Ordering::Equal,
2206 });
2207
2208 if candidates.is_empty() {
2209 return Err(NodeError::NoTransportForType(format!(
2210 "no addresses known for {}",
2211 peer_config.npub
2212 )));
2213 }
2214
2215 if self
2216 .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
2217 .await
2218 .is_ok()
2219 {
2220 return Ok(());
2221 }
2222
2223 Err(NodeError::NoTransportForType(format!(
2224 "no operational transport for any of {}'s addresses",
2225 peer_config.npub
2226 )))
2227 }
2228
2229 pub(crate) async fn api_connect(
2237 &mut self,
2238 npub: &str,
2239 address: &str,
2240 transport: &str,
2241 ) -> Result<serde_json::Value, String> {
2242 let peer_config = PeerConfig {
2243 npub: npub.to_string(),
2244 alias: None,
2245 addresses: vec![PeerAddress::new(transport, address)],
2246 connect_policy: ConnectPolicy::Manual,
2247 auto_reconnect: false,
2248 };
2249
2250 if let Ok(identity) = PeerIdentity::from_npub(npub) {
2252 self.peer_aliases
2253 .insert(*identity.node_addr(), identity.short_npub());
2254 self.register_identity(*identity.node_addr(), identity.pubkey_full());
2255 }
2256
2257 self.initiate_peer_connection(&peer_config)
2258 .await
2259 .map(|()| {
2260 info!(
2261 npub = %npub,
2262 address = %address,
2263 transport = %transport,
2264 "API connect initiated"
2265 );
2266 serde_json::json!({
2267 "npub": npub,
2268 "address": address,
2269 "transport": transport,
2270 })
2271 })
2272 .map_err(|e| e.to_string())
2273 }
2274
2275 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
2279 let peer_identity =
2280 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
2281 let node_addr = *peer_identity.node_addr();
2282
2283 if !self.peers.contains_key(&node_addr) {
2284 return Err(format!("peer not found: {npub}"));
2285 }
2286
2287 self.remove_active_peer(&node_addr);
2289
2290 self.retry_pending.remove(&node_addr);
2292
2293 info!(npub = %npub, "API disconnect completed");
2294
2295 Ok(serde_json::json!({
2296 "npub": npub,
2297 "disconnected": true,
2298 }))
2299 }
2300
    /// Adopt a UDP socket that NAT traversal already punched through, wrap
    /// it in a dedicated transport, and initiate the handshake with the
    /// traversal peer.
    ///
    /// Rejects the handoff when the node is not operational, the npub is
    /// invalid, or the peer is already connected / mid-handshake. If
    /// handshake initiation fails, the freshly created transport and its
    /// bootstrap bookkeeping are rolled back before the error is returned.
    ///
    /// # Errors
    /// `NotStarted`, `InvalidPeerNpub`, `PeerAlreadyExists`,
    /// `BootstrapHandoff`, or whatever `initiate_connection` returns.
    pub async fn adopt_established_traversal(
        &mut self,
        traversal: EstablishedTraversal,
    ) -> Result<BootstrapHandoffResult, NodeError> {
        debug!(
            peer_npub = %traversal.peer_npub,
            session_id = %traversal.session_id,
            remote_addr = %traversal.remote_addr,
            "adopting established traversal socket"
        );

        if !self.state.is_operational() {
            return Err(NodeError::NotStarted);
        }

        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
            NodeError::InvalidPeerNpub {
                npub: traversal.peer_npub.clone(),
                reason: e.to_string(),
            }
        })?;
        let peer_node_addr = *peer_identity.node_addr();
        if self.peers.contains_key(&peer_node_addr) {
            debug!(
                peer_npub = %traversal.peer_npub,
                "Ignoring NAT traversal handoff for already-connected peer"
            );
            return Err(NodeError::PeerAlreadyExists(peer_node_addr));
        }
        if self.is_connecting_to_peer(&peer_node_addr) {
            debug!(
                peer_npub = %traversal.peer_npub,
                "Ignoring NAT traversal handoff while peer handshake is already in progress"
            );
            return Err(NodeError::PeerAlreadyExists(peer_node_addr));
        }

        self.peer_aliases
            .insert(peer_node_addr, peer_identity.short_npub());
        self.register_identity(peer_node_addr, peer_identity.pubkey_full());

        let transport_id = self.allocate_transport_id();
        // When no config travelled with the handoff, inherit the matching
        // UDP config (or the default instance, or defaults) but clear
        // bind/external addresses — those belong to the original socket,
        // not the adopted one.
        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
            let mut cfg = self
                .lookup_udp_config(traversal.transport_name.as_deref())
                .or_else(|| self.lookup_udp_config(None))
                .cloned()
                .unwrap_or_default();
            cfg.bind_addr = None;
            cfg.external_addr = None;
            cfg
        });
        let mut transport = crate::transport::udp::UdpTransport::new(
            transport_id,
            traversal.transport_name.clone(),
            inherited_config,
            packet_tx,
        );

        transport
            .adopt_socket_async(traversal.socket)
            .await
            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;

        let local_addr = transport.local_addr().ok_or_else(|| {
            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
        })?;

        self.transports.insert(
            transport_id,
            crate::transport::TransportHandle::Udp(transport),
        );
        // Track the transport as bootstrap-originated, keyed back to the
        // traversal peer's npub.
        self.bootstrap_transports.insert(transport_id);
        self.bootstrap_transport_npubs
            .insert(transport_id, traversal.peer_npub.clone());

        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
        if let Err(err) = self
            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
            .await
        {
            // Roll back everything registered above so a failed handoff
            // leaves no orphaned transport or bookkeeping behind.
            self.bootstrap_transports.remove(&transport_id);
            self.bootstrap_transport_npubs.remove(&transport_id);
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let _ = handle.stop().await;
            }
            return Err(err);
        }

        info!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            local_addr = %local_addr,
            remote_addr = %traversal.remote_addr,
            session_id = %traversal.session_id,
            "adopted NAT traversal socket; handshake initiated"
        );

        Ok(BootstrapHandoffResult {
            transport_id,
            local_addr,
            remote_addr: traversal.remote_addr,
            peer_node_addr,
            session_id: traversal.session_id,
        })
    }
2433}