1use crate::config::{NostrDiscoveryPolicy, TransportInstances, UdpConfig};
7use crate::node::{
8 NodeEndpointCommand, NodeEndpointEvent, NodeEndpointPeer, NodeEndpointRelayStatus,
9};
10use crate::{
11 Config, FipsAddress, IdentityConfig, Node, NodeAddr, NodeDeliveredPacket, NodeError,
12 PeerIdentity,
13};
14use std::sync::Arc;
15use thiserror::Error;
16use tokio::sync::{Mutex, mpsc, oneshot};
17use tokio::task::JoinHandle;
18
/// Appends a timestamped diagnostic line to `nvpn-fips-endpoint-debug.log`
/// in the system temporary directory.
///
/// Only compiled into builds with debug assertions enabled. Logging is
/// best-effort: if the file cannot be opened or written, the message is
/// silently dropped.
#[cfg(debug_assertions)]
fn endpoint_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let Ok(mut log_file) = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(log_path)
    else {
        return;
    };

    let timestamp = std::time::SystemTime::now();
    // A failed write is deliberately ignored; diagnostics must never affect callers.
    let _ = writeln!(log_file, "{:?} {}", timestamp, message.as_ref());
}

/// No-op stand-in used when debug assertions are disabled.
#[cfg(not(debug_assertions))]
fn endpoint_debug_log(_message: impl AsRef<str>) {}
39
40#[derive(Debug, Error)]
42pub enum FipsEndpointError {
43 #[error("node error: {0}")]
44 Node(#[from] NodeError),
45
46 #[error("endpoint task failed: {0}")]
47 TaskJoin(#[from] tokio::task::JoinError),
48
49 #[error("endpoint is closed")]
50 Closed,
51
52 #[error("invalid remote npub '{npub}': {reason}")]
53 InvalidRemoteNpub { npub: String, reason: String },
54}
55
56#[derive(Debug, Clone, PartialEq, Eq)]
58pub struct FipsEndpointMessage {
59 pub source_node_addr: NodeAddr,
61 pub source_npub: Option<String>,
63 pub data: Vec<u8>,
65}
66
/// Counters describing the result of a peer-list update.
///
/// Values are copied from the node's own update outcome; all counters are
/// zero in the `Default` value.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct UpdatePeersOutcome {
    /// Number of peers the node reported as added.
    pub added: usize,
    /// Number of peers the node reported as removed.
    pub removed: usize,
    /// Number of peers the node reported as updated.
    pub updated: usize,
    /// Number of peers the node reported as unchanged.
    pub unchanged: usize,
}
85
86impl From<crate::node::UpdatePeersOutcome> for UpdatePeersOutcome {
87 fn from(value: crate::node::UpdatePeersOutcome) -> Self {
88 Self {
89 added: value.added,
90 removed: value.removed,
91 updated: value.updated,
92 unchanged: value.unchanged,
93 }
94 }
95}
96
/// Snapshot of one peer as reported by the node.
///
/// All values are copied verbatim from the node's peer snapshot entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FipsEndpointPeer {
    /// npub of the peer.
    pub npub: String,
    /// Transport address of the peer, when the node reports one.
    pub transport_addr: Option<String>,
    /// Transport type of the peer, when the node reports one.
    pub transport_type: Option<String>,
    /// Link identifier assigned by the node.
    pub link_id: u64,
    /// Smoothed round-trip time, when available.
    /// NOTE(review): milliseconds per the field name; confirm against the node.
    pub srtt_ms: Option<u64>,
    /// Packets sent counter.
    pub packets_sent: u64,
    /// Packets received counter.
    pub packets_recv: u64,
    /// Bytes sent counter.
    pub bytes_sent: u64,
    /// Bytes received counter.
    pub bytes_recv: u64,
}
119
/// Status of one relay as reported by the node.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FipsEndpointRelayStatus {
    /// Relay URL.
    pub url: String,
    /// Status text reported by the node for this relay.
    pub status: String,
}
126
127#[derive(Debug, Clone)]
129pub struct FipsEndpointBuilder {
130 config: Config,
131 identity_nsec: Option<String>,
132 discovery_scope: Option<String>,
133 disable_system_networking: bool,
134 packet_channel_capacity: usize,
135}
136
137impl Default for FipsEndpointBuilder {
138 fn default() -> Self {
139 Self {
140 config: Config::new(),
141 identity_nsec: None,
142 discovery_scope: None,
143 disable_system_networking: true,
144 packet_channel_capacity: 1024,
145 }
146 }
147}
148
149impl FipsEndpointBuilder {
150 pub fn config(mut self, config: Config) -> Self {
152 self.config = config;
153 self
154 }
155
156 pub fn identity_nsec(mut self, nsec: impl Into<String>) -> Self {
158 self.identity_nsec = Some(nsec.into());
159 self
160 }
161
162 pub fn discovery_scope(mut self, scope: impl Into<String>) -> Self {
170 self.discovery_scope = Some(scope.into());
171 self
172 }
173
174 pub fn without_system_tun(mut self) -> Self {
176 self.disable_system_networking = true;
177 self
178 }
179
180 pub fn packet_channel_capacity(mut self, capacity: usize) -> Self {
182 self.packet_channel_capacity = capacity.max(1);
183 self
184 }
185
186 fn prepared_config(&self) -> Config {
187 let mut config = self.config.clone();
188 if let Some(nsec) = &self.identity_nsec {
189 config.node.identity = IdentityConfig {
190 nsec: Some(nsec.clone()),
191 persistent: false,
192 };
193 }
194 if self.disable_system_networking {
195 config.tun.enabled = false;
196 config.dns.enabled = false;
197 config.node.system_files_enabled = false;
198 }
199 if let Some(scope) = self.discovery_scope.as_deref() {
200 config.node.discovery.lan.scope = Some(scope.to_string());
201 apply_default_scoped_discovery(&mut config, scope);
202 }
203 config
204 }
205
206 pub async fn bind(self) -> Result<FipsEndpoint, FipsEndpointError> {
208 endpoint_debug_log("FipsEndpointBuilder::bind begin");
209 let config = self.prepared_config();
210 endpoint_debug_log("FipsEndpointBuilder::bind config prepared");
211
212 let mut node = Node::new(config)?;
213 endpoint_debug_log("FipsEndpointBuilder::bind node created");
214 let npub = node.npub();
215 let node_addr = *node.node_addr();
216 let address = *node.identity().address();
217 let packet_io = node.attach_external_packet_io(self.packet_channel_capacity)?;
218 endpoint_debug_log("FipsEndpointBuilder::bind packet io attached");
219 let endpoint_data_io = node.attach_endpoint_data_io(self.packet_channel_capacity)?;
220 endpoint_debug_log("FipsEndpointBuilder::bind endpoint data io attached");
221 endpoint_debug_log("FipsEndpointBuilder::bind node.start begin");
222 node.start().await?;
223 endpoint_debug_log("FipsEndpointBuilder::bind node.start complete");
224
225 let (shutdown_tx, shutdown_rx) = oneshot::channel();
226 let task = spawn_node_task(node, shutdown_rx);
227 endpoint_debug_log("FipsEndpointBuilder::bind node task spawned");
228 let endpoint_commands = endpoint_data_io.command_tx;
229
230 Ok(FipsEndpoint {
231 npub,
232 node_addr,
233 address,
234 discovery_scope: self.discovery_scope,
235 outbound_packets: packet_io.outbound_tx,
236 delivered_packets: Arc::new(Mutex::new(packet_io.inbound_rx)),
237 endpoint_commands,
238 inbound_endpoint_tx: endpoint_data_io.event_tx,
239 inbound_endpoint_rx: Arc::new(Mutex::new(endpoint_data_io.event_rx)),
240 peer_identity_cache: std::sync::Mutex::new(std::collections::HashMap::new()),
241 shutdown_tx: Some(shutdown_tx),
242 task,
243 })
244 }
245}
246
247fn apply_default_scoped_discovery(config: &mut Config, scope: &str) {
248 if config.node.discovery.nostr.enabled || !config.transports.is_empty() {
249 return;
250 }
251
252 config.node.discovery.nostr.enabled = true;
253 config.node.discovery.nostr.advertise = true;
254 config.node.discovery.nostr.policy = NostrDiscoveryPolicy::Open;
255 config.node.discovery.nostr.share_local_candidates = true;
256 config.node.discovery.nostr.app = format!("fips-overlay-v1:{scope}");
257 config.node.discovery.lan.scope = Some(scope.to_string());
258 config.transports.udp = TransportInstances::Single(UdpConfig {
259 bind_addr: Some("0.0.0.0:0".to_string()),
260 advertise_on_nostr: Some(true),
261 public: Some(false),
262 outbound_only: Some(false),
263 accept_connections: Some(true),
264 ..UdpConfig::default()
265 });
266}
267
268fn spawn_node_task(
269 mut node: Node,
270 shutdown_rx: oneshot::Receiver<()>,
271) -> JoinHandle<Result<(), NodeError>> {
272 tokio::spawn(async move {
273 tokio::pin!(shutdown_rx);
274 let loop_result = tokio::select! {
275 result = node.run_rx_loop() => result,
276 _ = &mut shutdown_rx => Ok(()),
277 };
278 let stop_result = if node.state().can_stop() {
279 node.stop().await
280 } else {
281 Ok(())
282 };
283 loop_result?;
284 stop_result
285 })
286}
287
288pub struct FipsEndpoint {
290 npub: String,
291 node_addr: NodeAddr,
292 address: FipsAddress,
293 discovery_scope: Option<String>,
294 outbound_packets: mpsc::Sender<Vec<u8>>,
295 delivered_packets: Arc<Mutex<mpsc::Receiver<NodeDeliveredPacket>>>,
296 endpoint_commands: mpsc::Sender<NodeEndpointCommand>,
297 inbound_endpoint_tx: mpsc::UnboundedSender<NodeEndpointEvent>,
303 inbound_endpoint_rx: Arc<Mutex<mpsc::UnboundedReceiver<NodeEndpointEvent>>>,
309 peer_identity_cache: std::sync::Mutex<std::collections::HashMap<String, PeerIdentity>>,
314 shutdown_tx: Option<oneshot::Sender<()>>,
315 task: JoinHandle<Result<(), NodeError>>,
316}
317
318impl FipsEndpoint {
319 pub fn builder() -> FipsEndpointBuilder {
321 FipsEndpointBuilder::default()
322 }
323
324 pub fn npub(&self) -> &str {
326 &self.npub
327 }
328
329 pub fn node_addr(&self) -> &NodeAddr {
331 &self.node_addr
332 }
333
334 pub fn address(&self) -> FipsAddress {
336 self.address
337 }
338
339 pub fn discovery_scope(&self) -> Option<&str> {
341 self.discovery_scope.as_deref()
342 }
343
344 pub async fn send(
357 &self,
358 remote_npub: impl Into<String>,
359 data: impl Into<Vec<u8>>,
360 ) -> Result<(), FipsEndpointError> {
361 let remote_npub = remote_npub.into();
362 let data = data.into();
363 if remote_npub == self.npub {
364 self.inbound_endpoint_tx
365 .send(NodeEndpointEvent::Data {
366 source_node_addr: self.node_addr,
367 source_npub: Some(self.npub.clone()),
368 payload: data,
369 queued_at: crate::perf_profile::stamp(),
370 })
371 .map_err(|_| FipsEndpointError::Closed)?;
372 return Ok(());
373 }
374
375 let remote = self.resolve_peer_identity(&remote_npub)?;
376
377 self.endpoint_commands
382 .send(NodeEndpointCommand::SendOneway {
383 remote,
384 payload: data,
385 queued_at: crate::perf_profile::stamp(),
386 })
387 .await
388 .map_err(|_| FipsEndpointError::Closed)?;
389 Ok(())
390 }
391
392 fn resolve_peer_identity(&self, remote_npub: &str) -> Result<PeerIdentity, FipsEndpointError> {
393 if let Ok(cache) = self.peer_identity_cache.lock()
396 && let Some(remote) = cache.get(remote_npub)
397 {
398 return Ok(*remote);
399 }
400
401 let remote = PeerIdentity::from_npub(remote_npub).map_err(|error| {
402 FipsEndpointError::InvalidRemoteNpub {
403 npub: remote_npub.to_string(),
404 reason: error.to_string(),
405 }
406 })?;
407
408 if let Ok(mut cache) = self.peer_identity_cache.lock() {
409 cache.entry(remote_npub.to_string()).or_insert(remote);
410 }
411 Ok(remote)
412 }
413
414 pub async fn recv(&self) -> Option<FipsEndpointMessage> {
421 let event = self.inbound_endpoint_rx.lock().await.recv().await?;
422 let NodeEndpointEvent::Data {
423 source_node_addr,
424 source_npub,
425 payload,
426 queued_at,
427 } = event;
428 crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
429 Some(FipsEndpointMessage {
430 source_node_addr,
431 source_npub,
432 data: payload,
433 })
434 }
435
436 pub fn blocking_send(
446 &self,
447 remote_npub: impl Into<String>,
448 data: impl Into<Vec<u8>>,
449 ) -> Result<(), FipsEndpointError> {
450 let remote_npub = remote_npub.into();
451 let data = data.into();
452 if remote_npub == self.npub {
453 self.inbound_endpoint_tx
454 .send(NodeEndpointEvent::Data {
455 source_node_addr: self.node_addr,
456 source_npub: Some(self.npub.clone()),
457 payload: data,
458 queued_at: crate::perf_profile::stamp(),
459 })
460 .map_err(|_| FipsEndpointError::Closed)?;
461 return Ok(());
462 }
463 let remote = self.resolve_peer_identity(&remote_npub)?;
464 let (response_tx, _response_rx) = oneshot::channel();
465 self.endpoint_commands
466 .blocking_send(NodeEndpointCommand::Send {
467 remote,
468 payload: data,
469 queued_at: crate::perf_profile::stamp(),
470 response_tx,
471 })
472 .map_err(|_| FipsEndpointError::Closed)?;
473 Ok(())
474 }
475
476 pub fn blocking_recv(&self) -> Option<FipsEndpointMessage> {
492 let mut rx = self.inbound_endpoint_rx.blocking_lock();
493 let event = rx.blocking_recv()?;
494 let NodeEndpointEvent::Data {
495 source_node_addr,
496 source_npub,
497 payload,
498 queued_at,
499 } = event;
500 crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
501 Some(FipsEndpointMessage {
502 source_node_addr,
503 source_npub,
504 data: payload,
505 })
506 }
507
508 pub fn try_recv(&self) -> Option<FipsEndpointMessage> {
528 let mut rx = self.inbound_endpoint_rx.try_lock().ok()?;
529 let event = rx.try_recv().ok()?;
530 let NodeEndpointEvent::Data {
531 source_node_addr,
532 source_npub,
533 payload,
534 queued_at,
535 } = event;
536 crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
537 Some(FipsEndpointMessage {
538 source_node_addr,
539 source_npub,
540 data: payload,
541 })
542 }
543
544 pub async fn update_peers(
554 &self,
555 peers: Vec<crate::config::PeerConfig>,
556 ) -> Result<UpdatePeersOutcome, FipsEndpointError> {
557 let (response_tx, response_rx) = oneshot::channel();
558 self.endpoint_commands
559 .send(NodeEndpointCommand::UpdatePeers { peers, response_tx })
560 .await
561 .map_err(|_| FipsEndpointError::Closed)?;
562
563 match response_rx.await.map_err(|_| FipsEndpointError::Closed)? {
564 Ok(outcome) => Ok(UpdatePeersOutcome::from(outcome)),
565 Err(error) => Err(FipsEndpointError::Node(error)),
566 }
567 }
568
569 pub async fn peers(&self) -> Result<Vec<FipsEndpointPeer>, FipsEndpointError> {
571 let (response_tx, response_rx) = oneshot::channel();
572 self.endpoint_commands
573 .send(NodeEndpointCommand::PeerSnapshot { response_tx })
574 .await
575 .map_err(|_| FipsEndpointError::Closed)?;
576
577 response_rx
578 .await
579 .map(|peers| peers.into_iter().map(FipsEndpointPeer::from).collect())
580 .map_err(|_| FipsEndpointError::Closed)
581 }
582
583 pub async fn relay_statuses(&self) -> Result<Vec<FipsEndpointRelayStatus>, FipsEndpointError> {
585 let (response_tx, response_rx) = oneshot::channel();
586 self.endpoint_commands
587 .send(NodeEndpointCommand::RelaySnapshot { response_tx })
588 .await
589 .map_err(|_| FipsEndpointError::Closed)?;
590
591 response_rx
592 .await
593 .map(|relays| {
594 relays
595 .into_iter()
596 .map(FipsEndpointRelayStatus::from)
597 .collect()
598 })
599 .map_err(|_| FipsEndpointError::Closed)
600 }
601
602 pub async fn update_relays(
604 &self,
605 advert_relays: Vec<String>,
606 dm_relays: Vec<String>,
607 ) -> Result<(), FipsEndpointError> {
608 let (response_tx, response_rx) = oneshot::channel();
609 self.endpoint_commands
610 .send(NodeEndpointCommand::UpdateRelays {
611 advert_relays,
612 dm_relays,
613 response_tx,
614 })
615 .await
616 .map_err(|_| FipsEndpointError::Closed)?;
617
618 response_rx
619 .await
620 .map_err(|_| FipsEndpointError::Closed)?
621 .map_err(FipsEndpointError::Node)
622 }
623
624 pub async fn send_ip_packet(
626 &self,
627 packet: impl Into<Vec<u8>>,
628 ) -> Result<(), FipsEndpointError> {
629 self.outbound_packets
630 .send(packet.into())
631 .await
632 .map_err(|_| FipsEndpointError::Closed)
633 }
634
635 pub async fn recv_ip_packet(&self) -> Option<NodeDeliveredPacket> {
637 self.delivered_packets.lock().await.recv().await
638 }
639
640 pub async fn shutdown(mut self) -> Result<(), FipsEndpointError> {
642 if let Some(shutdown_tx) = self.shutdown_tx.take() {
643 let _ = shutdown_tx.send(());
644 }
645 self.task.await??;
646 Ok(())
647 }
648}
649
650impl From<NodeEndpointPeer> for FipsEndpointPeer {
651 fn from(peer: NodeEndpointPeer) -> Self {
652 Self {
653 npub: peer.npub,
654 transport_addr: peer.transport_addr,
655 transport_type: peer.transport_type,
656 link_id: peer.link_id,
657 srtt_ms: peer.srtt_ms,
658 packets_sent: peer.packets_sent,
659 packets_recv: peer.packets_recv,
660 bytes_sent: peer.bytes_sent,
661 bytes_recv: peer.bytes_recv,
662 }
663 }
664}
665
666impl From<NodeEndpointRelayStatus> for FipsEndpointRelayStatus {
667 fn from(relay: NodeEndpointRelayStatus) -> Self {
668 Self {
669 url: relay.url,
670 status: relay.status,
671 }
672 }
673}
674
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Binds an endpoint with system networking disabled.
    async fn bind_test_endpoint() -> FipsEndpoint {
        FipsEndpoint::builder()
            .without_system_tun()
            .bind()
            .await
            .expect("endpoint should bind")
    }

    /// Shuts the endpoint down and requires a clean result.
    async fn shutdown_test_endpoint(endpoint: FipsEndpoint) {
        endpoint.shutdown().await.expect("shutdown should succeed");
    }

    /// An endpoint binds and shuts down without a system TUN device.
    #[tokio::test]
    async fn endpoint_starts_without_system_tun() {
        let endpoint = bind_test_endpoint().await;

        assert!(!endpoint.npub().is_empty());
        assert!(endpoint.discovery_scope().is_none());

        shutdown_test_endpoint(endpoint).await;
    }

    /// A message sent to the endpoint's own npub comes back through `recv`.
    #[tokio::test]
    async fn loopback_endpoint_data_roundtrips() {
        let endpoint = bind_test_endpoint().await;
        let own_npub = endpoint.npub().to_string();

        endpoint
            .send(own_npub.clone(), b"ping".to_vec())
            .await
            .expect("loopback send should succeed");

        let received = tokio::time::timeout(Duration::from_secs(1), endpoint.recv())
            .await
            .expect("recv should not time out")
            .expect("message should arrive");

        assert_eq!(received.source_node_addr, *endpoint.node_addr());
        assert_eq!(received.source_npub, Some(own_npub));
        assert_eq!(received.data, b"ping");
        assert!(endpoint.discovery_scope().is_none());

        shutdown_test_endpoint(endpoint).await;
    }

    /// With no explicit connectivity configured, a discovery scope installs
    /// the default scoped Nostr discovery and UDP transport.
    #[test]
    fn discovery_scope_enables_default_scoped_udp_discovery() {
        let builder = FipsEndpoint::builder().discovery_scope("nostr-vpn:test");
        let config = builder.prepared_config();

        // System networking stays disabled by default.
        assert!(!config.tun.enabled);
        assert!(!config.dns.enabled);
        assert!(!config.node.system_files_enabled);

        let nostr = &config.node.discovery.nostr;
        assert!(nostr.enabled);
        assert!(nostr.advertise);
        assert_eq!(nostr.policy, NostrDiscoveryPolicy::Open);
        assert!(nostr.share_local_candidates);
        assert_eq!(nostr.app, "fips-overlay-v1:nostr-vpn:test");

        assert_eq!(
            config.node.discovery.lan.scope.as_deref(),
            Some("nostr-vpn:test")
        );

        let TransportInstances::Single(udp) = config.transports.udp else {
            panic!("expected a default UDP transport");
        };
        assert_eq!(udp.bind_addr(), "0.0.0.0:0");
        assert!(udp.advertise_on_nostr());
        assert!(!udp.is_public());
        assert!(!udp.outbound_only());
        assert!(udp.accept_connections());
    }

    /// Explicit Nostr and transport settings survive a discovery scope; only
    /// the LAN scope is added.
    #[test]
    fn discovery_scope_preserves_explicit_connectivity_config() {
        let explicit_udp = UdpConfig {
            bind_addr: Some("127.0.0.1:34567".to_string()),
            advertise_on_nostr: Some(false),
            outbound_only: Some(true),
            ..UdpConfig::default()
        };

        let mut explicit = Config::new();
        explicit.node.discovery.nostr.enabled = true;
        explicit.node.discovery.nostr.app = "custom-app".to_string();
        explicit.node.discovery.nostr.policy = NostrDiscoveryPolicy::ConfiguredOnly;
        explicit.node.discovery.nostr.share_local_candidates = false;
        explicit.transports.udp = TransportInstances::Single(explicit_udp);

        let config = FipsEndpoint::builder()
            .config(explicit)
            .discovery_scope("nostr-vpn:test")
            .prepared_config();

        let nostr = &config.node.discovery.nostr;
        assert_eq!(nostr.app, "custom-app");
        assert_eq!(nostr.policy, NostrDiscoveryPolicy::ConfiguredOnly);
        assert!(!nostr.share_local_candidates);

        assert_eq!(
            config.node.discovery.lan.scope.as_deref(),
            Some("nostr-vpn:test")
        );

        let TransportInstances::Single(udp) = config.transports.udp else {
            panic!("expected explicit UDP transport");
        };
        // The raw field keeps the caller's address.
        assert_eq!(udp.bind_addr.as_deref(), Some("127.0.0.1:34567"));
        // NOTE(review): the accessor is expected to differ from the raw field
        // here, presumably because the transport is outbound-only; confirm
        // against `UdpConfig::bind_addr`.
        assert_eq!(udp.bind_addr(), "0.0.0.0:0");
        assert!(!udp.advertise_on_nostr());
        assert!(udp.outbound_only());
    }

    /// Sending to a string that is not an npub fails with `InvalidRemoteNpub`.
    #[tokio::test]
    async fn invalid_remote_npub_is_rejected() {
        let endpoint = bind_test_endpoint().await;

        let send_result = endpoint.send("not-an-npub", b"hello".to_vec()).await;
        let error = send_result.expect_err("invalid npub should fail");
        assert!(matches!(error, FipsEndpointError::InvalidRemoteNpub { .. }));

        shutdown_test_endpoint(endpoint).await;
    }

    /// A freshly bound endpoint reports no peers.
    #[tokio::test]
    async fn endpoint_peer_snapshot_starts_empty() {
        let endpoint = bind_test_endpoint().await;

        let snapshot = endpoint.peers().await.expect("peer snapshot");
        assert!(snapshot.is_empty());

        shutdown_test_endpoint(endpoint).await;
    }
}