1use crate::config::{NostrDiscoveryPolicy, TransportInstances, UdpConfig};
7use crate::node::{NodeEndpointCommand, NodeEndpointEvent, NodeEndpointPeer};
8use crate::{
9 Config, FipsAddress, IdentityConfig, Node, NodeAddr, NodeDeliveredPacket, NodeError,
10 PeerIdentity,
11};
12use std::sync::Arc;
13use thiserror::Error;
14use tokio::sync::{Mutex, mpsc, oneshot};
15use tokio::task::JoinHandle;
16
17#[cfg(debug_assertions)]
18fn endpoint_debug_log(message: impl AsRef<str>) {
19 use std::io::Write as _;
20
21 if let Ok(mut file) = std::fs::OpenOptions::new()
22 .create(true)
23 .append(true)
24 .open(std::env::temp_dir().join("nvpn-fips-endpoint-debug.log"))
25 {
26 let _ = writeln!(
27 file,
28 "{:?} {}",
29 std::time::SystemTime::now(),
30 message.as_ref()
31 );
32 }
33}
34
35#[cfg(not(debug_assertions))]
36fn endpoint_debug_log(_message: impl AsRef<str>) {}
37
38#[derive(Debug, Error)]
40pub enum FipsEndpointError {
41 #[error("node error: {0}")]
42 Node(#[from] NodeError),
43
44 #[error("endpoint task failed: {0}")]
45 TaskJoin(#[from] tokio::task::JoinError),
46
47 #[error("endpoint is closed")]
48 Closed,
49
50 #[error("invalid remote npub '{npub}': {reason}")]
51 InvalidRemoteNpub { npub: String, reason: String },
52}
53
54#[derive(Debug, Clone, PartialEq, Eq)]
56pub struct FipsEndpointMessage {
57 pub source_node_addr: NodeAddr,
59 pub source_npub: Option<String>,
61 pub data: Vec<u8>,
63}
64
65#[derive(Debug, Clone, Default, PartialEq, Eq)]
67pub struct UpdatePeersOutcome {
68 pub added: usize,
71 pub removed: usize,
75 pub updated: usize,
80 pub unchanged: usize,
82}
83
84impl From<crate::node::UpdatePeersOutcome> for UpdatePeersOutcome {
85 fn from(value: crate::node::UpdatePeersOutcome) -> Self {
86 Self {
87 added: value.added,
88 removed: value.removed,
89 updated: value.updated,
90 unchanged: value.unchanged,
91 }
92 }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
97pub struct FipsEndpointPeer {
98 pub npub: String,
100 pub transport_addr: Option<String>,
102 pub transport_type: Option<String>,
104 pub link_id: u64,
106 pub srtt_ms: Option<u64>,
108 pub packets_sent: u64,
110 pub packets_recv: u64,
112 pub bytes_sent: u64,
114 pub bytes_recv: u64,
116}
117
118#[derive(Debug, Clone)]
120pub struct FipsEndpointBuilder {
121 config: Config,
122 identity_nsec: Option<String>,
123 discovery_scope: Option<String>,
124 disable_system_networking: bool,
125 packet_channel_capacity: usize,
126}
127
128impl Default for FipsEndpointBuilder {
129 fn default() -> Self {
130 Self {
131 config: Config::new(),
132 identity_nsec: None,
133 discovery_scope: None,
134 disable_system_networking: true,
135 packet_channel_capacity: 1024,
136 }
137 }
138}
139
140impl FipsEndpointBuilder {
141 pub fn config(mut self, config: Config) -> Self {
143 self.config = config;
144 self
145 }
146
147 pub fn identity_nsec(mut self, nsec: impl Into<String>) -> Self {
149 self.identity_nsec = Some(nsec.into());
150 self
151 }
152
153 pub fn discovery_scope(mut self, scope: impl Into<String>) -> Self {
161 self.discovery_scope = Some(scope.into());
162 self
163 }
164
165 pub fn without_system_tun(mut self) -> Self {
167 self.disable_system_networking = true;
168 self
169 }
170
171 pub fn packet_channel_capacity(mut self, capacity: usize) -> Self {
173 self.packet_channel_capacity = capacity.max(1);
174 self
175 }
176
177 fn prepared_config(&self) -> Config {
178 let mut config = self.config.clone();
179 if let Some(nsec) = &self.identity_nsec {
180 config.node.identity = IdentityConfig {
181 nsec: Some(nsec.clone()),
182 persistent: false,
183 };
184 }
185 if self.disable_system_networking {
186 config.tun.enabled = false;
187 config.dns.enabled = false;
188 config.node.system_files_enabled = false;
189 }
190 if let Some(scope) = self.discovery_scope.as_deref() {
191 config.node.discovery.lan.scope = Some(scope.to_string());
192 apply_default_scoped_discovery(&mut config, scope);
193 }
194 config
195 }
196
197 pub async fn bind(self) -> Result<FipsEndpoint, FipsEndpointError> {
199 endpoint_debug_log("FipsEndpointBuilder::bind begin");
200 let config = self.prepared_config();
201 endpoint_debug_log("FipsEndpointBuilder::bind config prepared");
202
203 let mut node = Node::new(config)?;
204 endpoint_debug_log("FipsEndpointBuilder::bind node created");
205 let npub = node.npub();
206 let node_addr = *node.node_addr();
207 let address = *node.identity().address();
208 let packet_io = node.attach_external_packet_io(self.packet_channel_capacity)?;
209 endpoint_debug_log("FipsEndpointBuilder::bind packet io attached");
210 let endpoint_data_io = node.attach_endpoint_data_io(self.packet_channel_capacity)?;
211 endpoint_debug_log("FipsEndpointBuilder::bind endpoint data io attached");
212 endpoint_debug_log("FipsEndpointBuilder::bind node.start begin");
213 node.start().await?;
214 endpoint_debug_log("FipsEndpointBuilder::bind node.start complete");
215
216 let (shutdown_tx, shutdown_rx) = oneshot::channel();
217 let task = spawn_node_task(node, shutdown_rx);
218 endpoint_debug_log("FipsEndpointBuilder::bind node task spawned");
219 let endpoint_commands = endpoint_data_io.command_tx;
220
221 Ok(FipsEndpoint {
222 npub,
223 node_addr,
224 address,
225 discovery_scope: self.discovery_scope,
226 outbound_packets: packet_io.outbound_tx,
227 delivered_packets: Arc::new(Mutex::new(packet_io.inbound_rx)),
228 endpoint_commands,
229 inbound_endpoint_tx: endpoint_data_io.event_tx,
230 inbound_endpoint_rx: Arc::new(Mutex::new(endpoint_data_io.event_rx)),
231 peer_identity_cache: std::sync::Mutex::new(std::collections::HashMap::new()),
232 shutdown_tx: Some(shutdown_tx),
233 task,
234 })
235 }
236}
237
238fn apply_default_scoped_discovery(config: &mut Config, scope: &str) {
239 if config.node.discovery.nostr.enabled || !config.transports.is_empty() {
240 return;
241 }
242
243 config.node.discovery.nostr.enabled = true;
244 config.node.discovery.nostr.advertise = true;
245 config.node.discovery.nostr.policy = NostrDiscoveryPolicy::Open;
246 config.node.discovery.nostr.share_local_candidates = true;
247 config.node.discovery.nostr.app = format!("fips-overlay-v1:{scope}");
248 config.node.discovery.lan.scope = Some(scope.to_string());
249 config.transports.udp = TransportInstances::Single(UdpConfig {
250 bind_addr: Some("0.0.0.0:0".to_string()),
251 advertise_on_nostr: Some(true),
252 public: Some(false),
253 outbound_only: Some(false),
254 accept_connections: Some(true),
255 ..UdpConfig::default()
256 });
257}
258
259fn spawn_node_task(
260 mut node: Node,
261 shutdown_rx: oneshot::Receiver<()>,
262) -> JoinHandle<Result<(), NodeError>> {
263 tokio::spawn(async move {
264 tokio::pin!(shutdown_rx);
265 let loop_result = tokio::select! {
266 result = node.run_rx_loop() => result,
267 _ = &mut shutdown_rx => Ok(()),
268 };
269 let stop_result = if node.state().can_stop() {
270 node.stop().await
271 } else {
272 Ok(())
273 };
274 loop_result?;
275 stop_result
276 })
277}
278
279pub struct FipsEndpoint {
281 npub: String,
282 node_addr: NodeAddr,
283 address: FipsAddress,
284 discovery_scope: Option<String>,
285 outbound_packets: mpsc::Sender<Vec<u8>>,
286 delivered_packets: Arc<Mutex<mpsc::Receiver<NodeDeliveredPacket>>>,
287 endpoint_commands: mpsc::Sender<NodeEndpointCommand>,
288 inbound_endpoint_tx: mpsc::UnboundedSender<NodeEndpointEvent>,
294 inbound_endpoint_rx: Arc<Mutex<mpsc::UnboundedReceiver<NodeEndpointEvent>>>,
300 peer_identity_cache: std::sync::Mutex<std::collections::HashMap<String, PeerIdentity>>,
305 shutdown_tx: Option<oneshot::Sender<()>>,
306 task: JoinHandle<Result<(), NodeError>>,
307}
308
309impl FipsEndpoint {
310 pub fn builder() -> FipsEndpointBuilder {
312 FipsEndpointBuilder::default()
313 }
314
315 pub fn npub(&self) -> &str {
317 &self.npub
318 }
319
320 pub fn node_addr(&self) -> &NodeAddr {
322 &self.node_addr
323 }
324
325 pub fn address(&self) -> FipsAddress {
327 self.address
328 }
329
330 pub fn discovery_scope(&self) -> Option<&str> {
332 self.discovery_scope.as_deref()
333 }
334
335 pub async fn send(
348 &self,
349 remote_npub: impl Into<String>,
350 data: impl Into<Vec<u8>>,
351 ) -> Result<(), FipsEndpointError> {
352 let remote_npub = remote_npub.into();
353 let data = data.into();
354 if remote_npub == self.npub {
355 self.inbound_endpoint_tx
356 .send(NodeEndpointEvent::Data {
357 source_node_addr: self.node_addr,
358 source_npub: Some(self.npub.clone()),
359 payload: data,
360 queued_at: crate::perf_profile::stamp(),
361 })
362 .map_err(|_| FipsEndpointError::Closed)?;
363 return Ok(());
364 }
365
366 let remote = self.resolve_peer_identity(&remote_npub)?;
367
368 self.endpoint_commands
373 .send(NodeEndpointCommand::SendOneway {
374 remote,
375 payload: data,
376 queued_at: crate::perf_profile::stamp(),
377 })
378 .await
379 .map_err(|_| FipsEndpointError::Closed)?;
380 Ok(())
381 }
382
383 fn resolve_peer_identity(&self, remote_npub: &str) -> Result<PeerIdentity, FipsEndpointError> {
384 if let Ok(cache) = self.peer_identity_cache.lock()
387 && let Some(remote) = cache.get(remote_npub)
388 {
389 return Ok(*remote);
390 }
391
392 let remote = PeerIdentity::from_npub(remote_npub).map_err(|error| {
393 FipsEndpointError::InvalidRemoteNpub {
394 npub: remote_npub.to_string(),
395 reason: error.to_string(),
396 }
397 })?;
398
399 if let Ok(mut cache) = self.peer_identity_cache.lock() {
400 cache.entry(remote_npub.to_string()).or_insert(remote);
401 }
402 Ok(remote)
403 }
404
405 pub async fn recv(&self) -> Option<FipsEndpointMessage> {
412 let event = self.inbound_endpoint_rx.lock().await.recv().await?;
413 let NodeEndpointEvent::Data {
414 source_node_addr,
415 source_npub,
416 payload,
417 queued_at,
418 } = event;
419 crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
420 Some(FipsEndpointMessage {
421 source_node_addr,
422 source_npub,
423 data: payload,
424 })
425 }
426
427 pub fn blocking_send(
437 &self,
438 remote_npub: impl Into<String>,
439 data: impl Into<Vec<u8>>,
440 ) -> Result<(), FipsEndpointError> {
441 let remote_npub = remote_npub.into();
442 let data = data.into();
443 if remote_npub == self.npub {
444 self.inbound_endpoint_tx
445 .send(NodeEndpointEvent::Data {
446 source_node_addr: self.node_addr,
447 source_npub: Some(self.npub.clone()),
448 payload: data,
449 queued_at: crate::perf_profile::stamp(),
450 })
451 .map_err(|_| FipsEndpointError::Closed)?;
452 return Ok(());
453 }
454 let remote = self.resolve_peer_identity(&remote_npub)?;
455 let (response_tx, _response_rx) = oneshot::channel();
456 self.endpoint_commands
457 .blocking_send(NodeEndpointCommand::Send {
458 remote,
459 payload: data,
460 queued_at: crate::perf_profile::stamp(),
461 response_tx,
462 })
463 .map_err(|_| FipsEndpointError::Closed)?;
464 Ok(())
465 }
466
467 pub fn blocking_recv(&self) -> Option<FipsEndpointMessage> {
483 let mut rx = self.inbound_endpoint_rx.blocking_lock();
484 let event = rx.blocking_recv()?;
485 let NodeEndpointEvent::Data {
486 source_node_addr,
487 source_npub,
488 payload,
489 queued_at,
490 } = event;
491 crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
492 Some(FipsEndpointMessage {
493 source_node_addr,
494 source_npub,
495 data: payload,
496 })
497 }
498
499 pub fn try_recv(&self) -> Option<FipsEndpointMessage> {
519 let mut rx = self.inbound_endpoint_rx.try_lock().ok()?;
520 let event = rx.try_recv().ok()?;
521 let NodeEndpointEvent::Data {
522 source_node_addr,
523 source_npub,
524 payload,
525 queued_at,
526 } = event;
527 crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
528 Some(FipsEndpointMessage {
529 source_node_addr,
530 source_npub,
531 data: payload,
532 })
533 }
534
535 pub async fn update_peers(
545 &self,
546 peers: Vec<crate::config::PeerConfig>,
547 ) -> Result<UpdatePeersOutcome, FipsEndpointError> {
548 let (response_tx, response_rx) = oneshot::channel();
549 self.endpoint_commands
550 .send(NodeEndpointCommand::UpdatePeers { peers, response_tx })
551 .await
552 .map_err(|_| FipsEndpointError::Closed)?;
553
554 match response_rx.await.map_err(|_| FipsEndpointError::Closed)? {
555 Ok(outcome) => Ok(UpdatePeersOutcome::from(outcome)),
556 Err(error) => Err(FipsEndpointError::Node(error)),
557 }
558 }
559
560 pub async fn peers(&self) -> Result<Vec<FipsEndpointPeer>, FipsEndpointError> {
562 let (response_tx, response_rx) = oneshot::channel();
563 self.endpoint_commands
564 .send(NodeEndpointCommand::PeerSnapshot { response_tx })
565 .await
566 .map_err(|_| FipsEndpointError::Closed)?;
567
568 response_rx
569 .await
570 .map(|peers| peers.into_iter().map(FipsEndpointPeer::from).collect())
571 .map_err(|_| FipsEndpointError::Closed)
572 }
573
574 pub async fn send_ip_packet(
576 &self,
577 packet: impl Into<Vec<u8>>,
578 ) -> Result<(), FipsEndpointError> {
579 self.outbound_packets
580 .send(packet.into())
581 .await
582 .map_err(|_| FipsEndpointError::Closed)
583 }
584
585 pub async fn recv_ip_packet(&self) -> Option<NodeDeliveredPacket> {
587 self.delivered_packets.lock().await.recv().await
588 }
589
590 pub async fn shutdown(mut self) -> Result<(), FipsEndpointError> {
592 if let Some(shutdown_tx) = self.shutdown_tx.take() {
593 let _ = shutdown_tx.send(());
594 }
595 self.task.await??;
596 Ok(())
597 }
598}
599
600impl From<NodeEndpointPeer> for FipsEndpointPeer {
601 fn from(peer: NodeEndpointPeer) -> Self {
602 Self {
603 npub: peer.npub,
604 transport_addr: peer.transport_addr,
605 transport_type: peer.transport_type,
606 link_id: peer.link_id,
607 srtt_ms: peer.srtt_ms,
608 packets_sent: peer.packets_sent,
609 packets_recv: peer.packets_recv,
610 bytes_sent: peer.bytes_sent,
611 bytes_recv: peer.bytes_recv,
612 }
613 }
614}
615
616#[cfg(test)]
617mod tests {
618 use super::*;
619 use std::time::Duration;
620
621 #[tokio::test]
622 async fn endpoint_starts_without_system_tun() {
623 let endpoint = FipsEndpoint::builder()
624 .without_system_tun()
625 .bind()
626 .await
627 .expect("endpoint should bind");
628
629 assert!(!endpoint.npub().is_empty());
630 assert!(endpoint.discovery_scope().is_none());
631 endpoint.shutdown().await.expect("shutdown should succeed");
632 }
633
634 #[tokio::test]
635 async fn loopback_endpoint_data_roundtrips() {
636 let endpoint = FipsEndpoint::builder()
637 .without_system_tun()
638 .bind()
639 .await
640 .expect("endpoint should bind");
641
642 endpoint
643 .send(endpoint.npub().to_string(), b"ping".to_vec())
644 .await
645 .expect("loopback send should succeed");
646 let message = tokio::time::timeout(Duration::from_secs(1), endpoint.recv())
647 .await
648 .expect("recv should not time out")
649 .expect("message should arrive");
650 assert_eq!(message.source_node_addr, *endpoint.node_addr());
651 assert_eq!(message.source_npub, Some(endpoint.npub().to_string()));
652 assert_eq!(message.data, b"ping");
653 assert!(endpoint.discovery_scope().is_none());
654
655 endpoint.shutdown().await.expect("shutdown should succeed");
656 }
657
658 #[test]
659 fn discovery_scope_enables_default_scoped_udp_discovery() {
660 let config = FipsEndpoint::builder()
661 .discovery_scope("nostr-vpn:test")
662 .prepared_config();
663
664 assert!(!config.tun.enabled);
665 assert!(!config.dns.enabled);
666 assert!(!config.node.system_files_enabled);
667 assert!(config.node.discovery.nostr.enabled);
668 assert!(config.node.discovery.nostr.advertise);
669 assert_eq!(
670 config.node.discovery.nostr.policy,
671 NostrDiscoveryPolicy::Open
672 );
673 assert!(config.node.discovery.nostr.share_local_candidates);
674 assert_eq!(
675 config.node.discovery.nostr.app,
676 "fips-overlay-v1:nostr-vpn:test"
677 );
678 assert_eq!(
679 config.node.discovery.lan.scope.as_deref(),
680 Some("nostr-vpn:test")
681 );
682
683 let udp = match config.transports.udp {
684 TransportInstances::Single(udp) => udp,
685 TransportInstances::Named(_) => panic!("expected a default UDP transport"),
686 };
687 assert_eq!(udp.bind_addr(), "0.0.0.0:0");
688 assert!(udp.advertise_on_nostr());
689 assert!(!udp.is_public());
690 assert!(!udp.outbound_only());
691 assert!(udp.accept_connections());
692 }
693
694 #[test]
695 fn discovery_scope_preserves_explicit_connectivity_config() {
696 let mut explicit = Config::new();
697 explicit.node.discovery.nostr.enabled = true;
698 explicit.node.discovery.nostr.app = "custom-app".to_string();
699 explicit.node.discovery.nostr.policy = NostrDiscoveryPolicy::ConfiguredOnly;
700 explicit.node.discovery.nostr.share_local_candidates = false;
701 explicit.transports.udp = TransportInstances::Single(UdpConfig {
702 bind_addr: Some("127.0.0.1:34567".to_string()),
703 advertise_on_nostr: Some(false),
704 outbound_only: Some(true),
705 ..UdpConfig::default()
706 });
707
708 let config = FipsEndpoint::builder()
709 .config(explicit)
710 .discovery_scope("nostr-vpn:test")
711 .prepared_config();
712
713 assert_eq!(config.node.discovery.nostr.app, "custom-app");
714 assert_eq!(
715 config.node.discovery.nostr.policy,
716 NostrDiscoveryPolicy::ConfiguredOnly
717 );
718 assert!(!config.node.discovery.nostr.share_local_candidates);
719 assert_eq!(
720 config.node.discovery.lan.scope.as_deref(),
721 Some("nostr-vpn:test")
722 );
723 let udp = match config.transports.udp {
724 TransportInstances::Single(udp) => udp,
725 TransportInstances::Named(_) => panic!("expected explicit UDP transport"),
726 };
727 assert_eq!(udp.bind_addr.as_deref(), Some("127.0.0.1:34567"));
728 assert_eq!(udp.bind_addr(), "0.0.0.0:0");
729 assert!(!udp.advertise_on_nostr());
730 assert!(udp.outbound_only());
731 }
732
733 #[tokio::test]
734 async fn invalid_remote_npub_is_rejected() {
735 let endpoint = FipsEndpoint::builder()
736 .without_system_tun()
737 .bind()
738 .await
739 .expect("endpoint should bind");
740
741 let error = endpoint
742 .send("not-an-npub", b"hello".to_vec())
743 .await
744 .expect_err("invalid npub should fail");
745 assert!(matches!(error, FipsEndpointError::InvalidRemoteNpub { .. }));
746
747 endpoint.shutdown().await.expect("shutdown should succeed");
748 }
749
750 #[tokio::test]
751 async fn endpoint_peer_snapshot_starts_empty() {
752 let endpoint = FipsEndpoint::builder()
753 .without_system_tun()
754 .bind()
755 .await
756 .expect("endpoint should bind");
757
758 let peers = endpoint.peers().await.expect("peer snapshot");
759 assert!(peers.is_empty());
760
761 endpoint.shutdown().await.expect("shutdown should succeed");
762 }
763}