1use crate::config::{NostrDiscoveryPolicy, TransportInstances, UdpConfig};
7use crate::node::{NodeEndpointCommand, NodeEndpointEvent, NodeEndpointPeer};
8use crate::{
9 Config, FipsAddress, IdentityConfig, Node, NodeAddr, NodeDeliveredPacket, NodeError,
10 PeerIdentity,
11};
12use std::sync::Arc;
13use thiserror::Error;
14use tokio::sync::{Mutex, mpsc, oneshot};
15use tokio::task::JoinHandle;
16
17#[derive(Debug, Error)]
19pub enum FipsEndpointError {
20 #[error("node error: {0}")]
21 Node(#[from] NodeError),
22
23 #[error("endpoint task failed: {0}")]
24 TaskJoin(#[from] tokio::task::JoinError),
25
26 #[error("endpoint is closed")]
27 Closed,
28
29 #[error("invalid remote npub '{npub}': {reason}")]
30 InvalidRemoteNpub { npub: String, reason: String },
31}
32
33#[derive(Debug, Clone, PartialEq, Eq)]
35pub struct FipsEndpointMessage {
36 pub source_node_addr: NodeAddr,
38 pub source_npub: Option<String>,
40 pub data: Vec<u8>,
42}
43
/// Snapshot of one peer as reported by [`FipsEndpoint::peers`].
///
/// Each field is copied verbatim from the node's peer record; this type adds
/// no interpretation of its own.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FipsEndpointPeer {
    /// The peer's npub string.
    pub npub: String,
    /// Transport address of the peer's link, when the node reports one.
    pub transport_addr: Option<String>,
    /// Transport type name of the peer's link, when the node reports one.
    pub transport_type: Option<String>,
    /// Identifier of the link to this peer.
    pub link_id: u64,
    /// Presumably the smoothed round-trip time in milliseconds, when the node
    /// has a measurement (TODO confirm against the node's peer record).
    pub srtt_ms: Option<u64>,
    /// Count of packets sent, as reported by the node.
    pub packets_sent: u64,
    /// Count of packets received, as reported by the node.
    pub packets_recv: u64,
    /// Count of bytes sent, as reported by the node.
    pub bytes_sent: u64,
    /// Count of bytes received, as reported by the node.
    pub bytes_recv: u64,
}
66
67#[derive(Debug, Clone)]
69pub struct FipsEndpointBuilder {
70 config: Config,
71 identity_nsec: Option<String>,
72 discovery_scope: Option<String>,
73 disable_system_networking: bool,
74 packet_channel_capacity: usize,
75}
76
77impl Default for FipsEndpointBuilder {
78 fn default() -> Self {
79 Self {
80 config: Config::new(),
81 identity_nsec: None,
82 discovery_scope: None,
83 disable_system_networking: true,
84 packet_channel_capacity: 1024,
85 }
86 }
87}
88
89impl FipsEndpointBuilder {
90 pub fn config(mut self, config: Config) -> Self {
92 self.config = config;
93 self
94 }
95
96 pub fn identity_nsec(mut self, nsec: impl Into<String>) -> Self {
98 self.identity_nsec = Some(nsec.into());
99 self
100 }
101
102 pub fn discovery_scope(mut self, scope: impl Into<String>) -> Self {
110 self.discovery_scope = Some(scope.into());
111 self
112 }
113
114 pub fn without_system_tun(mut self) -> Self {
116 self.disable_system_networking = true;
117 self
118 }
119
120 pub fn packet_channel_capacity(mut self, capacity: usize) -> Self {
122 self.packet_channel_capacity = capacity.max(1);
123 self
124 }
125
126 fn prepared_config(&self) -> Config {
127 let mut config = self.config.clone();
128 if let Some(nsec) = &self.identity_nsec {
129 config.node.identity = IdentityConfig {
130 nsec: Some(nsec.clone()),
131 persistent: false,
132 };
133 }
134 if self.disable_system_networking {
135 config.tun.enabled = false;
136 config.dns.enabled = false;
137 config.node.system_files_enabled = false;
138 }
139 if let Some(scope) = self.discovery_scope.as_deref() {
140 apply_default_scoped_discovery(&mut config, scope);
141 }
142 config
143 }
144
145 pub async fn bind(self) -> Result<FipsEndpoint, FipsEndpointError> {
147 let config = self.prepared_config();
148
149 let mut node = Node::new(config)?;
150 let npub = node.npub();
151 let node_addr = *node.node_addr();
152 let address = *node.identity().address();
153 let packet_io = node.attach_external_packet_io(self.packet_channel_capacity)?;
154 let endpoint_data_io = node.attach_endpoint_data_io(self.packet_channel_capacity)?;
155 node.start().await?;
156
157 let (shutdown_tx, shutdown_rx) = oneshot::channel();
158 let task = spawn_node_task(node, shutdown_rx);
159 let endpoint_commands = endpoint_data_io.command_tx;
160
161 Ok(FipsEndpoint {
162 npub,
163 node_addr,
164 address,
165 discovery_scope: self.discovery_scope,
166 outbound_packets: packet_io.outbound_tx,
167 delivered_packets: Arc::new(Mutex::new(packet_io.inbound_rx)),
168 endpoint_commands,
169 inbound_endpoint_tx: endpoint_data_io.event_tx,
170 inbound_endpoint_rx: Arc::new(Mutex::new(endpoint_data_io.event_rx)),
171 peer_identity_cache: std::sync::Mutex::new(std::collections::HashMap::new()),
172 shutdown_tx: Some(shutdown_tx),
173 task,
174 })
175 }
176}
177
178fn apply_default_scoped_discovery(config: &mut Config, scope: &str) {
179 if config.node.discovery.nostr.enabled || !config.transports.is_empty() {
180 return;
181 }
182
183 config.node.discovery.nostr.enabled = true;
184 config.node.discovery.nostr.advertise = true;
185 config.node.discovery.nostr.policy = NostrDiscoveryPolicy::Open;
186 config.node.discovery.nostr.share_local_candidates = true;
187 config.node.discovery.nostr.app = format!("fips-overlay-v1:{scope}");
188 config.transports.udp = TransportInstances::Single(UdpConfig {
189 bind_addr: Some("0.0.0.0:0".to_string()),
190 advertise_on_nostr: Some(true),
191 public: Some(false),
192 outbound_only: Some(false),
193 accept_connections: Some(true),
194 ..UdpConfig::default()
195 });
196}
197
198fn spawn_node_task(
199 mut node: Node,
200 shutdown_rx: oneshot::Receiver<()>,
201) -> JoinHandle<Result<(), NodeError>> {
202 tokio::spawn(async move {
203 tokio::pin!(shutdown_rx);
204 let loop_result = tokio::select! {
205 result = node.run_rx_loop() => result,
206 _ = &mut shutdown_rx => Ok(()),
207 };
208 let stop_result = if node.state().can_stop() {
209 node.stop().await
210 } else {
211 Ok(())
212 };
213 loop_result?;
214 stop_result
215 })
216}
217
218pub struct FipsEndpoint {
220 npub: String,
221 node_addr: NodeAddr,
222 address: FipsAddress,
223 discovery_scope: Option<String>,
224 outbound_packets: mpsc::Sender<Vec<u8>>,
225 delivered_packets: Arc<Mutex<mpsc::Receiver<NodeDeliveredPacket>>>,
226 endpoint_commands: mpsc::Sender<NodeEndpointCommand>,
227 inbound_endpoint_tx: mpsc::UnboundedSender<NodeEndpointEvent>,
233 inbound_endpoint_rx: Arc<Mutex<mpsc::UnboundedReceiver<NodeEndpointEvent>>>,
239 peer_identity_cache: std::sync::Mutex<std::collections::HashMap<String, PeerIdentity>>,
244 shutdown_tx: Option<oneshot::Sender<()>>,
245 task: JoinHandle<Result<(), NodeError>>,
246}
247
248impl FipsEndpoint {
249 pub fn builder() -> FipsEndpointBuilder {
251 FipsEndpointBuilder::default()
252 }
253
254 pub fn npub(&self) -> &str {
256 &self.npub
257 }
258
259 pub fn node_addr(&self) -> &NodeAddr {
261 &self.node_addr
262 }
263
264 pub fn address(&self) -> FipsAddress {
266 self.address
267 }
268
269 pub fn discovery_scope(&self) -> Option<&str> {
271 self.discovery_scope.as_deref()
272 }
273
274 pub async fn send(
287 &self,
288 remote_npub: impl Into<String>,
289 data: impl Into<Vec<u8>>,
290 ) -> Result<(), FipsEndpointError> {
291 let remote_npub = remote_npub.into();
292 let data = data.into();
293 if remote_npub == self.npub {
294 self.inbound_endpoint_tx
295 .send(NodeEndpointEvent::Data {
296 source_node_addr: self.node_addr,
297 source_npub: Some(self.npub.clone()),
298 payload: data,
299 queued_at: crate::perf_profile::stamp(),
300 })
301 .map_err(|_| FipsEndpointError::Closed)?;
302 return Ok(());
303 }
304
305 let remote = self.resolve_peer_identity(&remote_npub)?;
306
307 self.endpoint_commands
312 .send(NodeEndpointCommand::SendOneway {
313 remote,
314 payload: data,
315 queued_at: crate::perf_profile::stamp(),
316 })
317 .await
318 .map_err(|_| FipsEndpointError::Closed)?;
319 Ok(())
320 }
321
322 fn resolve_peer_identity(&self, remote_npub: &str) -> Result<PeerIdentity, FipsEndpointError> {
323 if let Ok(cache) = self.peer_identity_cache.lock()
326 && let Some(remote) = cache.get(remote_npub)
327 {
328 return Ok(*remote);
329 }
330
331 let remote = PeerIdentity::from_npub(remote_npub).map_err(|error| {
332 FipsEndpointError::InvalidRemoteNpub {
333 npub: remote_npub.to_string(),
334 reason: error.to_string(),
335 }
336 })?;
337
338 if let Ok(mut cache) = self.peer_identity_cache.lock() {
339 cache.entry(remote_npub.to_string()).or_insert(remote);
340 }
341 Ok(remote)
342 }
343
344 pub async fn recv(&self) -> Option<FipsEndpointMessage> {
351 let event = self.inbound_endpoint_rx.lock().await.recv().await?;
352 let NodeEndpointEvent::Data {
353 source_node_addr,
354 source_npub,
355 payload,
356 queued_at,
357 } = event;
358 crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
359 Some(FipsEndpointMessage {
360 source_node_addr,
361 source_npub,
362 data: payload,
363 })
364 }
365
366 pub fn blocking_send(
376 &self,
377 remote_npub: impl Into<String>,
378 data: impl Into<Vec<u8>>,
379 ) -> Result<(), FipsEndpointError> {
380 let remote_npub = remote_npub.into();
381 let data = data.into();
382 if remote_npub == self.npub {
383 self.inbound_endpoint_tx
384 .send(NodeEndpointEvent::Data {
385 source_node_addr: self.node_addr,
386 source_npub: Some(self.npub.clone()),
387 payload: data,
388 queued_at: crate::perf_profile::stamp(),
389 })
390 .map_err(|_| FipsEndpointError::Closed)?;
391 return Ok(());
392 }
393 let remote = self.resolve_peer_identity(&remote_npub)?;
394 let (response_tx, _response_rx) = oneshot::channel();
395 self.endpoint_commands
396 .blocking_send(NodeEndpointCommand::Send {
397 remote,
398 payload: data,
399 queued_at: crate::perf_profile::stamp(),
400 response_tx,
401 })
402 .map_err(|_| FipsEndpointError::Closed)?;
403 Ok(())
404 }
405
406 pub fn blocking_recv(&self) -> Option<FipsEndpointMessage> {
422 let mut rx = self.inbound_endpoint_rx.blocking_lock();
423 let event = rx.blocking_recv()?;
424 let NodeEndpointEvent::Data {
425 source_node_addr,
426 source_npub,
427 payload,
428 queued_at,
429 } = event;
430 crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
431 Some(FipsEndpointMessage {
432 source_node_addr,
433 source_npub,
434 data: payload,
435 })
436 }
437
438 pub fn try_recv(&self) -> Option<FipsEndpointMessage> {
458 let mut rx = self.inbound_endpoint_rx.try_lock().ok()?;
459 let event = rx.try_recv().ok()?;
460 let NodeEndpointEvent::Data {
461 source_node_addr,
462 source_npub,
463 payload,
464 queued_at,
465 } = event;
466 crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
467 Some(FipsEndpointMessage {
468 source_node_addr,
469 source_npub,
470 data: payload,
471 })
472 }
473
474 pub async fn peers(&self) -> Result<Vec<FipsEndpointPeer>, FipsEndpointError> {
476 let (response_tx, response_rx) = oneshot::channel();
477 self.endpoint_commands
478 .send(NodeEndpointCommand::PeerSnapshot { response_tx })
479 .await
480 .map_err(|_| FipsEndpointError::Closed)?;
481
482 response_rx
483 .await
484 .map(|peers| peers.into_iter().map(FipsEndpointPeer::from).collect())
485 .map_err(|_| FipsEndpointError::Closed)
486 }
487
488 pub async fn send_ip_packet(
490 &self,
491 packet: impl Into<Vec<u8>>,
492 ) -> Result<(), FipsEndpointError> {
493 self.outbound_packets
494 .send(packet.into())
495 .await
496 .map_err(|_| FipsEndpointError::Closed)
497 }
498
499 pub async fn recv_ip_packet(&self) -> Option<NodeDeliveredPacket> {
501 self.delivered_packets.lock().await.recv().await
502 }
503
504 pub async fn shutdown(mut self) -> Result<(), FipsEndpointError> {
506 if let Some(shutdown_tx) = self.shutdown_tx.take() {
507 let _ = shutdown_tx.send(());
508 }
509 self.task.await??;
510 Ok(())
511 }
512}
513
514impl From<NodeEndpointPeer> for FipsEndpointPeer {
515 fn from(peer: NodeEndpointPeer) -> Self {
516 Self {
517 npub: peer.npub,
518 transport_addr: peer.transport_addr,
519 transport_type: peer.transport_type,
520 link_id: peer.link_id,
521 srtt_ms: peer.srtt_ms,
522 packets_sent: peer.packets_sent,
523 packets_recv: peer.packets_recv,
524 bytes_sent: peer.bytes_sent,
525 bytes_recv: peer.bytes_recv,
526 }
527 }
528}
529
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Binds an endpoint with system networking disabled.
    async fn bound_endpoint() -> FipsEndpoint {
        FipsEndpoint::builder()
            .without_system_tun()
            .bind()
            .await
            .expect("endpoint should bind")
    }

    /// Shuts the endpoint down and fails the test if that reports an error.
    async fn shut_down(endpoint: FipsEndpoint) {
        endpoint.shutdown().await.expect("shutdown should succeed");
    }

    #[tokio::test]
    async fn endpoint_starts_without_system_tun() {
        let endpoint = bound_endpoint().await;

        assert!(!endpoint.npub().is_empty());
        assert!(endpoint.discovery_scope().is_none());
        shut_down(endpoint).await;
    }

    #[tokio::test]
    async fn loopback_endpoint_data_roundtrips() {
        let endpoint = bound_endpoint().await;
        let own_npub = endpoint.npub().to_string();

        endpoint
            .send(own_npub.clone(), b"ping".to_vec())
            .await
            .expect("loopback send should succeed");
        let message = tokio::time::timeout(Duration::from_secs(1), endpoint.recv())
            .await
            .expect("recv should not time out")
            .expect("message should arrive");

        assert_eq!(message.source_node_addr, *endpoint.node_addr());
        assert_eq!(message.source_npub, Some(own_npub));
        assert_eq!(message.data, b"ping");
        assert!(endpoint.discovery_scope().is_none());

        shut_down(endpoint).await;
    }

    #[test]
    fn discovery_scope_enables_default_scoped_udp_discovery() {
        let config = FipsEndpoint::builder()
            .discovery_scope("nostr-vpn:test")
            .prepared_config();

        // System networking stays off by default.
        assert!(!config.tun.enabled);
        assert!(!config.dns.enabled);
        assert!(!config.node.system_files_enabled);

        let nostr = &config.node.discovery.nostr;
        assert!(nostr.enabled);
        assert!(nostr.advertise);
        assert_eq!(nostr.policy, NostrDiscoveryPolicy::Open);
        assert!(nostr.share_local_candidates);
        assert_eq!(nostr.app, "fips-overlay-v1:nostr-vpn:test");

        let TransportInstances::Single(udp) = config.transports.udp else {
            panic!("expected a default UDP transport");
        };
        assert_eq!(udp.bind_addr(), "0.0.0.0:0");
        assert!(udp.advertise_on_nostr());
        assert!(!udp.is_public());
        assert!(!udp.outbound_only());
        assert!(udp.accept_connections());
    }

    #[test]
    fn discovery_scope_preserves_explicit_connectivity_config() {
        let explicit_udp = UdpConfig {
            bind_addr: Some("127.0.0.1:34567".to_string()),
            advertise_on_nostr: Some(false),
            outbound_only: Some(true),
            ..UdpConfig::default()
        };
        let mut explicit = Config::new();
        explicit.node.discovery.nostr.enabled = true;
        explicit.node.discovery.nostr.app = "custom-app".to_string();
        explicit.node.discovery.nostr.policy = NostrDiscoveryPolicy::ConfiguredOnly;
        explicit.node.discovery.nostr.share_local_candidates = false;
        explicit.transports.udp = TransportInstances::Single(explicit_udp);

        let config = FipsEndpoint::builder()
            .config(explicit)
            .discovery_scope("nostr-vpn:test")
            .prepared_config();

        let nostr = &config.node.discovery.nostr;
        assert_eq!(nostr.app, "custom-app");
        assert_eq!(nostr.policy, NostrDiscoveryPolicy::ConfiguredOnly);
        assert!(!nostr.share_local_candidates);

        let TransportInstances::Single(udp) = config.transports.udp else {
            panic!("expected explicit UDP transport");
        };
        // The raw field keeps the explicit address, while the accessor is
        // expected to report the wildcard address.
        // NOTE(review): presumably because `outbound_only` is set — confirm
        // against `UdpConfig::bind_addr`.
        assert_eq!(udp.bind_addr.as_deref(), Some("127.0.0.1:34567"));
        assert_eq!(udp.bind_addr(), "0.0.0.0:0");
        assert!(!udp.advertise_on_nostr());
        assert!(udp.outbound_only());
    }

    #[tokio::test]
    async fn invalid_remote_npub_is_rejected() {
        let endpoint = bound_endpoint().await;

        let error = endpoint
            .send("not-an-npub", b"hello".to_vec())
            .await
            .expect_err("invalid npub should fail");
        assert!(matches!(error, FipsEndpointError::InvalidRemoteNpub { .. }));

        shut_down(endpoint).await;
    }

    #[tokio::test]
    async fn endpoint_peer_snapshot_starts_empty() {
        let endpoint = bound_endpoint().await;

        let peers = endpoint.peers().await.expect("peer snapshot");
        assert!(peers.is_empty());

        shut_down(endpoint).await;
    }
}