1use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6 ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
7 OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use std::collections::HashSet;
18use std::net::IpAddr;
19use std::thread;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
/// Returns `true` when `ip` should never be published in a public overlay
/// advert: loopback, unspecified, multicast, link-local, private/ULA ranges,
/// IPv4 broadcast/documentation, and CGNAT (100.64.0.0/10, RFC 6598).
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let octets = v4.octets();
            // RFC 6598 shared address space: 100.64.0.0/10
            // (first octet 100, top two bits of second octet == 01).
            let is_cgnat = octets[0] == 100 && (octets[1] & 0xc0) == 0x40;
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                || is_cgnat
        }
        IpAddr::V6(v6) => {
            // fe80::/10 unicast link-local.
            let is_link_local = (v6.segments()[0] & 0xffc0) == 0xfe80;
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                || is_link_local
        }
    }
}
54
// Multiplier applied to a lifetime when sizing the open-discovery retry
// window — NOTE(review): the consuming code (`queue_open_discovery_retries` /
// startup sweep) is outside this view; confirm the exact semantics there.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
56
57impl Node {
    /// Registers identities/aliases for every configured peer, then dials all
    /// peers marked for auto-connect.
    ///
    /// Dial failures are non-fatal: each failed peer is scheduled for retry,
    /// and when the failure was `NoTransportForType` a background Nostr advert
    /// refetch is spawned to detect a stale advert.
    pub(super) async fn initiate_peer_connections(&mut self) {
        // Parse every configured npub; entries that fail to parse are dropped
        // here (initiate_peer_connection reports bad npubs with an error).
        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
            .config
            .peers()
            .iter()
            .filter_map(|pc| {
                PeerIdentity::from_npub(&pc.npub)
                    .ok()
                    .map(|id| (id, pc.alias.clone()))
            })
            .collect();

        for (identity, alias) in peer_identities {
            // Display name: explicit alias if configured, else shortened npub.
            let name = alias.unwrap_or_else(|| identity.short_npub());
            self.peer_aliases.insert(*identity.node_addr(), name);
            self.register_identity(*identity.node_addr(), identity.pubkey_full());
        }

        // Only peers flagged for auto-connect are dialed eagerly.
        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();

        if peer_configs.is_empty() {
            debug!("No static peers configured");
            return;
        }

        debug!(
            count = peer_configs.len(),
            "Initiating static peer connections"
        );

        for peer_config in peer_configs {
            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
                warn!(
                    npub = %peer_config.npub,
                    alias = ?peer_config.alias,
                    error = %e,
                    "Failed to initiate peer connection"
                );
                // Queue a retry so transient failures get re-attempted later.
                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                }
                // No transport matched this peer's addresses: the cached
                // advert may be stale, so refetch it off the hot path.
                if matches!(e, crate::node::NodeError::NoTransportForType(_))
                    && let Some(bootstrap) = self.nostr_discovery.clone()
                {
                    let npub = peer_config.npub.clone();
                    tokio::spawn(async move {
                        let _ = bootstrap.refetch_advert_for_stale_check(&npub).await;
                    });
                }
            }
        }
    }
131
132 pub(super) async fn initiate_peer_connection(
136 &mut self,
137 peer_config: &crate::config::PeerConfig,
138 ) -> Result<(), NodeError> {
139 let peer_identity =
141 PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
142 npub: peer_config.npub.clone(),
143 reason: e.to_string(),
144 })?;
145
146 let peer_node_addr = *peer_identity.node_addr();
147
148 if self.peers.contains_key(&peer_node_addr) {
150 debug!(
151 npub = %peer_config.npub,
152 "Peer already exists, skipping"
153 );
154 return Ok(());
155 }
156
157 let already_connecting = self.connections.values().any(|conn| {
159 conn.expected_identity()
160 .map(|id| id.node_addr() == &peer_node_addr)
161 .unwrap_or(false)
162 });
163 if already_connecting {
164 debug!(
165 npub = %peer_config.npub,
166 "Connection already in progress, skipping"
167 );
168 return Ok(());
169 }
170
171 self.try_peer_addresses(peer_config, peer_identity, true)
172 .await
173 }
174
175 fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
176 self.connections.values().any(|conn| {
177 conn.expected_identity()
178 .map(|id| id.node_addr() == peer_node_addr)
179 .unwrap_or(false)
180 })
181 }
182
    /// Opens an outbound link to `remote_addr` over `transport_id` and begins
    /// (or schedules) the handshake with the expected peer.
    ///
    /// Connection-oriented transports connect asynchronously: the link is
    /// parked in `pending_connects` and the handshake starts later from
    /// `poll_pending_connects`. Connectionless transports handshake at once.
    ///
    /// # Errors
    /// ACL rejection from `authorize_peer`, an immediate transport connect
    /// failure, or any error from `start_handshake`.
    pub(super) async fn initiate_connection(
        &mut self,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        // ACL gate before any link state is allocated.
        self.authorize_peer(
            &peer_identity,
            PeerAclContext::OutboundConnect,
            transport_id,
            &remote_addr,
        )?;

        // Unknown transport ids are treated as connectionless here; the send
        // path will surface the missing transport later.
        let is_connection_oriented = self
            .transports
            .get(&transport_id)
            .map(|t| t.transport_type().connection_oriented)
            .unwrap_or(false);

        let link_id = self.allocate_link_id();

        // Both constructors seed RTT estimation from the configured base RTT.
        let link = if is_connection_oriented {
            Link::new(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        } else {
            Link::connectionless(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        };

        self.links.insert(link_id, link);

        // Reverse index so inbound packets on (transport, addr) resolve to
        // this link.
        self.addr_to_link
            .insert((transport_id, remote_addr.clone()), link_id);

        if is_connection_oriented {
            if let Some(transport) = self.transports.get(&transport_id) {
                match transport.connect(&remote_addr).await {
                    Ok(()) => {
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            link_id = %link_id,
                            "Transport connect initiated (non-blocking)"
                        );
                        // Handshake is deferred until the transport reports
                        // Connected (see poll_pending_connects).
                        self.pending_connects.push(super::PendingConnect {
                            link_id,
                            transport_id,
                            remote_addr,
                            peer_identity,
                        });
                    }
                    Err(e) => {
                        // Roll back the link bookkeeping created above.
                        self.links.remove(&link_id);
                        self.addr_to_link.remove(&(transport_id, remote_addr));
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            Ok(())
        } else {
            // Datagram transport: no connect phase, handshake immediately.
            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
                .await
        }
    }
275
    /// Builds and sends Noise handshake message 1 over an established link.
    ///
    /// Allocates a local session index, registers the pending outbound
    /// handshake, and schedules msg1 for resend. Allocation or handshake
    /// construction failures roll back the link and index; a failed *send*
    /// does not return an error — the connection is marked failed instead.
    ///
    /// # Errors
    /// `IndexAllocationFailed` or `HandshakeFailed`.
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        // Local session index the peer will use to address replies to us.
        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                // Roll back the link created by the caller.
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    // Free everything allocated so far before bailing.
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Keep msg1 around so it can be retransmitted after resend_interval.
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        // Index by (transport, our_index) so the msg2 reply finds this link.
        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        // A missing transport is tolerated here: the connection stays pending
        // with msg1 scheduled for resend.
        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        if let Some(send_result) = send_result {
            self.note_local_send_outcome(&send_result);
            match send_result {
                Ok(bytes) => {
                    debug!(
                        link_id = %link_id,
                        our_index = %our_index,
                        bytes,
                        "Sent Noise handshake message 1 (wire format)"
                    );
                }
                Err(e) => {
                    warn!(
                        link_id = %link_id,
                        transport_id = %transport_id,
                        remote_addr = %remote_addr,
                        our_index = %our_index,
                        error = %e,
                        "Failed to send handshake message"
                    );
                    // Mark failed so maintenance can tear the connection down.
                    if let Some(conn) = self.connections.get_mut(&link_id) {
                        conn.mark_failed();
                    }
                }
            }
        }

        Ok(())
    }
380
381 pub(super) async fn poll_transport_discovery(&mut self) {
387 let mut to_connect = Vec::new();
389
390 for (transport_id, transport) in &self.transports {
391 if !transport.is_operational() {
392 continue;
393 }
394 if !transport.auto_connect() {
395 let _ = transport.discover();
397 continue;
398 }
399 let discovered = match transport.discover() {
400 Ok(peers) => peers,
401 Err(_) => continue,
402 };
403 for peer in discovered {
404 let pubkey = match peer.pubkey_hint {
405 Some(pk) => pk,
406 None => continue,
407 };
408 let identity = PeerIdentity::from_pubkey(pubkey);
409 let node_addr = *identity.node_addr();
410
411 if node_addr == *self.identity.node_addr() {
413 continue;
414 }
415 if self.peers.contains_key(&node_addr) {
417 continue;
418 }
419 let connecting = self.connections.values().any(|c| {
421 c.expected_identity()
422 .map(|id| id.node_addr() == &node_addr)
423 .unwrap_or(false)
424 });
425 if connecting {
426 continue;
427 }
428
429 to_connect.push((*transport_id, peer.addr, identity));
430 }
431 }
432
433 for (transport_id, remote_addr, identity) in to_connect {
434 info!(
435 peer = %self.peer_display_name(identity.node_addr()),
436 transport_id = %transport_id,
437 remote_addr = %remote_addr,
438 "Auto-connecting to discovered peer"
439 );
440 if let Err(e) = self
441 .initiate_connection(transport_id, remote_addr, identity)
442 .await
443 {
444 warn!(error = %e, "Failed to auto-connect to discovered peer");
445 }
446 }
447 }
448
    /// Services the Nostr overlay discovery runtime: refreshes our own advert,
    /// processes NAT traversal outcome events, and kicks the open-discovery
    /// sweep/retry queues.
    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        for event in bootstrap.drain_events().await {
            match event {
                // A UDP NAT traversal succeeded: adopt its socket unless the
                // peer got connected through some other path in the meantime.
                BootstrapEvent::Established { traversal } => {
                    let peer_npub = traversal.peer_npub.clone();
                    if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                        let peer_addr = *peer_identity.node_addr();
                        if self.peers.contains_key(&peer_addr) {
                            debug!(
                                peer_npub = %peer_npub,
                                "Ignoring established NAT traversal for already-connected peer"
                            );
                            continue;
                        }
                        if self.is_connecting_to_peer(&peer_addr) {
                            debug!(
                                peer_npub = %peer_npub,
                                "Ignoring established NAT traversal while peer handshake is already in progress"
                            );
                            continue;
                        }
                    }
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            // Adoption failed: fall back to the normal retry
                            // machinery for this peer.
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                // A traversal attempt failed: record the failure (for warn
                // rate-limiting and cooldown), possibly trigger a stale-advert
                // sweep, then try static addresses before scheduling a retry.
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    if self.peers.contains_key(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal for already-connected peer"
                        );
                        continue;
                    }
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    let now_ms = Self::now_ms();
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    // Only warn when the rate-limiter allows it; otherwise log
                    // the same information at debug level.
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    // Crossing the failure threshold triggers a background
                    // refetch of the peer's advert to evict/refresh stale data.
                    if decision.crossed_threshold {
                        let bootstrap = bootstrap.clone();
                        let npub = peer_config.npub.clone();
                        tokio::spawn(async move {
                            let outcome = bootstrap.refetch_advert_for_stale_check(&npub).await;
                            match outcome {
                                crate::discovery::nostr::NostrRefetchOutcome::Evicted => info!(
                                    npub = %npub,
                                    "stale-advert sweep: peer evicted from advert cache"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::Refreshed => info!(
                                    npub = %npub,
                                    "stale-advert sweep: peer republished, cache refreshed and streak reset"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::SameAdvert => debug!(
                                    npub = %npub,
                                    "stale-advert sweep: advert unchanged, cooldown stands"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::Skipped => debug!(
                                    npub = %npub,
                                    "stale-advert sweep: skipped (relay error or no advert_relays)"
                                ),
                            }
                        });
                    }

                    // Try the peer's other (static) addresses before giving up
                    // on this round; `false` disallows another NAT attempt.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    // Schedule a retry, pushed out to at least the cooldown
                    // deadline when one was imposed.
                    self.schedule_retry(node_addr, now_ms);
                    if let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
    }
592
593 pub(super) fn lan_discovery_scope(&self) -> Option<String> {
598 let app = self.config.node.discovery.nostr.app.trim();
599 if app.is_empty() {
600 return None;
601 }
602 if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
603 let scope = rest.trim();
604 if scope.is_empty() {
605 None
606 } else {
607 Some(scope.to_string())
608 }
609 } else {
610 Some(app.to_string())
611 }
612 }
613
614 pub(super) async fn poll_lan_discovery(&mut self) {
620 let Some(runtime) = self.lan_discovery.clone() else {
621 return;
622 };
623 let events = runtime.drain_events().await;
624 if events.is_empty() {
625 return;
626 }
627 let udp_transport_id = self.find_transport_for_type("udp");
630 let Some(transport_id) = udp_transport_id else {
631 debug!("lan: no operational UDP transport, skipping discovered peers");
632 return;
633 };
634 for event in events {
635 let crate::discovery::lan::LanEvent::Discovered(peer) = event;
636 let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
637 Ok(id) => id,
638 Err(err) => {
639 debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
640 continue;
641 }
642 };
643 let peer_node_addr = *identity.node_addr();
644 if self.peers.contains_key(&peer_node_addr) {
645 continue;
646 }
647 let already_connecting = self.connections.values().any(|conn| {
648 conn.expected_identity()
649 .map(|id| id.node_addr() == &peer_node_addr)
650 .unwrap_or(false)
651 });
652 if already_connecting {
653 continue;
654 }
655 let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
656 info!(
657 npub = %identity.short_npub(),
658 addr = %peer.addr,
659 "lan: initiating handshake to discovered peer"
660 );
661 if let Err(err) = self
662 .initiate_connection(transport_id, remote_addr, identity)
663 .await
664 {
665 debug!(
666 npub = %peer.npub,
667 error = %err,
668 "lan: failed to initiate connection to discovered peer"
669 );
670 }
671 }
672 }
673
    /// Drives transport connects that were started non-blocking: promotes
    /// Connected entries to handshakes and tears down failed ones.
    pub(super) async fn poll_pending_connects(&mut self) {
        if self.pending_connects.is_empty() {
            return;
        }

        let mut completed = Vec::new();

        // Pass 1: classify each pending connect without mutating state.
        for (i, pending) in self.pending_connects.iter().enumerate() {
            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
                transport.connection_state(&pending.remote_addr)
            } else {
                crate::transport::ConnectionState::Failed("transport removed".into())
            };

            match state {
                crate::transport::ConnectionState::Connected => {
                    completed.push((i, true, None));
                }
                crate::transport::ConnectionState::Failed(reason) => {
                    completed.push((i, false, Some(reason)));
                }
                crate::transport::ConnectionState::Connecting => {
                    // Still in progress — check again on the next poll.
                }
                crate::transport::ConnectionState::None => {
                    completed.push((i, false, Some("no connection attempt found".into())));
                }
            }
        }

        // Pass 2: resolve completed entries. Iterate in reverse so the
        // `remove(i)` indices stay valid.
        for (i, success, reason) in completed.into_iter().rev() {
            let pending = self.pending_connects.remove(i);

            if success {
                if let Some(link) = self.links.get_mut(&pending.link_id) {
                    link.set_connected();
                }

                debug!(
                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
                    transport_id = %pending.transport_id,
                    remote_addr = %pending.remote_addr,
                    link_id = %pending.link_id,
                    "Transport connected, starting handshake"
                );

                if let Err(e) = self
                    .start_handshake(
                        pending.link_id,
                        pending.transport_id,
                        pending.remote_addr.clone(),
                        pending.peer_identity,
                    )
                    .await
                {
                    warn!(
                        link_id = %pending.link_id,
                        error = %e,
                        "Failed to start handshake after transport connect"
                    );
                    self.remove_link(&pending.link_id);
                }
            } else {
                let reason = reason.unwrap_or_default();
                warn!(
                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
                    transport_id = %pending.transport_id,
                    remote_addr = %pending.remote_addr,
                    link_id = %pending.link_id,
                    reason = %reason,
                    "Transport connect failed"
                );

                self.remove_link(&pending.link_id);
                // NOTE(review): the handshake-failure path above relies on
                // `remove_link` alone; this extra `links.remove` looks
                // redundant — confirm whether `remove_link` already clears
                // `self.links`.
                self.links.remove(&pending.link_id);
                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
            }
        }
    }
765
    /// Boots the node: transports, crypto worker pools, discovery (Nostr and
    /// LAN), static peer dialing, the TUN device, and the DNS responder.
    ///
    /// Everything after the transports is best-effort: individual subsystem
    /// failures are logged and startup continues without that feature.
    ///
    /// # Errors
    /// `AlreadyStarted` when the node is not in a startable state, or a TUN
    /// shutdown-pipe creation failure (macOS only).
    pub async fn start(&mut self) -> Result<(), NodeError> {
        if !self.state.can_start() {
            return Err(NodeError::AlreadyStarted);
        }
        self.state = NodeState::Starting;

        // Shared packet channel: transports push received packets into it.
        let packet_buffer_size = self.config.node.buffers.packet_channel;
        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
        self.packet_tx = Some(packet_tx.clone());
        self.packet_rx = Some(packet_rx);

        let transport_handles = self.create_transports(&packet_tx).await;

        // Start each transport; ones that fail to start are dropped with a
        // warning rather than aborting the whole node.
        for mut handle in transport_handles {
            let transport_id = handle.transport_id();
            let transport_type = handle.transport_type().name;
            let name = handle.name().map(|s| s.to_string());

            match handle.start().await {
                Ok(()) => {
                    self.transports.insert(transport_id, handle);
                }
                Err(e) => {
                    if let Some(ref n) = name {
                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
                    } else {
                        warn!(transport_type, error = %e, "Transport failed to start");
                    }
                }
            }
        }

        if !self.transports.is_empty() {
            info!(count = self.transports.len(), "Transports initialized");
        }

        // Crypto worker pools (unix only). Sizes default to the CPU count and
        // can be overridden via env vars; FIPS_DECRYPT_WORKERS=0 selects
        // in-line decryption in rx_loop instead of a pool.
        #[cfg(unix)]
        {
            let cpu_default = std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1)
                .max(1);
            let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(cpu_default)
                .max(1);
            self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
                encrypt_worker_count,
            ));
            info!(
                workers = encrypt_worker_count,
                "Spawned FMP-encrypt worker pool"
            );

            let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(cpu_default);
            if decrypt_worker_count == 0 {
                info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
            } else {
                self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
                    decrypt_worker_count,
                ));
                info!(
                    workers = decrypt_worker_count,
                    "Spawned FMP+FSP-decrypt worker pool"
                );
            }
        }

        // Nostr overlay discovery (optional; node runs without it on failure).
        if self.config.node.discovery.nostr.enabled {
            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
                .await
            {
                Ok(runtime) => {
                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
                    }
                    self.nostr_discovery = Some(runtime);
                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
                    info!("Nostr overlay discovery enabled");
                }
                Err(err) => {
                    warn!(error = %err, "Failed to start Nostr overlay discovery");
                }
            }
        }

        // LAN mDNS discovery, advertising our first operational UDP port
        // (0 when no UDP transport is up).
        if self.config.node.discovery.lan.enabled {
            let advertised_udp_port = self
                .transports
                .values()
                .filter(|h| h.is_operational())
                .filter(|h| h.transport_type().name == "udp")
                .find_map(|h| h.local_addr().map(|addr| addr.port()))
                .unwrap_or(0);
            let scope = self.lan_discovery_scope();
            match crate::discovery::lan::LanDiscovery::start(
                &self.identity,
                scope,
                advertised_udp_port,
                self.config.node.discovery.lan.clone(),
            )
            .await
            {
                Ok(runtime) => {
                    self.lan_discovery = Some(runtime);
                    info!("LAN mDNS discovery enabled");
                }
                Err(err) => {
                    debug!(error = %err, "LAN mDNS discovery not started");
                }
            }
        }

        // Dial statically configured peers now that transports are up.
        self.initiate_peer_connections().await;

        // TUN device: reader and writer run on dedicated OS threads, bridged
        // to the async side via channels.
        if self.config.tun.enabled {
            let address = *self.identity.address();
            match TunDevice::create(&self.config.tun, address).await {
                Ok(device) => {
                    let mtu = device.mtu();
                    let name = device.name().to_string();
                    let our_addr = *device.address();

                    info!("TUN device active:");
                    info!(" name: {}", name);
                    info!(" address: {}", device.address());
                    info!(" mtu: {}", mtu);

                    let effective_mtu = self.effective_ipv6_mtu();
                    // Max TCP MSS = MTU minus IPv6 (40) and TCP (20) headers.
                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); info!("effective MTU: {} bytes", effective_mtu);
                    debug!(" max TCP MSS: {} bytes", max_mss);

                    // macOS: a pipe whose read end lets the blocking TUN
                    // reader be woken for shutdown (see stop()).
                    #[cfg(target_os = "macos")]
                    let (shutdown_read_fd, shutdown_write_fd) = {
                        let mut fds = [0i32; 2];
                        // SAFETY: fds is a valid 2-element i32 array as
                        // required by pipe(2).
                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
                                "failed to create shutdown pipe".into(),
                            )));
                        }
                        (fds[0], fds[1])
                    };

                    let (writer, tun_tx) =
                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;

                    let writer_handle = thread::spawn(move || {
                        writer.run();
                    });

                    let reader_tun_tx = tun_tx.clone();

                    let tun_channel_size = self.config.node.buffers.tun_channel;
                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);

                    let transport_mtu = self.transport_mtu();
                    let path_mtu_lookup = self.path_mtu_lookup.clone();
                    // macOS passes the shutdown pipe's read end to the reader;
                    // other platforms use the variant without it.
                    #[cfg(target_os = "macos")]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                            shutdown_read_fd,
                        );
                    });
                    #[cfg(not(target_os = "macos"))]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                        );
                    });

                    self.tun_state = TunState::Active;
                    self.tun_name = Some(name);
                    self.tun_tx = Some(tun_tx);
                    self.tun_outbound_rx = Some(outbound_rx);
                    self.tun_reader_handle = Some(reader_handle);
                    self.tun_writer_handle = Some(writer_handle);
                    #[cfg(target_os = "macos")]
                    {
                        self.tun_shutdown_fd = Some(shutdown_write_fd);
                    }
                }
                Err(e) => {
                    self.tun_state = TunState::Failed;
                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
                }
            }
        }

        // DNS responder for the .fips domain (optional).
        if self.config.dns.enabled {
            let addr_str = self.config.dns.bind_addr();
            match addr_str.parse::<std::net::IpAddr>() {
                Ok(ip) => {
                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
                    match Self::bind_dns_socket(bind) {
                        Ok(socket) => {
                            let dns_channel_size = self.config.node.buffers.dns_channel;
                            let (identity_tx, identity_rx) =
                                tokio::sync::mpsc::channel(dns_channel_size);
                            let dns_ttl = self.config.dns.ttl();
                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
                                self.config.peers(),
                            );
                            // With system files enabled, the host map also
                            // watches the on-disk hosts file for changes.
                            let reloader = if self.config.node.system_files_enabled {
                                let hosts_path = std::path::PathBuf::from(
                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
                                );
                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
                            } else {
                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
                            };
                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
                            info!(
                                bind = %bind,
                                hosts = reloader.hosts().len(),
                                mesh_ifindex = ?mesh_ifindex,
                                "DNS responder started for .fips domain (auto-reload enabled)"
                            );
                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
                                socket,
                                identity_tx,
                                dns_ttl,
                                reloader,
                                mesh_ifindex,
                            ));
                            self.dns_identity_rx = Some(identity_rx);
                            self.dns_task = Some(handle);
                        }
                        Err(e) => {
                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
                        }
                    }
                }
                Err(e) => {
                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
                }
            }
        }

        self.state = NodeState::Running;
        info!("Node started:");
        info!(" state: {}", self.state);
        info!(" transports: {}", self.transports.len());
        info!(" connections: {}", self.connections.len());
        Ok(())
    }
1101
1102 fn bind_dns_socket(
1115 addr: std::net::SocketAddr,
1116 ) -> Result<tokio::net::UdpSocket, std::io::Error> {
1117 use socket2::{Domain, Protocol, Socket, Type};
1118 let domain = if addr.is_ipv4() {
1119 Domain::IPV4
1120 } else {
1121 Domain::IPV6
1122 };
1123 let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
1124 if addr.is_ipv6() {
1125 sock.set_only_v6(false)?;
1126 #[cfg(unix)]
1127 Self::set_recv_pktinfo_v6(&sock)?;
1128 }
1129 sock.set_nonblocking(true)?;
1130 sock.bind(&addr.into())?;
1131 tokio::net::UdpSocket::from_std(sock.into())
1132 }
1133
    /// Enables `IPV6_RECVPKTINFO` on `sock` so each received datagram carries
    /// its destination address/interface — presumably so DNS replies can use
    /// the correct source on the dual-stack socket (confirm in the responder).
    ///
    /// # Errors
    /// Returns the OS error when `setsockopt` fails.
    #[cfg(unix)]
    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
        use std::os::fd::AsRawFd;
        let enable: libc::c_int = 1;
        // SAFETY: the fd is valid for the lifetime of `sock`, and we pass a
        // pointer to a properly sized and aligned c_int option value.
        let ret = unsafe {
            libc::setsockopt(
                sock.as_raw_fd(),
                libc::IPPROTO_IPV6,
                libc::IPV6_RECVPKTINFO,
                &enable as *const _ as *const libc::c_void,
                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
            )
        };
        if ret < 0 {
            return Err(std::io::Error::last_os_error());
        }
        Ok(())
    }
1157
1158 fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
1165 #[cfg(unix)]
1166 {
1167 let c_name = std::ffi::CString::new(name).ok()?;
1168 let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
1169 if idx == 0 { None } else { Some(idx) }
1170 }
1171 #[cfg(not(unix))]
1172 {
1173 let _ = name;
1174 None
1175 }
1176 }
1177
    /// Gracefully shuts the node down: DNS task, peer disconnect notices,
    /// discovery runtimes, transports, packet channels, and the TUN device.
    ///
    /// # Errors
    /// `NotStarted` when the node is not in a stoppable state.
    pub async fn stop(&mut self) -> Result<(), NodeError> {
        if !self.state.can_stop() {
            return Err(NodeError::NotStarted);
        }
        self.state = NodeState::Stopping;
        info!(state = %self.state, "Node stopping");

        // Stop answering DNS first so no new lookups race the shutdown.
        if let Some(handle) = self.dns_task.take() {
            handle.abort();
            debug!("DNS responder stopped");
        }

        // Best-effort courtesy notice while transports are still up.
        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
            .await;

        if let Some(bootstrap) = self.nostr_discovery.take()
            && let Err(e) = bootstrap.shutdown().await
        {
            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
        }

        if let Some(lan) = self.lan_discovery.take() {
            lan.shutdown().await;
        }

        // Stop every transport; collect ids first so the map can be mutated
        // while iterating.
        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
        for transport_id in transport_ids {
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let transport_type = handle.transport_type().name;
                match handle.stop().await {
                    Ok(()) => {
                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
                    }
                    Err(e) => {
                        warn!(
                            transport_id = %transport_id,
                            transport_type,
                            error = %e,
                            "Transport stop failed"
                        );
                    }
                }
            }
        }

        // Drop both ends of the packet channel.
        self.packet_tx.take();
        self.packet_rx.take();

        if let Some(name) = self.tun_name.take() {
            info!(name = %name, "Shutting down TUN interface");

            // Close the writer channel so the writer thread can exit.
            self.tun_tx.take();

            if let Err(e) = shutdown_tun_interface(&name).await {
                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
            }

            // macOS: wake the blocked reader via the shutdown pipe created in
            // start(), then close our write end.
            #[cfg(target_os = "macos")]
            if let Some(fd) = self.tun_shutdown_fd.take() {
                // SAFETY: fd is the pipe write end we created in start() and
                // have not closed; write/close on it are sound.
                unsafe {
                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                    libc::close(fd);
                }
            }

            // Join reader/writer threads; join errors are ignored on purpose.
            if let Some(handle) = self.tun_reader_handle.take() {
                let _ = handle.join();
            }
            if let Some(handle) = self.tun_writer_handle.take() {
                let _ = handle.join();
            }

            self.tun_state = TunState::Disabled;
        }

        self.state = NodeState::Stopped;
        info!(state = %self.state, "Node stopped");
        Ok(())
    }
1275
1276 async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
1281 let disconnect = Disconnect::new(reason);
1282 let plaintext = disconnect.encode();
1283
1284 let peer_addrs: Vec<NodeAddr> = self
1286 .peers
1287 .iter()
1288 .filter(|(_, peer)| peer.can_send() && peer.has_session())
1289 .map(|(addr, _)| *addr)
1290 .collect();
1291
1292 if peer_addrs.is_empty() {
1293 debug!(
1294 total_peers = self.peers.len(),
1295 "No sendable peers for disconnect notification"
1296 );
1297 return;
1298 }
1299
1300 let mut sent = 0usize;
1301 for node_addr in &peer_addrs {
1302 match self
1303 .send_encrypted_link_message(node_addr, &plaintext)
1304 .await
1305 {
1306 Ok(()) => sent += 1,
1307 Err(e) => {
1308 debug!(
1309 peer = %self.peer_display_name(node_addr),
1310 error = %e,
1311 "Failed to send disconnect (transport may be down)"
1312 );
1313 }
1314 }
1315 }
1316
1317 info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
1318 }
1319
1320 fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
1321 peer_config
1322 .addresses_by_priority()
1323 .into_iter()
1324 .cloned()
1325 .collect()
1326 }
1327
    /// Resolves extra candidate addresses for `peer_config` from the peer's
    /// Nostr overlay advert, to try after the static address list.
    ///
    /// Returns an empty list when Nostr discovery is disabled (by config or
    /// policy), the runtime is absent, or the advert lookup fails. Candidates
    /// are de-duplicated against both `existing` and earlier fallbacks, and
    /// priorities start after the highest existing priority so static
    /// addresses always win.
    async fn nostr_peer_fallback_addresses(
        &self,
        peer_config: &PeerConfig,
        existing: &[PeerAddress],
    ) -> Vec<PeerAddress> {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy
                == crate::config::NostrDiscoveryPolicy::Disabled
        {
            return Vec::new();
        }

        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return Vec::new();
        };
        let endpoints = match bootstrap.advert_endpoints_for_peer(&peer_config.npub).await {
            Ok(endpoints) => endpoints,
            Err(err) => {
                debug!(
                    npub = %peer_config.npub,
                    error = %err,
                    "Failed to resolve Nostr advert endpoints for configured peer"
                );
                return Vec::new();
            }
        };

        let mut fallback = Vec::new();
        // Start fallback priorities just above the highest existing priority
        // (default base 100 when no static addresses exist).
        let mut next_priority = existing
            .iter()
            .map(|addr| addr.priority)
            .max()
            .unwrap_or(100)
            .saturating_add(1);
        for endpoint in endpoints {
            let Some(candidate) = Self::overlay_endpoint_to_peer_address(&endpoint, next_priority)
            else {
                continue;
            };
            // Skip (transport, addr) pairs already present in either list.
            if existing
                .iter()
                .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
                || fallback.iter().any(|addr: &PeerAddress| {
                    addr.transport == candidate.transport && addr.addr == candidate.addr
                })
            {
                continue;
            }
            fallback.push(candidate);
            next_priority = next_priority.saturating_add(1);
        }
        fallback
    }
1381
1382 fn overlay_endpoint_to_peer_address(
1383 endpoint: &OverlayEndpointAdvert,
1384 priority: u8,
1385 ) -> Option<PeerAddress> {
1386 let transport = match endpoint.transport {
1387 OverlayTransportKind::Udp => "udp",
1388 OverlayTransportKind::Tcp => "tcp",
1389 OverlayTransportKind::Tor => "tor",
1390 };
1391 Some(PeerAddress::with_priority(
1392 transport,
1393 endpoint.addr.clone(),
1394 priority,
1395 ))
1396 }
1397
    /// Tries each address in `addresses` in order until one connection attempt
    /// succeeds.
    ///
    /// Special cases per address:
    /// - `udp` + literal `"nat"`: delegates to the Nostr overlay runtime for
    ///   NAT traversal (only when `allow_bootstrap_nat` is set and a runtime
    ///   exists); a started traversal counts as success.
    /// - `ethernet` / `ble`: resolved via dedicated resolvers (BLE only on
    ///   builds with `bluer_available`).
    /// - anything else: looked up via `find_transport_for_type`.
    ///
    /// Unresolvable or failing addresses are skipped with a debug log, EXCEPT
    /// `AccessDenied`, which aborts immediately. Returns
    /// `NoTransportForType` when every address was exhausted.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        for addr in addresses {
            // "udp:nat" is a marker address, not a socket address: hand the
            // whole peer config to the overlay runtime for NAT traversal.
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if !allow_bootstrap_nat {
                    continue;
                }
                let Some(bootstrap) = self.nostr_discovery.clone() else {
                    debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                    continue;
                };
                bootstrap.request_connect(peer_config.clone()).await;
                info!(npub = %peer_config.npub, "Started Nostr UDP NAT traversal attempt");
                return Ok(());
            }

            // Resolve the address to a (transport, remote) pair; unresolvable
            // addresses fall through to the next candidate.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                let tid = match self.find_transport_for_type(&addr.transport) {
                    Some(id) => id,
                    None => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            "No operational transport for address type"
                        );
                        continue;
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => return Ok(()),
                // ACL rejection is terminal: trying other addresses for the
                // same (denied) identity cannot succeed.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
1490
1491 async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1492 self.run_open_discovery_sweep(bootstrap, None, "per-tick")
1493 .await;
1494 }
1495
    /// Scans cached open-discovery adverts and queues connection retries for
    /// eligible peers.
    ///
    /// Only runs when Nostr discovery is enabled with the `Open` policy.
    /// `max_age_secs`, when set, discards adverts older than that many
    /// seconds. `caller` is a static label for log output; `"startup"` forces
    /// the summary line even when nothing was queued.
    ///
    /// Candidates are skipped (and counted, for the summary) when they are:
    /// configured peers, ourselves, already connected, already pending retry,
    /// in cooldown, mid-handshake, invalid npubs, or advertise no usable
    /// endpoints. The number queued is capped by
    /// `open_discovery_enqueue_budget`.
    pub(in crate::node) async fn run_open_discovery_sweep(
        &mut self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
        max_age_secs: Option<u64>,
        caller: &'static str,
    ) {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
        {
            return;
        }

        // Configured peers have their own retry path and are excluded here.
        let configured_npubs = self
            .config
            .peers()
            .iter()
            .map(|peer| peer.npub.clone())
            .collect::<HashSet<_>>();
        let now_ms = Self::now_ms();
        let now_secs = now_ms / 1000;
        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
        if enqueue_budget == 0 {
            debug!(
                caller = %caller,
                "open-discovery sweep: enqueue budget is 0, skipping"
            );
            return;
        }

        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
        let cached_count = candidates.len();
        // Per-reason skip counters, reported in the summary log below.
        let mut enqueued = 0usize;
        let mut skipped_age = 0usize;
        let mut skipped_configured = 0usize;
        let mut skipped_self = 0usize;
        let mut skipped_connected = 0usize;
        let mut skipped_retry_pending = 0usize;
        let mut skipped_connecting = 0usize;
        let mut skipped_no_endpoints = 0usize;
        let mut skipped_invalid_npub = 0usize;
        let mut skipped_cooldown = 0usize;

        for (npub, endpoints, created_at_secs) in candidates {
            if enqueue_budget == 0 {
                break;
            }

            // Optional freshness filter on the advert's creation time.
            if let Some(max_age) = max_age_secs
                && now_secs.saturating_sub(created_at_secs) > max_age
            {
                skipped_age = skipped_age.saturating_add(1);
                continue;
            }

            if configured_npubs.contains(&npub) {
                skipped_configured = skipped_configured.saturating_add(1);
                continue;
            }

            let peer_identity = match PeerIdentity::from_npub(&npub) {
                Ok(identity) => identity,
                Err(_) => {
                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                    continue;
                }
            };
            let node_addr = *peer_identity.node_addr();
            // Never try to connect to our own advert.
            if node_addr == *self.identity.node_addr() {
                skipped_self = skipped_self.saturating_add(1);
                continue;
            }
            if self.peers.contains_key(&node_addr) {
                skipped_connected = skipped_connected.saturating_add(1);
                continue;
            }
            if self.retry_pending.contains_key(&node_addr) {
                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
                continue;
            }
            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                skipped_cooldown = skipped_cooldown.saturating_add(1);
                continue;
            }
            // Skip peers with a handshake already in flight.
            let connecting = self.connections.values().any(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == &node_addr)
                    .unwrap_or(false)
            });
            if connecting {
                skipped_connecting = skipped_connecting.saturating_add(1);
                continue;
            }

            // Build a de-duplicated address list from the advert; priorities
            // start at 120 so they sort after typical configured addresses.
            let mut addresses = Vec::new();
            let mut priority = 120u8;
            for endpoint in endpoints {
                let Some(candidate) = Self::overlay_endpoint_to_peer_address(&endpoint, priority)
                else {
                    continue;
                };
                if addresses.iter().any(|existing: &PeerAddress| {
                    existing.transport == candidate.transport && existing.addr == candidate.addr
                }) {
                    continue;
                }
                addresses.push(candidate);
                priority = priority.saturating_add(1);
            }
            if addresses.is_empty() {
                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
                continue;
            }

            self.peer_aliases
                .entry(node_addr)
                .or_insert_with(|| peer_identity.short_npub());
            self.register_identity(node_addr, peer_identity.pubkey_full());

            // Queue an immediate, auto-connect, non-reconnecting retry that
            // expires with the advert lifetime.
            let mut state = super::retry::RetryState::new(PeerConfig {
                npub: npub.clone(),
                alias: None,
                addresses,
                connect_policy: ConnectPolicy::AutoConnect,
                auto_reconnect: true,
            });
            state.reconnect = false;
            state.retry_after_ms = now_ms;
            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
            self.retry_pending.insert(node_addr, state);
            info!(
                caller = %caller,
                peer = %peer_identity.short_npub(),
                advert_age_secs = now_secs.saturating_sub(created_at_secs),
                "open-discovery sweep: queued retry for cached advert"
            );
            enqueue_budget = enqueue_budget.saturating_sub(1);
            enqueued = enqueued.saturating_add(1);
        }

        let total_skipped = skipped_age
            + skipped_configured
            + skipped_self
            + skipped_connected
            + skipped_retry_pending
            + skipped_connecting
            + skipped_no_endpoints
            + skipped_invalid_npub
            + skipped_cooldown;
        let should_summarize = caller == "startup" || enqueued > 0;
        if should_summarize {
            info!(
                caller = %caller,
                cached = cached_count,
                queued = enqueued,
                skipped_age = skipped_age,
                skipped_configured = skipped_configured,
                skipped_self = skipped_self,
                skipped_connected = skipped_connected,
                skipped_retry_pending = skipped_retry_pending,
                skipped_connecting = skipped_connecting,
                skipped_no_endpoints = skipped_no_endpoints,
                skipped_invalid_npub = skipped_invalid_npub,
                skipped_cooldown = skipped_cooldown,
                skipped_total = total_skipped,
                "open-discovery sweep complete"
            );
        }
    }
1677
1678 async fn maybe_run_startup_open_discovery_sweep(
1686 &mut self,
1687 bootstrap: &std::sync::Arc<NostrDiscovery>,
1688 ) {
1689 if self.startup_open_discovery_sweep_done {
1690 return;
1691 }
1692 if !self.config.node.discovery.nostr.enabled
1693 || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
1694 {
1695 self.startup_open_discovery_sweep_done = true;
1697 return;
1698 }
1699 let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
1700 return;
1701 };
1702 let now_ms = Self::now_ms();
1703 let delay_ms = self
1704 .config
1705 .node
1706 .discovery
1707 .nostr
1708 .startup_sweep_delay_secs
1709 .saturating_mul(1000);
1710 if now_ms < started_at_ms.saturating_add(delay_ms) {
1711 return;
1712 }
1713
1714 let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
1715 self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
1716 .await;
1717 self.startup_open_discovery_sweep_done = true;
1718 }
1719
1720 fn available_outbound_slots(&self) -> usize {
1721 let connection_used = self
1722 .connections
1723 .len()
1724 .saturating_add(self.pending_connects.len());
1725 let connection_slots = if self.max_connections == 0 {
1726 usize::MAX
1727 } else {
1728 self.max_connections.saturating_sub(connection_used)
1729 };
1730
1731 let peer_slots = if self.max_peers == 0 {
1732 usize::MAX
1733 } else {
1734 self.max_peers.saturating_sub(self.peers.len())
1735 };
1736
1737 connection_slots.min(peer_slots)
1738 }
1739
1740 fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
1741 let current_open_discovery_pending = self
1742 .retry_pending
1743 .values()
1744 .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
1745 .count();
1746
1747 let cap_remaining = self
1748 .config
1749 .node
1750 .discovery
1751 .nostr
1752 .open_discovery_max_pending
1753 .saturating_sub(current_open_discovery_pending);
1754
1755 cap_remaining.min(self.available_outbound_slots())
1756 }
1757
1758 fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
1759 now_ms.saturating_add(
1760 self.config
1761 .node
1762 .discovery
1763 .nostr
1764 .advert_ttl_secs
1765 .saturating_mul(1000)
1766 .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
1767 )
1768 }
1769
    /// Builds the local overlay advert from the currently operational
    /// transports, or `None` when Nostr discovery is disabled or no endpoint
    /// is advertisable.
    ///
    /// Per transport type (only ones opted in via `advertise_on_nostr`):
    /// - UDP, public: an explicit `external_addr` wins; otherwise the bound
    ///   address is used if it is a routable public IP, else a STUN-observed
    ///   public address is attempted; failing all that, nothing is advertised
    ///   and a warning is logged.
    /// - UDP, non-public: the literal `"nat"` endpoint is advertised, which
    ///   also causes signal-relay and STUN-server lists to be attached.
    /// - TCP: explicit `external_addr` wins, else a routable bound address;
    ///   no STUN fallback — a wildcard/private bind only logs a warning.
    /// - Tor: advertised when the handle exposes an onion address, at the
    ///   configured advertised port.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // Tracks whether any UDP transport advertised "nat"; that decides
        // whether signal relays / STUN servers are attached below.
        let mut has_udp_nat = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // Operator-provided external address takes precedence.
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                // Bound directly to a routable public IP.
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                // Wildcard/private bind: ask the bootstrap
                                // runtime for a STUN-observed public address.
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                             or private and STUN observation failed; \
                                             advertising no UDP endpoint. Either set \
                                             transports.udp.external_addr, bind to a \
                                             specific *public* IP, or ensure \
                                             node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                None => {}
                            }
                        }
                    } else {
                        // Non-public UDP: advertise NAT traversal capability.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                     or private IP and no transports.tcp.external_addr set; \
                                     advertising no TCP endpoint. Either set external_addr \
                                     to the public IP (recommended for cloud 1:1-NAT setups) \
                                     or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            // Signal relays / STUN servers are only relevant for NAT
            // traversal, so they are attached only when "nat" was advertised.
            signal_relays: has_udp_nat.then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: has_udp_nat
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
1933
1934 async fn refresh_overlay_advert(
1935 &self,
1936 bootstrap: &std::sync::Arc<NostrDiscovery>,
1937 ) -> Result<(), crate::discovery::nostr::BootstrapError> {
1938 let advert = self.build_overlay_advert(bootstrap).await;
1939 bootstrap.update_local_advert(advert).await
1940 }
1941
1942 fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
1943 match (&self.config.transports.udp, transport_name) {
1944 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
1945 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
1946 _ => None,
1947 }
1948 }
1949
1950 fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
1951 match (&self.config.transports.tcp, transport_name) {
1952 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
1953 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
1954 _ => None,
1955 }
1956 }
1957
1958 fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
1959 match (&self.config.transports.tor, transport_name) {
1960 (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
1961 (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
1962 _ => None,
1963 }
1964 }
1965
1966 pub(in crate::node) async fn try_peer_addresses(
1967 &mut self,
1968 peer_config: &PeerConfig,
1969 peer_identity: PeerIdentity,
1970 allow_bootstrap_nat: bool,
1971 ) -> Result<(), NodeError> {
1972 let peer_node_addr = *peer_identity.node_addr();
1973 if self.peers.contains_key(&peer_node_addr) {
1974 debug!(
1975 npub = %peer_config.npub,
1976 "Peer already exists, skipping address attempts"
1977 );
1978 return Ok(());
1979 }
1980 if self.is_connecting_to_peer(&peer_node_addr) {
1981 debug!(
1982 npub = %peer_config.npub,
1983 "Connection already in progress, skipping address attempts"
1984 );
1985 return Ok(());
1986 }
1987
1988 let static_addresses = self.static_peer_addresses(peer_config);
1991 if self
1992 .attempt_peer_address_list(
1993 peer_config,
1994 peer_identity,
1995 allow_bootstrap_nat,
1996 &static_addresses,
1997 )
1998 .await
1999 .is_ok()
2000 {
2001 return Ok(());
2002 }
2003
2004 {
2005 let fallback = self
2006 .nostr_peer_fallback_addresses(peer_config, &static_addresses)
2007 .await;
2008 if !fallback.is_empty()
2009 && self
2010 .attempt_peer_address_list(
2011 peer_config,
2012 peer_identity,
2013 allow_bootstrap_nat,
2014 &fallback,
2015 )
2016 .await
2017 .is_ok()
2018 {
2019 return Ok(());
2020 }
2021 }
2022
2023 Err(NodeError::NoTransportForType(format!(
2024 "no operational transport for any of {}'s addresses",
2025 peer_config.npub
2026 )))
2027 }
2028
2029 pub(crate) async fn api_connect(
2037 &mut self,
2038 npub: &str,
2039 address: &str,
2040 transport: &str,
2041 ) -> Result<serde_json::Value, String> {
2042 let peer_config = PeerConfig {
2043 npub: npub.to_string(),
2044 alias: None,
2045 addresses: vec![PeerAddress::new(transport, address)],
2046 connect_policy: ConnectPolicy::Manual,
2047 auto_reconnect: false,
2048 };
2049
2050 if let Ok(identity) = PeerIdentity::from_npub(npub) {
2052 self.peer_aliases
2053 .insert(*identity.node_addr(), identity.short_npub());
2054 self.register_identity(*identity.node_addr(), identity.pubkey_full());
2055 }
2056
2057 self.initiate_peer_connection(&peer_config)
2058 .await
2059 .map(|()| {
2060 info!(
2061 npub = %npub,
2062 address = %address,
2063 transport = %transport,
2064 "API connect initiated"
2065 );
2066 serde_json::json!({
2067 "npub": npub,
2068 "address": address,
2069 "transport": transport,
2070 })
2071 })
2072 .map_err(|e| e.to_string())
2073 }
2074
2075 pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
2079 let peer_identity =
2080 PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
2081 let node_addr = *peer_identity.node_addr();
2082
2083 if !self.peers.contains_key(&node_addr) {
2084 return Err(format!("peer not found: {npub}"));
2085 }
2086
2087 self.remove_active_peer(&node_addr);
2089
2090 self.retry_pending.remove(&node_addr);
2092
2093 info!(npub = %npub, "API disconnect completed");
2094
2095 Ok(serde_json::json!({
2096 "npub": npub,
2097 "disconnected": true,
2098 }))
2099 }
2100
    /// Adopts a UDP socket established by NAT traversal and initiates the
    /// normal handshake over it.
    ///
    /// Creates a dedicated UDP transport around the traversal socket,
    /// registers it as a bootstrap transport tied to the peer's npub, and
    /// calls `initiate_connection` toward the traversal's remote address.
    /// On handshake-initiation failure the transport is unregistered and
    /// stopped again.
    ///
    /// Errors: `NotStarted` when the node is not operational,
    /// `InvalidPeerNpub` for an unparsable npub, `PeerAlreadyExists` when the
    /// peer is connected or mid-handshake, `BootstrapHandoff` when socket
    /// adoption fails or the adopted transport reports no local address.
    pub async fn adopt_established_traversal(
        &mut self,
        traversal: EstablishedTraversal,
    ) -> Result<BootstrapHandoffResult, NodeError> {
        debug!(
            peer_npub = %traversal.peer_npub,
            session_id = %traversal.session_id,
            remote_addr = %traversal.remote_addr,
            "adopting established traversal socket"
        );

        if !self.state.is_operational() {
            return Err(NodeError::NotStarted);
        }

        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
            NodeError::InvalidPeerNpub {
                npub: traversal.peer_npub.clone(),
                reason: e.to_string(),
            }
        })?;
        let peer_node_addr = *peer_identity.node_addr();
        if self.peers.contains_key(&peer_node_addr) {
            debug!(
                peer_npub = %traversal.peer_npub,
                "Ignoring NAT traversal handoff for already-connected peer"
            );
            return Err(NodeError::PeerAlreadyExists(peer_node_addr));
        }
        if self.is_connecting_to_peer(&peer_node_addr) {
            debug!(
                peer_npub = %traversal.peer_npub,
                "Ignoring NAT traversal handoff while peer handshake is already in progress"
            );
            return Err(NodeError::PeerAlreadyExists(peer_node_addr));
        }

        self.peer_aliases
            .insert(peer_node_addr, peer_identity.short_npub());
        self.register_identity(peer_node_addr, peer_identity.pubkey_full());

        let transport_id = self.allocate_transport_id();
        // Use the traversal's own transport config when provided; otherwise
        // inherit the node's UDP config with bind/external addresses cleared
        // — the adopted socket is already bound, so those hints don't apply.
        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
            let mut cfg = self
                .lookup_udp_config(traversal.transport_name.as_deref())
                .or_else(|| self.lookup_udp_config(None))
                .cloned()
                .unwrap_or_default();
            cfg.bind_addr = None;
            cfg.external_addr = None;
            cfg
        });
        let mut transport = crate::transport::udp::UdpTransport::new(
            transport_id,
            traversal.transport_name.clone(),
            inherited_config,
            packet_tx,
        );

        transport
            .adopt_socket_async(traversal.socket)
            .await
            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;

        let local_addr = transport.local_addr().ok_or_else(|| {
            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
        })?;

        self.transports.insert(
            transport_id,
            crate::transport::TransportHandle::Udp(transport),
        );
        self.bootstrap_transports.insert(transport_id);
        self.bootstrap_transport_npubs
            .insert(transport_id, traversal.peer_npub.clone());

        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
        if let Err(err) = self
            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
            .await
        {
            // Roll back the registration so the failed transport isn't leaked.
            self.bootstrap_transports.remove(&transport_id);
            self.bootstrap_transport_npubs.remove(&transport_id);
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let _ = handle.stop().await;
            }
            return Err(err);
        }

        info!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            local_addr = %local_addr,
            remote_addr = %traversal.remote_addr,
            session_id = %traversal.session_id,
            "adopted NAT traversal socket; handshake initiated"
        );

        Ok(BootstrapHandoffResult {
            transport_id,
            local_addr,
            remote_addr: traversal.remote_addr,
            peer_node_addr,
            session_id: traversal.session_id,
        })
    }
2233}