1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
7 OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use std::collections::HashSet;
18use std::net::IpAddr;
19use std::thread;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
/// Returns `true` when `ip` cannot serve as a publicly reachable endpoint in
/// an overlay advert.
///
/// IPv4 addresses are rejected when they are private, loopback, link-local,
/// unspecified, multicast, broadcast, documentation, or inside the
/// 100.64.0.0/10 shared address space.
///
/// IPv6 addresses are rejected when they are loopback, unspecified,
/// unique-local, multicast, link-local unicast (fe80::/10), or inside the
/// 2001:db8::/32 documentation prefix. An IPv4-mapped address
/// (`::ffff:a.b.c.d`) is judged by the IPv4 rules applied to the embedded
/// address, so `::ffff:192.168.1.1` cannot slip through as "routable".
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [first, second, ..] = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 100.64.0.0/10: the top two bits of the second octet are `01`.
                || (first == 100 && (second & 0xc0) == 64)
        }
        IpAddr::V6(v6) => {
            // An IPv4-mapped address carries an IPv4 endpoint; apply the
            // IPv4 rules to it instead of the IPv6 ones.
            if let Some(mapped) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(mapped));
            }
            let segments = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // fe80::/10 link-local unicast.
                || (segments[0] & 0xffc0) == 0xfe80
                // 2001:db8::/32 documentation prefix, mirroring the IPv4
                // `is_documentation` check above.
                || (segments[0] == 0x2001 && segments[1] == 0x0db8)
        }
    }
}
54
/// Multiplier applied by the open-discovery retry logic.
///
/// NOTE(review): the use site is outside this part of the file; presumably
/// this scales a retry lifetime (the name suggests "2x some base lifetime").
/// Confirm against the code that reads it.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
56
57impl Node {
58 pub(super) async fn initiate_peer_connections(&mut self) {
63 let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
69 .config
70 .peers()
71 .iter()
72 .filter_map(|pc| {
73 PeerIdentity::from_npub(&pc.npub)
74 .ok()
75 .map(|id| (id, pc.alias.clone()))
76 })
77 .collect();
78
79 for (identity, alias) in peer_identities {
80 let name = alias.unwrap_or_else(|| identity.short_npub());
81 self.peer_aliases.insert(*identity.node_addr(), name);
82 self.register_identity(*identity.node_addr(), identity.pubkey_full());
86 }
87
88 let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
90
91 if peer_configs.is_empty() {
92 debug!("No static peers configured");
93 return;
94 }
95
96 debug!(
97 count = peer_configs.len(),
98 "Initiating static peer connections"
99 );
100
101 for peer_config in peer_configs {
102 if let Err(e) = self.initiate_peer_connection(&peer_config).await {
103 warn!(
104 npub = %peer_config.npub,
105 alias = ?peer_config.alias,
106 error = %e,
107 "Failed to initiate peer connection"
108 );
109 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
113 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
114 }
115 if matches!(e, crate::node::NodeError::NoTransportForType(_))
121 && let Some(bootstrap) = self.nostr_discovery.clone()
122 {
123 let npub = peer_config.npub.clone();
124 tokio::spawn(async move {
125 let _ = bootstrap.refetch_advert_for_stale_check(&npub).await;
126 });
127 }
128 }
129 }
130 }
131
132 pub(super) async fn initiate_peer_connection(
136 &mut self,
137 peer_config: &crate::config::PeerConfig,
138 ) -> Result<(), NodeError> {
139 let peer_identity =
141 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
142 npub: peer_config.npub.clone(),
143 reason: e.to_string(),
144 })?;
145
146 let peer_node_addr = *peer_identity.node_addr();
147
148 if self.peers.contains_key(&peer_node_addr) {
150 debug!(
151 npub = %peer_config.npub,
152 "Peer already exists, skipping"
153 );
154 return Ok(());
155 }
156
157 let already_connecting = self.connections.values().any(|conn| {
159 conn.expected_identity()
160 .map(|id| id.node_addr() == &peer_node_addr)
161 .unwrap_or(false)
162 });
163 if already_connecting {
164 debug!(
165 npub = %peer_config.npub,
166 "Connection already in progress, skipping"
167 );
168 return Ok(());
169 }
170
171 self.try_peer_addresses(peer_config, peer_identity, true)
172 .await
173 }
174
175 pub(super) async fn initiate_connection(
186 &mut self,
187 transport_id: TransportId,
188 remote_addr: TransportAddr,
189 peer_identity: PeerIdentity,
190 ) -> Result<(), NodeError> {
191 let peer_node_addr = *peer_identity.node_addr();
192
193 self.authorize_peer(
194 &peer_identity,
195 PeerAclContext::OutboundConnect,
196 transport_id,
197 &remote_addr,
198 )?;
199
200 let is_connection_oriented = self
201 .transports
202 .get(&transport_id)
203 .map(|t| t.transport_type().connection_oriented)
204 .unwrap_or(false);
205
206 let link_id = self.allocate_link_id();
208
209 let link = if is_connection_oriented {
210 Link::new(
211 link_id,
212 transport_id,
213 remote_addr.clone(),
214 LinkDirection::Outbound,
215 Duration::from_millis(self.config.node.base_rtt_ms),
216 )
217 } else {
218 Link::connectionless(
219 link_id,
220 transport_id,
221 remote_addr.clone(),
222 LinkDirection::Outbound,
223 Duration::from_millis(self.config.node.base_rtt_ms),
224 )
225 };
226
227 self.links.insert(link_id, link);
228
229 self.addr_to_link
231 .insert((transport_id, remote_addr.clone()), link_id);
232
233 if is_connection_oriented {
234 if let Some(transport) = self.transports.get(&transport_id) {
236 match transport.connect(&remote_addr).await {
237 Ok(()) => {
238 debug!(
239 peer = %self.peer_display_name(&peer_node_addr),
240 transport_id = %transport_id,
241 remote_addr = %remote_addr,
242 link_id = %link_id,
243 "Transport connect initiated (non-blocking)"
244 );
245 self.pending_connects.push(super::PendingConnect {
246 link_id,
247 transport_id,
248 remote_addr,
249 peer_identity,
250 });
251 }
252 Err(e) => {
253 self.links.remove(&link_id);
255 self.addr_to_link.remove(&(transport_id, remote_addr));
256 return Err(NodeError::TransportError(e.to_string()));
257 }
258 }
259 }
260 Ok(())
261 } else {
262 self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
264 .await
265 }
266 }
267
268 pub(super) async fn start_handshake(
273 &mut self,
274 link_id: LinkId,
275 transport_id: TransportId,
276 remote_addr: TransportAddr,
277 peer_identity: PeerIdentity,
278 ) -> Result<(), NodeError> {
279 let peer_node_addr = *peer_identity.node_addr();
280
281 let current_time_ms = Self::now_ms();
283 let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);
284
285 let our_index = match self.index_allocator.allocate() {
287 Ok(idx) => idx,
288 Err(e) => {
289 self.links.remove(&link_id);
291 self.addr_to_link.remove(&(transport_id, remote_addr));
292 return Err(NodeError::IndexAllocationFailed(e.to_string()));
293 }
294 };
295
296 let our_keypair = self.identity.keypair();
298 let noise_msg1 =
299 match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
300 Ok(msg) => msg,
301 Err(e) => {
302 let _ = self.index_allocator.free(our_index);
304 self.links.remove(&link_id);
305 self.addr_to_link.remove(&(transport_id, remote_addr));
306 return Err(NodeError::HandshakeFailed(e.to_string()));
307 }
308 };
309
310 connection.set_our_index(our_index);
312 connection.set_transport_id(transport_id);
313 connection.set_source_addr(remote_addr.clone());
314
315 let wire_msg1 = build_msg1(our_index, &noise_msg1);
317
318 debug!(
319 peer = %self.peer_display_name(&peer_node_addr),
320 transport_id = %transport_id,
321 remote_addr = %remote_addr,
322 link_id = %link_id,
323 our_index = %our_index,
324 "Connection initiated"
325 );
326
327 let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
329 connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);
330
331 self.pending_outbound
333 .insert((transport_id, our_index.as_u32()), link_id);
334 self.connections.insert(link_id, connection);
335
336 let send_result = match self.transports.get(&transport_id) {
338 Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
339 None => None,
340 };
341 if let Some(send_result) = send_result {
342 self.note_local_send_outcome(&send_result);
343 match send_result {
344 Ok(bytes) => {
345 debug!(
346 link_id = %link_id,
347 our_index = %our_index,
348 bytes,
349 "Sent Noise handshake message 1 (wire format)"
350 );
351 }
352 Err(e) => {
353 warn!(
354 link_id = %link_id,
355 transport_id = %transport_id,
356 remote_addr = %remote_addr,
357 our_index = %our_index,
358 error = %e,
359 "Failed to send handshake message"
360 );
361 if let Some(conn) = self.connections.get_mut(&link_id) {
364 conn.mark_failed();
365 }
366 }
367 }
368 }
369
370 Ok(())
371 }
372
373 pub(super) async fn poll_transport_discovery(&mut self) {
379 let mut to_connect = Vec::new();
381
382 for (transport_id, transport) in &self.transports {
383 if !transport.is_operational() {
384 continue;
385 }
386 if !transport.auto_connect() {
387 let _ = transport.discover();
389 continue;
390 }
391 let discovered = match transport.discover() {
392 Ok(peers) => peers,
393 Err(_) => continue,
394 };
395 for peer in discovered {
396 let pubkey = match peer.pubkey_hint {
397 Some(pk) => pk,
398 None => continue,
399 };
400 let identity = PeerIdentity::from_pubkey(pubkey);
401 let node_addr = *identity.node_addr();
402
403 if node_addr == *self.identity.node_addr() {
405 continue;
406 }
407 if self.peers.contains_key(&node_addr) {
409 continue;
410 }
411 let connecting = self.connections.values().any(|c| {
413 c.expected_identity()
414 .map(|id| id.node_addr() == &node_addr)
415 .unwrap_or(false)
416 });
417 if connecting {
418 continue;
419 }
420
421 to_connect.push((*transport_id, peer.addr, identity));
422 }
423 }
424
425 for (transport_id, remote_addr, identity) in to_connect {
426 info!(
427 peer = %self.peer_display_name(identity.node_addr()),
428 transport_id = %transport_id,
429 remote_addr = %remote_addr,
430 "Auto-connecting to discovered peer"
431 );
432 if let Err(e) = self
433 .initiate_connection(transport_id, remote_addr, identity)
434 .await
435 {
436 warn!(error = %e, "Failed to auto-connect to discovered peer");
437 }
438 }
439 }
440
441 pub(super) async fn poll_nostr_discovery(&mut self) {
442 let Some(bootstrap) = self.nostr_discovery.clone() else {
443 return;
444 };
445
446 if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
447 debug!(error = %err, "Failed to refresh local Nostr overlay advert");
448 }
449
450 for event in bootstrap.drain_events().await {
451 match event {
452 BootstrapEvent::Established { traversal } => {
453 let peer_npub = traversal.peer_npub.clone();
454 match self.adopt_established_traversal(traversal).await {
455 Ok(_) => {
456 info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
457 }
458 Err(err) => {
459 warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
460 if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
461 self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
462 }
463 }
464 }
465 }
466 BootstrapEvent::Failed {
467 peer_config,
468 reason,
469 } => {
470 let now_ms = Self::now_ms();
471 let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
472 if decision.should_warn {
473 warn!(
474 npub = %peer_config.npub,
475 error = %reason,
476 consecutive_failures = decision.consecutive_failures,
477 cooldown_secs = decision
478 .cooldown_until_ms
479 .map(|t| t.saturating_sub(now_ms) / 1000),
480 "NAT traversal failed"
481 );
482 } else {
483 debug!(
484 npub = %peer_config.npub,
485 error = %reason,
486 consecutive_failures = decision.consecutive_failures,
487 "NAT traversal failed (suppressed by warn-rate-limit)"
488 );
489 }
490
491 if decision.crossed_threshold {
495 let bootstrap = bootstrap.clone();
496 let npub = peer_config.npub.clone();
497 tokio::spawn(async move {
498 let outcome = bootstrap.refetch_advert_for_stale_check(&npub).await;
499 match outcome {
500 crate::discovery::nostr::NostrRefetchOutcome::Evicted => info!(
501 npub = %npub,
502 "stale-advert sweep: peer evicted from advert cache"
503 ),
504 crate::discovery::nostr::NostrRefetchOutcome::Refreshed => info!(
505 npub = %npub,
506 "stale-advert sweep: peer republished, cache refreshed and streak reset"
507 ),
508 crate::discovery::nostr::NostrRefetchOutcome::SameAdvert => debug!(
509 npub = %npub,
510 "stale-advert sweep: advert unchanged, cooldown stands"
511 ),
512 crate::discovery::nostr::NostrRefetchOutcome::Skipped => debug!(
513 npub = %npub,
514 "stale-advert sweep: skipped (relay error or no advert_relays)"
515 ),
516 }
517 });
518 }
519
520 let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
521 Ok(identity) => identity,
522 Err(_) => continue,
523 };
524
525 if self
526 .try_peer_addresses(&peer_config, peer_identity, false)
527 .await
528 .is_ok()
529 {
530 continue;
531 }
532
533 let node_addr = *peer_identity.node_addr();
534 self.schedule_retry(node_addr, now_ms);
535 if let Some(cooldown_until_ms) = decision.cooldown_until_ms
536 && let Some(state) = self.retry_pending.get_mut(&node_addr)
537 {
538 state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
542 }
543 }
544 }
545 }
546
547 self.maybe_run_startup_open_discovery_sweep(&bootstrap)
548 .await;
549 self.queue_open_discovery_retries(&bootstrap).await;
550 }
551
552 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
557 let app = self.config.node.discovery.nostr.app.trim();
558 if app.is_empty() {
559 return None;
560 }
561 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
562 let scope = rest.trim();
563 if scope.is_empty() {
564 None
565 } else {
566 Some(scope.to_string())
567 }
568 } else {
569 Some(app.to_string())
570 }
571 }
572
573 pub(super) async fn poll_lan_discovery(&mut self) {
579 let Some(runtime) = self.lan_discovery.clone() else {
580 return;
581 };
582 let events = runtime.drain_events().await;
583 if events.is_empty() {
584 return;
585 }
586 let udp_transport_id = self.find_transport_for_type("udp");
589 let Some(transport_id) = udp_transport_id else {
590 debug!("lan: no operational UDP transport, skipping discovered peers");
591 return;
592 };
593 for event in events {
594 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
595 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
596 Ok(id) => id,
597 Err(err) => {
598 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
599 continue;
600 }
601 };
602 let peer_node_addr = *identity.node_addr();
603 if self.peers.contains_key(&peer_node_addr) {
604 continue;
605 }
606 let already_connecting = self.connections.values().any(|conn| {
607 conn.expected_identity()
608 .map(|id| id.node_addr() == &peer_node_addr)
609 .unwrap_or(false)
610 });
611 if already_connecting {
612 continue;
613 }
614 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
615 info!(
616 npub = %identity.short_npub(),
617 addr = %peer.addr,
618 "lan: initiating handshake to discovered peer"
619 );
620 if let Err(err) = self
621 .initiate_connection(transport_id, remote_addr, identity)
622 .await
623 {
624 debug!(
625 npub = %peer.npub,
626 error = %err,
627 "lan: failed to initiate connection to discovered peer"
628 );
629 }
630 }
631 }
632
633 pub(super) async fn poll_pending_connects(&mut self) {
640 if self.pending_connects.is_empty() {
641 return;
642 }
643
644 let mut completed = Vec::new();
645
646 for (i, pending) in self.pending_connects.iter().enumerate() {
647 let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
648 transport.connection_state(&pending.remote_addr)
649 } else {
650 crate::transport::ConnectionState::Failed("transport removed".into())
651 };
652
653 match state {
654 crate::transport::ConnectionState::Connected => {
655 completed.push((i, true, None));
656 }
657 crate::transport::ConnectionState::Failed(reason) => {
658 completed.push((i, false, Some(reason)));
659 }
660 crate::transport::ConnectionState::Connecting => {
661 }
663 crate::transport::ConnectionState::None => {
664 completed.push((i, false, Some("no connection attempt found".into())));
666 }
667 }
668 }
669
670 for (i, success, reason) in completed.into_iter().rev() {
672 let pending = self.pending_connects.remove(i);
673
674 if success {
675 if let Some(link) = self.links.get_mut(&pending.link_id) {
677 link.set_connected();
678 }
679
680 debug!(
681 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
682 transport_id = %pending.transport_id,
683 remote_addr = %pending.remote_addr,
684 link_id = %pending.link_id,
685 "Transport connected, starting handshake"
686 );
687
688 if let Err(e) = self
690 .start_handshake(
691 pending.link_id,
692 pending.transport_id,
693 pending.remote_addr.clone(),
694 pending.peer_identity,
695 )
696 .await
697 {
698 warn!(
699 link_id = %pending.link_id,
700 error = %e,
701 "Failed to start handshake after transport connect"
702 );
703 self.remove_link(&pending.link_id);
705 }
706 } else {
707 let reason = reason.unwrap_or_default();
708 warn!(
709 peer = %self.peer_display_name(pending.peer_identity.node_addr()),
710 transport_id = %pending.transport_id,
711 remote_addr = %pending.remote_addr,
712 link_id = %pending.link_id,
713 reason = %reason,
714 "Transport connect failed"
715 );
716
717 self.remove_link(&pending.link_id);
719 self.links.remove(&pending.link_id);
720 self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
721 }
722 }
723 }
724
    /// Brings the node from a startable state to `NodeState::Running`.
    ///
    /// Steps, in order:
    /// 1. create the packet channel and start all transports (a transport
    ///    that fails to start is logged and left out);
    /// 2. on Unix, spawn the encrypt and (optionally) decrypt worker pools;
    /// 3. start Nostr and LAN discovery when enabled in the config;
    /// 4. initiate connections to configured peers;
    /// 5. create the TUN device plus its reader/writer threads when enabled;
    /// 6. start the DNS responder when enabled.
    ///
    /// # Errors
    ///
    /// * `NodeError::AlreadyStarted` when `self.state.can_start()` is false.
    /// * `NodeError::Tun` on macOS when the shutdown pipe cannot be created.
    /// * Whatever `device.create_writer` returns, propagated with `?`.
    ///
    /// Failures of transports, discovery runtimes, TUN device creation and
    /// the DNS responder are logged and do not abort start-up.
    pub async fn start(&mut self) -> Result<(), NodeError> {
        if !self.state.can_start() {
            return Err(NodeError::AlreadyStarted);
        }
        self.state = NodeState::Starting;

        // Packet channel: the sender is handed to the transports and the
        // receiver is stored on the node.
        let packet_buffer_size = self.config.node.buffers.packet_channel;
        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
        self.packet_tx = Some(packet_tx.clone());
        self.packet_rx = Some(packet_rx);

        let transport_handles = self.create_transports(&packet_tx).await;

        // Only transports that start successfully are registered.
        for mut handle in transport_handles {
            let transport_id = handle.transport_id();
            let transport_type = handle.transport_type().name;
            let name = handle.name().map(|s| s.to_string());

            match handle.start().await {
                Ok(()) => {
                    self.transports.insert(transport_id, handle);
                }
                Err(e) => {
                    if let Some(ref n) = name {
                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
                    } else {
                        warn!(transport_type, error = %e, "Transport failed to start");
                    }
                }
            }
        }

        if !self.transports.is_empty() {
            info!(count = self.transports.len(), "Transports initialized");
        }

        // Crypto worker pools (Unix only). Pool sizes default to the
        // available parallelism and can be overridden via environment
        // variables; unparsable values fall back to the default.
        #[cfg(unix)]
        {
            let cpu_default = std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1)
                .max(1);
            // The encrypt pool always has at least one worker.
            let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(cpu_default)
                .max(1);
            self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
                encrypt_worker_count,
            ));
            info!(
                workers = encrypt_worker_count,
                "Spawned FMP-encrypt worker pool"
            );

            // Unlike the encrypt pool, 0 is accepted here and means "no
            // decrypt pool".
            let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(cpu_default);
            if decrypt_worker_count == 0 {
                info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
            } else {
                self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
                    decrypt_worker_count,
                ));
                info!(
                    workers = decrypt_worker_count,
                    "Spawned FMP+FSP-decrypt worker pool"
                );
            }
        }

        // Nostr overlay discovery: a failed initial advert publish is only a
        // warning; the runtime is kept either way.
        if self.config.node.discovery.nostr.enabled {
            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
                .await
            {
                Ok(runtime) => {
                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
                    }
                    self.nostr_discovery = Some(runtime);
                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
                    info!("Nostr overlay discovery enabled");
                }
                Err(err) => {
                    warn!(error = %err, "Failed to start Nostr overlay discovery");
                }
            }
        }

        // LAN discovery advertises the port of the first operational UDP
        // transport that reports a local address, or 0 when there is none.
        if self.config.node.discovery.lan.enabled {
            let advertised_udp_port = self
                .transports
                .values()
                .filter(|h| h.is_operational())
                .filter(|h| h.transport_type().name == "udp")
                .find_map(|h| h.local_addr().map(|addr| addr.port()))
                .unwrap_or(0);
            let scope = self.lan_discovery_scope();
            match crate::discovery::lan::LanDiscovery::start(
                &self.identity,
                scope,
                advertised_udp_port,
                self.config.node.discovery.lan.clone(),
            )
            .await
            {
                Ok(runtime) => {
                    self.lan_discovery = Some(runtime);
                    info!("LAN mDNS discovery enabled");
                }
                Err(err) => {
                    debug!(error = %err, "LAN mDNS discovery not started");
                }
            }
        }

        self.initiate_peer_connections().await;

        if self.config.tun.enabled {
            let address = *self.identity.address();
            match TunDevice::create(&self.config.tun, address).await {
                Ok(device) => {
                    let mtu = device.mtu();
                    let name = device.name().to_string();
                    let our_addr = *device.address();

                    info!("TUN device active:");
                    info!(" name: {}", name);
                    info!(" address: {}", device.address());
                    info!(" mtu: {}", mtu);

                    let effective_mtu = self.effective_ipv6_mtu();
                    // NOTE(review): 40 and 20 are presumably the IPv6 and TCP
                    // header sizes — confirm.
                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20);
                    info!("effective MTU: {} bytes", effective_mtu);
                    debug!(" max TCP MSS: {} bytes", max_mss);

                    // macOS only: a pipe whose read end goes to the reader
                    // thread and whose write end is kept for `stop`.
                    #[cfg(target_os = "macos")]
                    let (shutdown_read_fd, shutdown_write_fd) = {
                        let mut fds = [0i32; 2];
                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
                                "failed to create shutdown pipe".into(),
                            )));
                        }
                        (fds[0], fds[1])
                    };

                    // A writer creation failure aborts `start` via `?`.
                    let (writer, tun_tx) =
                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;

                    let writer_handle = thread::spawn(move || {
                        writer.run();
                    });

                    let reader_tun_tx = tun_tx.clone();

                    let tun_channel_size = self.config.node.buffers.tun_channel;
                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);

                    let transport_mtu = self.transport_mtu();
                    let path_mtu_lookup = self.path_mtu_lookup.clone();
                    // The reader thread takes ownership of the device. The
                    // macOS variant additionally receives the pipe read end.
                    #[cfg(target_os = "macos")]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                            shutdown_read_fd,
                        );
                    });
                    #[cfg(not(target_os = "macos"))]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                        );
                    });

                    self.tun_state = TunState::Active;
                    self.tun_name = Some(name);
                    self.tun_tx = Some(tun_tx);
                    self.tun_outbound_rx = Some(outbound_rx);
                    self.tun_reader_handle = Some(reader_handle);
                    self.tun_writer_handle = Some(writer_handle);
                    #[cfg(target_os = "macos")]
                    {
                        self.tun_shutdown_fd = Some(shutdown_write_fd);
                    }
                }
                Err(e) => {
                    // TUN is optional: record the failure and carry on.
                    self.tun_state = TunState::Failed;
                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
                }
            }
        }

        // DNS responder: an invalid bind address or a bind failure is logged
        // and leaves the node without a responder.
        if self.config.dns.enabled {
            let addr_str = self.config.dns.bind_addr();
            match addr_str.parse::<std::net::IpAddr>() {
                Ok(ip) => {
                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
                    match Self::bind_dns_socket(bind) {
                        Ok(socket) => {
                            let dns_channel_size = self.config.node.buffers.dns_channel;
                            let (identity_tx, identity_rx) =
                                tokio::sync::mpsc::channel(dns_channel_size);
                            let dns_ttl = self.config.dns.ttl();
                            // Host map seeded from the configured peers; it is
                            // backed by the default hosts file only when
                            // system files are enabled.
                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
                                self.config.peers(),
                            );
                            let reloader = if self.config.node.system_files_enabled {
                                let hosts_path = std::path::PathBuf::from(
                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
                                );
                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
                            } else {
                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
                            };
                            // Interface index of the configured TUN name, if
                            // it can be resolved.
                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
                            info!(
                                bind = %bind,
                                hosts = reloader.hosts().len(),
                                mesh_ifindex = ?mesh_ifindex,
                                "DNS responder started for .fips domain (auto-reload enabled)"
                            );
                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
                                socket,
                                identity_tx,
                                dns_ttl,
                                reloader,
                                mesh_ifindex,
                            ));
                            self.dns_identity_rx = Some(identity_rx);
                            self.dns_task = Some(handle);
                        }
                        Err(e) => {
                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
                        }
                    }
                }
                Err(e) => {
                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
                }
            }
        }

        self.state = NodeState::Running;
        info!("Node started:");
        info!(" state: {}", self.state);
        info!(" transports: {}", self.transports.len());
        info!(" connections: {}", self.connections.len());
        Ok(())
    }
1060
1061 fn bind_dns_socket(
1074 addr: std::net::SocketAddr,
1075 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
1076 use socket2::{Domain, Protocol, Socket, Type};
1077 let domain = if addr.is_ipv4() {
1078 Domain::IPV4
1079 } else {
1080 Domain::IPV6
1081 };
1082 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
1083 if addr.is_ipv6() {
1084 sock.set_only_v6(false)?;
1085 #[cfg(unix)]
1086 Self::set_recv_pktinfo_v6(&sock)?;
1087 }
1088 sock.set_nonblocking(true)?;
1089 sock.bind(&addr.into())?;
1090 tokio::net::UdpSocket::from_std(sock.into())
1091 }
1092
1093 #[cfg(unix)]
1099 fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
1100 use std::os::fd::AsRawFd;
1101 let enable: libc::c_int = 1;
1102 let ret = unsafe {
1103 libc::setsockopt(
1104 sock.as_raw_fd(),
1105 libc::IPPROTO_IPV6,
1106 libc::IPV6_RECVPKTINFO,
1107 &enable as *const _ as *const libc::c_void,
1108 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
1109 )
1110 };
1111 if ret < 0 {
1112 return Err(std::io::Error::last_os_error());
1113 }
1114 Ok(())
1115 }
1116
1117 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
1124 #[cfg(unix)]
1125 {
1126 let c_name = std::ffi::CString::new(name).ok()?;
1127 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
1128 if idx == 0 { None } else { Some(idx) }
1129 }
1130 #[cfg(not(unix))]
1131 {
1132 let _ = name;
1133 None
1134 }
1135 }
1136
1137 pub async fn stop(&mut self) -> Result<(), NodeError> {
1142 if !self.state.can_stop() {
1143 return Err(NodeError::NotStarted);
1144 }
1145 self.state = NodeState::Stopping;
1146 info!(state = %self.state, "Node stopping");
1147
1148 if let Some(handle) = self.dns_task.take() {
1150 handle.abort();
1151 debug!("DNS responder stopped");
1152 }
1153
1154 self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
1156 .await;
1157
1158 if let Some(bootstrap) = self.nostr_discovery.take()
1160 && let Err(e) = bootstrap.shutdown().await
1161 {
1162 warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
1163 }
1164
1165 if let Some(lan) = self.lan_discovery.take() {
1169 lan.shutdown().await;
1170 }
1171
1172 let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
1174 for transport_id in transport_ids {
1175 if let Some(mut handle) = self.transports.remove(&transport_id) {
1176 let transport_type = handle.transport_type().name;
1177 match handle.stop().await {
1178 Ok(()) => {
1179 info!(transport_id = %transport_id, transport_type, "Transport stopped");
1180 }
1181 Err(e) => {
1182 warn!(
1183 transport_id = %transport_id,
1184 transport_type,
1185 error = %e,
1186 "Transport stop failed"
1187 );
1188 }
1189 }
1190 }
1191 }
1192
1193 self.packet_tx.take();
1195 self.packet_rx.take();
1196
1197 if let Some(name) = self.tun_name.take() {
1199 info!(name = %name, "Shutting down TUN interface");
1200
1201 self.tun_tx.take();
1203
1204 if let Err(e) = shutdown_tun_interface(&name).await {
1206 warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
1207 }
1208
1209 #[cfg(target_os = "macos")]
1212 if let Some(fd) = self.tun_shutdown_fd.take() {
1213 unsafe {
1214 libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
1215 libc::close(fd);
1216 }
1217 }
1218
1219 if let Some(handle) = self.tun_reader_handle.take() {
1221 let _ = handle.join();
1222 }
1223 if let Some(handle) = self.tun_writer_handle.take() {
1224 let _ = handle.join();
1225 }
1226
1227 self.tun_state = TunState::Disabled;
1228 }
1229
1230 self.state = NodeState::Stopped;
1231 info!(state = %self.state, "Node stopped");
1232 Ok(())
1233 }
1234
1235 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
1240 let disconnect = Disconnect::new(reason);
1241 let plaintext = disconnect.encode();
1242
1243 let peer_addrs: Vec<NodeAddr> = self
1245 .peers
1246 .iter()
1247 .filter(|(_, peer)| peer.can_send() && peer.has_session())
1248 .map(|(addr, _)| *addr)
1249 .collect();
1250
1251 if peer_addrs.is_empty() {
1252 debug!(
1253 total_peers = self.peers.len(),
1254 "No sendable peers for disconnect notification"
1255 );
1256 return;
1257 }
1258
1259 let mut sent = 0usize;
1260 for node_addr in &peer_addrs {
1261 match self
1262 .send_encrypted_link_message(node_addr, &plaintext)
1263 .await
1264 {
1265 Ok(()) => sent += 1,
1266 Err(e) => {
1267 debug!(
1268 peer = %self.peer_display_name(node_addr),
1269 error = %e,
1270 "Failed to send disconnect (transport may be down)"
1271 );
1272 }
1273 }
1274 }
1275
1276 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
1277 }
1278
1279 fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
1280 peer_config
1281 .addresses_by_priority()
1282 .into_iter()
1283 .cloned()
1284 .collect()
1285 }
1286
1287 async fn nostr_peer_fallback_addresses(
1288 &self,
1289 peer_config: &PeerConfig,
1290 existing: &[PeerAddress],
1291 ) -> Vec<PeerAddress> {
1292 if !self.config.node.discovery.nostr.enabled
1293 || self.config.node.discovery.nostr.policy
1294 == crate::config::NostrDiscoveryPolicy::Disabled
1295 {
1296 return Vec::new();
1297 }
1298
1299 let Some(bootstrap) = self.nostr_discovery.clone() else {
1300 return Vec::new();
1301 };
1302 let endpoints = match bootstrap.advert_endpoints_for_peer(&peer_config.npub).await {
1303 Ok(endpoints) => endpoints,
1304 Err(err) => {
1305 debug!(
1306 npub = %peer_config.npub,
1307 error = %err,
1308 "Failed to resolve Nostr advert endpoints for configured peer"
1309 );
1310 return Vec::new();
1311 }
1312 };
1313
1314 let mut fallback = Vec::new();
1315 let mut next_priority = existing
1316 .iter()
1317 .map(|addr| addr.priority)
1318 .max()
1319 .unwrap_or(100)
1320 .saturating_add(1);
1321 for endpoint in endpoints {
1322 let Some(candidate) = Self::overlay_endpoint_to_peer_address(&endpoint, next_priority)
1323 else {
1324 continue;
1325 };
1326 if existing
1327 .iter()
1328 .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
1329 || fallback.iter().any(|addr: &PeerAddress| {
1330 addr.transport == candidate.transport && addr.addr == candidate.addr
1331 })
1332 {
1333 continue;
1334 }
1335 fallback.push(candidate);
1336 next_priority = next_priority.saturating_add(1);
1337 }
1338 fallback
1339 }
1340
1341 fn overlay_endpoint_to_peer_address(
1342 endpoint: &OverlayEndpointAdvert,
1343 priority: u8,
1344 ) -> Option<PeerAddress> {
1345 let transport = match endpoint.transport {
1346 OverlayTransportKind::Udp => "udp",
1347 OverlayTransportKind::Tcp => "tcp",
1348 OverlayTransportKind::Tor => "tor",
1349 };
1350 Some(PeerAddress::with_priority(
1351 transport,
1352 endpoint.addr.clone(),
1353 priority,
1354 ))
1355 }
1356
1357 async fn attempt_peer_address_list(
1358 &mut self,
1359 peer_config: &PeerConfig,
1360 peer_identity: PeerIdentity,
1361 allow_bootstrap_nat: bool,
1362 addresses: &[PeerAddress],
1363 ) -> Result<(), NodeError> {
1364 for addr in addresses {
1365 if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
1366 if !allow_bootstrap_nat {
1367 continue;
1368 }
1369 let Some(bootstrap) = self.nostr_discovery.clone() else {
1370 debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
1371 continue;
1372 };
1373 bootstrap.request_connect(peer_config.clone()).await;
1374 info!(npub = %peer_config.npub, "Started Nostr UDP NAT traversal attempt");
1375 return Ok(());
1376 }
1377
1378 let (transport_id, remote_addr) = if addr.transport == "ethernet" {
1379 match self.resolve_ethernet_addr(&addr.addr) {
1380 Ok(result) => result,
1381 Err(e) => {
1382 debug!(
1383 transport = %addr.transport,
1384 addr = %addr.addr,
1385 error = %e,
1386 "Failed to resolve Ethernet address"
1387 );
1388 continue;
1389 }
1390 }
1391 } else if addr.transport == "ble" {
1392 #[cfg(bluer_available)]
1393 {
1394 match self.resolve_ble_addr(&addr.addr) {
1395 Ok(result) => result,
1396 Err(e) => {
1397 debug!(
1398 transport = %addr.transport,
1399 addr = %addr.addr,
1400 error = %e,
1401 "Failed to resolve BLE address"
1402 );
1403 continue;
1404 }
1405 }
1406 }
1407 #[cfg(not(bluer_available))]
1408 {
1409 debug!(transport = %addr.transport, "BLE transport not available on this build");
1410 continue;
1411 }
1412 } else {
1413 let tid = match self.find_transport_for_type(&addr.transport) {
1414 Some(id) => id,
1415 None => {
1416 debug!(
1417 transport = %addr.transport,
1418 addr = %addr.addr,
1419 "No operational transport for address type"
1420 );
1421 continue;
1422 }
1423 };
1424 (tid, TransportAddr::from_string(&addr.addr))
1425 };
1426
1427 match self
1428 .initiate_connection(transport_id, remote_addr, peer_identity)
1429 .await
1430 {
1431 Ok(()) => return Ok(()),
1432 Err(e @ NodeError::AccessDenied(_)) => return Err(e),
1433 Err(e) => {
1434 debug!(
1435 npub = %peer_config.npub,
1436 transport_id = %transport_id,
1437 error = %e,
1438 "Connection attempt failed, trying next address"
1439 );
1440 }
1441 }
1442 }
1443
1444 Err(NodeError::NoTransportForType(format!(
1445 "no operational transport for any of {}'s addresses",
1446 peer_config.npub
1447 )))
1448 }
1449
1450 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1451 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
1452 .await;
1453 }
1454
/// Scan cached open-discovery adverts and queue connection retries for peers
/// that are not configured, connected, connecting or already queued.
///
/// Does nothing unless Nostr discovery is enabled with the `Open` policy.
///
/// # Parameters
///
/// * `bootstrap` - Nostr discovery runtime supplying cached candidates and
///   per-peer cooldown state.
/// * `max_age_secs` - when `Some`, adverts older than this many seconds
///   (measured against the current time) are skipped.
/// * `caller` - label recorded in log output; the value `"startup"` also
///   forces the summary log line even when nothing was queued.
pub(in crate::node) async fn run_open_discovery_sweep(
    &mut self,
    bootstrap: &std::sync::Arc<NostrDiscovery>,
    max_age_secs: Option<u64>,
    caller: &'static str,
) {
    if !self.config.node.discovery.nostr.enabled
        || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
    {
        return;
    }

    // npubs of configured peers; adverts for these are never queued here.
    let configured_npubs = self
        .config
        .peers()
        .iter()
        .map(|peer| peer.npub.clone())
        .collect::<HashSet<_>>();
    let now_ms = Self::now_ms();
    let now_secs = now_ms / 1000;
    // Upper bound on how many new retries this sweep may enqueue.
    let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
    if enqueue_budget == 0 {
        debug!(
            caller = %caller,
            "open-discovery sweep: enqueue budget is 0, skipping"
        );
        return;
    }

    // NOTE(review): 64 is presumably the maximum number of cached candidates
    // returned - confirm against `cached_open_discovery_candidates`.
    let candidates = bootstrap.cached_open_discovery_candidates(64).await;
    let cached_count = candidates.len();
    // Per-reason counters, reported in the summary log line below. Each
    // candidate is counted under the first check it fails.
    let mut enqueued = 0usize;
    let mut skipped_age = 0usize;
    let mut skipped_configured = 0usize;
    let mut skipped_self = 0usize;
    let mut skipped_connected = 0usize;
    let mut skipped_retry_pending = 0usize;
    let mut skipped_connecting = 0usize;
    let mut skipped_no_endpoints = 0usize;
    let mut skipped_invalid_npub = 0usize;
    let mut skipped_cooldown = 0usize;

    for (npub, endpoints, created_at_secs) in candidates {
        // Stop as soon as the budget is used up; remaining candidates are
        // neither queued nor counted as skipped.
        if enqueue_budget == 0 {
            break;
        }

        if let Some(max_age) = max_age_secs
            && now_secs.saturating_sub(created_at_secs) > max_age
        {
            skipped_age = skipped_age.saturating_add(1);
            continue;
        }

        if configured_npubs.contains(&npub) {
            skipped_configured = skipped_configured.saturating_add(1);
            continue;
        }

        let peer_identity = match PeerIdentity::from_npub(&npub) {
            Ok(identity) => identity,
            Err(_) => {
                skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                continue;
            }
        };
        let node_addr = *peer_identity.node_addr();
        // Never queue a connection to this node itself.
        if node_addr == *self.identity.node_addr() {
            skipped_self = skipped_self.saturating_add(1);
            continue;
        }
        if self.peers.contains_key(&node_addr) {
            skipped_connected = skipped_connected.saturating_add(1);
            continue;
        }
        if self.retry_pending.contains_key(&node_addr) {
            skipped_retry_pending = skipped_retry_pending.saturating_add(1);
            continue;
        }
        if bootstrap.cooldown_until(&npub, now_ms).is_some() {
            skipped_cooldown = skipped_cooldown.saturating_add(1);
            continue;
        }
        // A connection whose expected identity matches this peer is already
        // in progress.
        let connecting = self.connections.values().any(|conn| {
            conn.expected_identity()
                .map(|id| id.node_addr() == &node_addr)
                .unwrap_or(false)
        });
        if connecting {
            skipped_connecting = skipped_connecting.saturating_add(1);
            continue;
        }

        // Translate advertised endpoints into peer addresses, dropping
        // duplicates (same transport and address). Priorities start at 120
        // and increase by one per accepted endpoint.
        let mut addresses = Vec::new();
        let mut priority = 120u8;
        for endpoint in endpoints {
            let Some(candidate) = Self::overlay_endpoint_to_peer_address(&endpoint, priority)
            else {
                continue;
            };
            if addresses.iter().any(|existing: &PeerAddress| {
                existing.transport == candidate.transport && existing.addr == candidate.addr
            }) {
                continue;
            }
            addresses.push(candidate);
            priority = priority.saturating_add(1);
        }
        if addresses.is_empty() {
            skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
            continue;
        }

        // Only set an alias when none exists yet, so an existing one is kept.
        self.peer_aliases
            .entry(node_addr)
            .or_insert_with(|| peer_identity.short_npub());
        self.register_identity(node_addr, peer_identity.pubkey_full());

        // Queue a synthetic peer config for the retry machinery: due
        // immediately (`retry_after_ms = now_ms`) and with an expiry time.
        let mut state = super::retry::RetryState::new(PeerConfig {
            npub: npub.clone(),
            alias: None,
            addresses,
            connect_policy: ConnectPolicy::AutoConnect,
            auto_reconnect: true,
        });
        state.reconnect = false;
        state.retry_after_ms = now_ms;
        state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
        self.retry_pending.insert(node_addr, state);
        info!(
            caller = %caller,
            peer = %peer_identity.short_npub(),
            advert_age_secs = now_secs.saturating_sub(created_at_secs),
            "open-discovery sweep: queued retry for cached advert"
        );
        enqueue_budget = enqueue_budget.saturating_sub(1);
        enqueued = enqueued.saturating_add(1);
    }

    let total_skipped = skipped_age
        + skipped_configured
        + skipped_self
        + skipped_connected
        + skipped_retry_pending
        + skipped_connecting
        + skipped_no_endpoints
        + skipped_invalid_npub
        + skipped_cooldown;
    // Summarize only for the startup sweep or when something was queued.
    let should_summarize = caller == "startup" || enqueued > 0;
    if should_summarize {
        info!(
            caller = %caller,
            cached = cached_count,
            queued = enqueued,
            skipped_age = skipped_age,
            skipped_configured = skipped_configured,
            skipped_self = skipped_self,
            skipped_connected = skipped_connected,
            skipped_retry_pending = skipped_retry_pending,
            skipped_connecting = skipped_connecting,
            skipped_no_endpoints = skipped_no_endpoints,
            skipped_invalid_npub = skipped_invalid_npub,
            skipped_cooldown = skipped_cooldown,
            skipped_total = total_skipped,
            "open-discovery sweep complete"
        );
    }
}
1636
1637 async fn maybe_run_startup_open_discovery_sweep(
1645 &mut self,
1646 bootstrap: &std::sync::Arc<NostrDiscovery>,
1647 ) {
1648 if self.startup_open_discovery_sweep_done {
1649 return;
1650 }
1651 if !self.config.node.discovery.nostr.enabled
1652 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
1653 {
1654 self.startup_open_discovery_sweep_done = true;
1656 return;
1657 }
1658 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
1659 return;
1660 };
1661 let now_ms = Self::now_ms();
1662 let delay_ms = self
1663 .config
1664 .node
1665 .discovery
1666 .nostr
1667 .startup_sweep_delay_secs
1668 .saturating_mul(1000);
1669 if now_ms < started_at_ms.saturating_add(delay_ms) {
1670 return;
1671 }
1672
1673 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
1674 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
1675 .await;
1676 self.startup_open_discovery_sweep_done = true;
1677 }
1678
1679 fn available_outbound_slots(&self) -> usize {
1680 let connection_used = self
1681 .connections
1682 .len()
1683 .saturating_add(self.pending_connects.len());
1684 let connection_slots = if self.max_connections == 0 {
1685 usize::MAX
1686 } else {
1687 self.max_connections.saturating_sub(connection_used)
1688 };
1689
1690 let peer_slots = if self.max_peers == 0 {
1691 usize::MAX
1692 } else {
1693 self.max_peers.saturating_sub(self.peers.len())
1694 };
1695
1696 connection_slots.min(peer_slots)
1697 }
1698
1699 fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
1700 let current_open_discovery_pending = self
1701 .retry_pending
1702 .values()
1703 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
1704 .count();
1705
1706 let cap_remaining = self
1707 .config
1708 .node
1709 .discovery
1710 .nostr
1711 .open_discovery_max_pending
1712 .saturating_sub(current_open_discovery_pending);
1713
1714 cap_remaining.min(self.available_outbound_slots())
1715 }
1716
1717 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
1718 now_ms.saturating_add(
1719 self.config
1720 .node
1721 .discovery
1722 .nostr
1723 .advert_ttl_secs
1724 .saturating_mul(1000)
1725 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
1726 )
1727 }
1728
/// Assemble the overlay advert describing how this node can be reached.
///
/// Walks every operational transport and collects one endpoint per
/// advertisable UDP, TCP or Tor transport. Other transport types are ignored.
///
/// Returns `None` when Nostr discovery is disabled or when no endpoint could
/// be collected. `signal_relays` and `stun_servers` are populated only when at
/// least one `udp` endpoint was advertised with the `"nat"` placeholder.
async fn build_overlay_advert(
    &self,
    bootstrap: &std::sync::Arc<NostrDiscovery>,
) -> Option<OverlayAdvert> {
    if !self.config.node.discovery.nostr.enabled {
        return None;
    }

    let mut endpoints = Vec::new();
    // Set once any UDP transport is advertised with the "nat" placeholder.
    let mut has_udp_nat = false;

    for handle in self.transports.values() {
        if !handle.is_operational() {
            continue;
        }

        match handle.transport_type().name {
            "udp" => {
                // Transports without a matching config, or with advertising
                // switched off, contribute nothing.
                let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                    continue;
                };
                if !cfg.advertise_on_nostr() {
                    continue;
                }
                if cfg.is_public() {
                    // Address preference: explicit external address, then a
                    // routable bind address, then an address learned through
                    // `learn_public_udp_addr`.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Udp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                // Bind address is wildcard or unroutable.
                                // NOTE(review): per the warning text below,
                                // `learn_public_udp_addr` presumably performs
                                // a STUN observation - confirm.
                                let key = handle.transport_id().as_u32();
                                let port = addr.port();
                                if let Some(public) =
                                    bootstrap.learn_public_udp_addr(key, port).await
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: public.to_string(),
                                    });
                                } else {
                                    warn!(
                                        transport_id = key,
                                        bind_addr = %addr,
                                        "advert: udp public=true but bind is wildcard \
                                         or private and STUN observation failed; \
                                         advertising no UDP endpoint. Either set \
                                         transports.udp.external_addr, bind to a \
                                         specific *public* IP, or ensure \
                                         node.discovery.nostr.stun_servers is reachable"
                                    );
                                }
                            }
                            None => {}
                        }
                    }
                } else {
                    // Non-public UDP transport: advertise the "nat"
                    // placeholder instead of a concrete address.
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::Udp,
                        addr: "nat".to_string(),
                    });
                    has_udp_nat = true;
                }
            }
            "tcp" => {
                let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                    continue;
                };
                if !cfg.advertise_on_nostr() {
                    continue;
                }
                // Same preference as UDP, but with no learned-address step:
                // an unroutable bind without an explicit address only warns.
                if let Some(explicit) = cfg.external_advert_addr() {
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::Tcp,
                        addr: explicit.to_string(),
                    });
                } else {
                    match handle.local_addr() {
                        Some(addr)
                            if !addr.ip().is_unspecified()
                                && !is_unroutable_advert_ip(addr.ip()) =>
                        {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Tcp,
                                addr: addr.to_string(),
                            });
                        }
                        Some(addr) => {
                            warn!(
                                bind_addr = %addr,
                                "advert: tcp advertise_on_nostr=true bound to wildcard \
                                 or private IP and no transports.tcp.external_addr set; \
                                 advertising no TCP endpoint. Either set external_addr \
                                 to the public IP (recommended for cloud 1:1-NAT setups) \
                                 or bind explicitly to the public IP"
                            );
                        }
                        None => {}
                    }
                }
            }
            "tor" => {
                let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                    continue;
                };
                if !cfg.advertise_on_nostr() {
                    continue;
                }
                // Advertised as "<onion address>:<advertised port>"; skipped
                // when the handle reports no onion address.
                if let Some(addr) = handle.onion_address() {
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::Tor,
                        addr: format!("{}:{}", addr, cfg.advertised_port()),
                    });
                }
            }
            _ => {}
        }
    }

    if endpoints.is_empty() {
        return None;
    }

    Some(OverlayAdvert {
        identifier: ADVERT_IDENTIFIER.to_string(),
        version: ADVERT_VERSION,
        endpoints,
        // Relay and STUN lists are included only alongside a "nat" endpoint.
        signal_relays: has_udp_nat.then(|| self.config.node.discovery.nostr.dm_relays.clone()),
        stun_servers: has_udp_nat
            .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
    })
}
1892
1893 async fn refresh_overlay_advert(
1894 &self,
1895 bootstrap: &std::sync::Arc<NostrDiscovery>,
1896 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
1897 let advert = self.build_overlay_advert(bootstrap).await;
1898 bootstrap.update_local_advert(advert).await
1899 }
1900
1901 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
1902 match (&self.config.transports.udp, transport_name) {
1903 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
1904 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
1905 _ => None,
1906 }
1907 }
1908
1909 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
1910 match (&self.config.transports.tcp, transport_name) {
1911 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
1912 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
1913 _ => None,
1914 }
1915 }
1916
1917 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
1918 match (&self.config.transports.tor, transport_name) {
1919 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
1920 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
1921 _ => None,
1922 }
1923 }
1924
1925 pub(in crate::node) async fn try_peer_addresses(
1926 &mut self,
1927 peer_config: &PeerConfig,
1928 peer_identity: PeerIdentity,
1929 allow_bootstrap_nat: bool,
1930 ) -> Result<(), NodeError> {
1931 let static_addresses = self.static_peer_addresses(peer_config);
1934 if self
1935 .attempt_peer_address_list(
1936 peer_config,
1937 peer_identity,
1938 allow_bootstrap_nat,
1939 &static_addresses,
1940 )
1941 .await
1942 .is_ok()
1943 {
1944 return Ok(());
1945 }
1946
1947 {
1948 let fallback = self
1949 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
1950 .await;
1951 if !fallback.is_empty()
1952 && self
1953 .attempt_peer_address_list(
1954 peer_config,
1955 peer_identity,
1956 allow_bootstrap_nat,
1957 &fallback,
1958 )
1959 .await
1960 .is_ok()
1961 {
1962 return Ok(());
1963 }
1964 }
1965
1966 Err(NodeError::NoTransportForType(format!(
1967 "no operational transport for any of {}'s addresses",
1968 peer_config.npub
1969 )))
1970 }
1971
1972 pub(crate) async fn api_connect(
1980 &mut self,
1981 npub: &str,
1982 address: &str,
1983 transport: &str,
1984 ) -> Result<serde_json::Value, String> {
1985 let peer_config = PeerConfig {
1986 npub: npub.to_string(),
1987 alias: None,
1988 addresses: vec![PeerAddress::new(transport, address)],
1989 connect_policy: ConnectPolicy::Manual,
1990 auto_reconnect: false,
1991 };
1992
1993 if let Ok(identity) = PeerIdentity::from_npub(npub) {
1995 self.peer_aliases
1996 .insert(*identity.node_addr(), identity.short_npub());
1997 self.register_identity(*identity.node_addr(), identity.pubkey_full());
1998 }
1999
2000 self.initiate_peer_connection(&peer_config)
2001 .await
2002 .map(|()| {
2003 info!(
2004 npub = %npub,
2005 address = %address,
2006 transport = %transport,
2007 "API connect initiated"
2008 );
2009 serde_json::json!({
2010 "npub": npub,
2011 "address": address,
2012 "transport": transport,
2013 })
2014 })
2015 .map_err(|e| e.to_string())
2016 }
2017
2018 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
2022 let peer_identity =
2023 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
2024 let node_addr = *peer_identity.node_addr();
2025
2026 if !self.peers.contains_key(&node_addr) {
2027 return Err(format!("peer not found: {npub}"));
2028 }
2029
2030 self.remove_active_peer(&node_addr);
2032
2033 self.retry_pending.remove(&node_addr);
2035
2036 info!(npub = %npub, "API disconnect completed");
2037
2038 Ok(serde_json::json!({
2039 "npub": npub,
2040 "disconnected": true,
2041 }))
2042 }
2043
2044 pub async fn adopt_established_traversal(
2051 &mut self,
2052 traversal: EstablishedTraversal,
2053 ) -> Result<BootstrapHandoffResult, NodeError> {
2054 debug!(
2055 peer_npub = %traversal.peer_npub,
2056 session_id = %traversal.session_id,
2057 remote_addr = %traversal.remote_addr,
2058 "adopting established traversal socket"
2059 );
2060
2061 if !self.state.is_operational() {
2062 return Err(NodeError::NotStarted);
2063 }
2064
2065 let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
2066 let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
2067 NodeError::InvalidPeerNpub {
2068 npub: traversal.peer_npub.clone(),
2069 reason: e.to_string(),
2070 }
2071 })?;
2072 let peer_node_addr = *peer_identity.node_addr();
2073
2074 self.peer_aliases
2075 .insert(peer_node_addr, peer_identity.short_npub());
2076 self.register_identity(peer_node_addr, peer_identity.pubkey_full());
2077
2078 let transport_id = self.allocate_transport_id();
2079 let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
2099 let mut cfg = self
2100 .lookup_udp_config(traversal.transport_name.as_deref())
2101 .or_else(|| self.lookup_udp_config(None))
2102 .cloned()
2103 .unwrap_or_default();
2104 cfg.bind_addr = None;
2105 cfg.external_addr = None;
2106 cfg
2107 });
2108 let mut transport = crate::transport::udp::UdpTransport::new(
2109 transport_id,
2110 traversal.transport_name.clone(),
2111 inherited_config,
2112 packet_tx,
2113 );
2114
2115 transport
2116 .adopt_socket_async(traversal.socket)
2117 .await
2118 .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
2119
2120 let local_addr = transport.local_addr().ok_or_else(|| {
2121 NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
2122 })?;
2123
2124 self.transports.insert(
2125 transport_id,
2126 crate::transport::TransportHandle::Udp(transport),
2127 );
2128 self.bootstrap_transports.insert(transport_id);
2129 self.bootstrap_transport_npubs
2130 .insert(transport_id, traversal.peer_npub.clone());
2131
2132 let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
2133 if let Err(err) = self
2134 .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
2135 .await
2136 {
2137 self.bootstrap_transports.remove(&transport_id);
2138 self.bootstrap_transport_npubs.remove(&transport_id);
2139 if let Some(mut handle) = self.transports.remove(&transport_id) {
2140 let _ = handle.stop().await;
2141 }
2142 return Err(err);
2143 }
2144
2145 info!(
2146 peer = %self.peer_display_name(&peer_node_addr),
2147 transport_id = %transport_id,
2148 local_addr = %local_addr,
2149 remote_addr = %traversal.remote_addr,
2150 session_id = %traversal.session_id,
2151 "adopted NAT traversal socket; handshake initiated"
2152 );
2153
2154 Ok(BootstrapHandoffResult {
2155 transport_id,
2156 local_addr,
2157 remote_addr: traversal.remote_addr,
2158 peer_node_addr,
2159 session_id: traversal.session_id,
2160 })
2161 }
2162}