1use crate::config::{NostrDiscoveryPolicy, TransportInstances, UdpConfig};
7use crate::node::{NodeEndpointCommand, NodeEndpointEvent, NodeEndpointPeer};
8use crate::{
9 Config, FipsAddress, IdentityConfig, Node, NodeAddr, NodeDeliveredPacket, NodeError,
10 PeerIdentity,
11};
12use std::sync::Arc;
13use thiserror::Error;
14use tokio::sync::{Mutex, mpsc, oneshot};
15use tokio::task::JoinHandle;
16
17#[derive(Debug, Error)]
19pub enum FipsEndpointError {
20 #[error("node error: {0}")]
21 Node(#[from] NodeError),
22
23 #[error("endpoint task failed: {0}")]
24 TaskJoin(#[from] tokio::task::JoinError),
25
26 #[error("endpoint is closed")]
27 Closed,
28
29 #[error("invalid remote npub '{npub}': {reason}")]
30 InvalidRemoteNpub { npub: String, reason: String },
31}
32
33#[derive(Debug, Clone, PartialEq, Eq)]
35pub struct FipsEndpointMessage {
36 pub source_node_addr: NodeAddr,
38 pub source_npub: Option<String>,
40 pub data: Vec<u8>,
42}
43
44#[derive(Debug, Clone, Default, PartialEq, Eq)]
46pub struct UpdatePeersOutcome {
47 pub added: usize,
50 pub removed: usize,
54 pub updated: usize,
58 pub unchanged: usize,
60}
61
62impl From<crate::node::UpdatePeersOutcome> for UpdatePeersOutcome {
63 fn from(value: crate::node::UpdatePeersOutcome) -> Self {
64 Self {
65 added: value.added,
66 removed: value.removed,
67 updated: value.updated,
68 unchanged: value.unchanged,
69 }
70 }
71}
72
73#[derive(Debug, Clone, PartialEq, Eq)]
75pub struct FipsEndpointPeer {
76 pub npub: String,
78 pub transport_addr: Option<String>,
80 pub transport_type: Option<String>,
82 pub link_id: u64,
84 pub srtt_ms: Option<u64>,
86 pub packets_sent: u64,
88 pub packets_recv: u64,
90 pub bytes_sent: u64,
92 pub bytes_recv: u64,
94}
95
96#[derive(Debug, Clone)]
98pub struct FipsEndpointBuilder {
99 config: Config,
100 identity_nsec: Option<String>,
101 discovery_scope: Option<String>,
102 disable_system_networking: bool,
103 packet_channel_capacity: usize,
104}
105
106impl Default for FipsEndpointBuilder {
107 fn default() -> Self {
108 Self {
109 config: Config::new(),
110 identity_nsec: None,
111 discovery_scope: None,
112 disable_system_networking: true,
113 packet_channel_capacity: 1024,
114 }
115 }
116}
117
118impl FipsEndpointBuilder {
119 pub fn config(mut self, config: Config) -> Self {
121 self.config = config;
122 self
123 }
124
125 pub fn identity_nsec(mut self, nsec: impl Into<String>) -> Self {
127 self.identity_nsec = Some(nsec.into());
128 self
129 }
130
131 pub fn discovery_scope(mut self, scope: impl Into<String>) -> Self {
139 self.discovery_scope = Some(scope.into());
140 self
141 }
142
143 pub fn without_system_tun(mut self) -> Self {
145 self.disable_system_networking = true;
146 self
147 }
148
149 pub fn packet_channel_capacity(mut self, capacity: usize) -> Self {
151 self.packet_channel_capacity = capacity.max(1);
152 self
153 }
154
155 fn prepared_config(&self) -> Config {
156 let mut config = self.config.clone();
157 if let Some(nsec) = &self.identity_nsec {
158 config.node.identity = IdentityConfig {
159 nsec: Some(nsec.clone()),
160 persistent: false,
161 };
162 }
163 if self.disable_system_networking {
164 config.tun.enabled = false;
165 config.dns.enabled = false;
166 config.node.system_files_enabled = false;
167 }
168 if let Some(scope) = self.discovery_scope.as_deref() {
169 apply_default_scoped_discovery(&mut config, scope);
170 }
171 config
172 }
173
174 pub async fn bind(self) -> Result<FipsEndpoint, FipsEndpointError> {
176 let config = self.prepared_config();
177
178 let mut node = Node::new(config)?;
179 let npub = node.npub();
180 let node_addr = *node.node_addr();
181 let address = *node.identity().address();
182 let packet_io = node.attach_external_packet_io(self.packet_channel_capacity)?;
183 let endpoint_data_io = node.attach_endpoint_data_io(self.packet_channel_capacity)?;
184 node.start().await?;
185
186 let (shutdown_tx, shutdown_rx) = oneshot::channel();
187 let task = spawn_node_task(node, shutdown_rx);
188 let endpoint_commands = endpoint_data_io.command_tx;
189
190 Ok(FipsEndpoint {
191 npub,
192 node_addr,
193 address,
194 discovery_scope: self.discovery_scope,
195 outbound_packets: packet_io.outbound_tx,
196 delivered_packets: Arc::new(Mutex::new(packet_io.inbound_rx)),
197 endpoint_commands,
198 inbound_endpoint_tx: endpoint_data_io.event_tx,
199 inbound_endpoint_rx: Arc::new(Mutex::new(endpoint_data_io.event_rx)),
200 peer_identity_cache: std::sync::Mutex::new(std::collections::HashMap::new()),
201 shutdown_tx: Some(shutdown_tx),
202 task,
203 })
204 }
205}
206
207fn apply_default_scoped_discovery(config: &mut Config, scope: &str) {
208 if config.node.discovery.nostr.enabled || !config.transports.is_empty() {
209 return;
210 }
211
212 config.node.discovery.nostr.enabled = true;
213 config.node.discovery.nostr.advertise = true;
214 config.node.discovery.nostr.policy = NostrDiscoveryPolicy::Open;
215 config.node.discovery.nostr.share_local_candidates = true;
216 config.node.discovery.nostr.app = format!("fips-overlay-v1:{scope}");
217 config.transports.udp = TransportInstances::Single(UdpConfig {
218 bind_addr: Some("0.0.0.0:0".to_string()),
219 advertise_on_nostr: Some(true),
220 public: Some(false),
221 outbound_only: Some(false),
222 accept_connections: Some(true),
223 ..UdpConfig::default()
224 });
225}
226
227fn spawn_node_task(
228 mut node: Node,
229 shutdown_rx: oneshot::Receiver<()>,
230) -> JoinHandle<Result<(), NodeError>> {
231 tokio::spawn(async move {
232 tokio::pin!(shutdown_rx);
233 let loop_result = tokio::select! {
234 result = node.run_rx_loop() => result,
235 _ = &mut shutdown_rx => Ok(()),
236 };
237 let stop_result = if node.state().can_stop() {
238 node.stop().await
239 } else {
240 Ok(())
241 };
242 loop_result?;
243 stop_result
244 })
245}
246
247pub struct FipsEndpoint {
249 npub: String,
250 node_addr: NodeAddr,
251 address: FipsAddress,
252 discovery_scope: Option<String>,
253 outbound_packets: mpsc::Sender<Vec<u8>>,
254 delivered_packets: Arc<Mutex<mpsc::Receiver<NodeDeliveredPacket>>>,
255 endpoint_commands: mpsc::Sender<NodeEndpointCommand>,
256 inbound_endpoint_tx: mpsc::UnboundedSender<NodeEndpointEvent>,
262 inbound_endpoint_rx: Arc<Mutex<mpsc::UnboundedReceiver<NodeEndpointEvent>>>,
268 peer_identity_cache: std::sync::Mutex<std::collections::HashMap<String, PeerIdentity>>,
273 shutdown_tx: Option<oneshot::Sender<()>>,
274 task: JoinHandle<Result<(), NodeError>>,
275}
276
277impl FipsEndpoint {
278 pub fn builder() -> FipsEndpointBuilder {
280 FipsEndpointBuilder::default()
281 }
282
283 pub fn npub(&self) -> &str {
285 &self.npub
286 }
287
288 pub fn node_addr(&self) -> &NodeAddr {
290 &self.node_addr
291 }
292
293 pub fn address(&self) -> FipsAddress {
295 self.address
296 }
297
298 pub fn discovery_scope(&self) -> Option<&str> {
300 self.discovery_scope.as_deref()
301 }
302
303 pub async fn send(
316 &self,
317 remote_npub: impl Into<String>,
318 data: impl Into<Vec<u8>>,
319 ) -> Result<(), FipsEndpointError> {
320 let remote_npub = remote_npub.into();
321 let data = data.into();
322 if remote_npub == self.npub {
323 self.inbound_endpoint_tx
324 .send(NodeEndpointEvent::Data {
325 source_node_addr: self.node_addr,
326 source_npub: Some(self.npub.clone()),
327 payload: data,
328 queued_at: crate::perf_profile::stamp(),
329 })
330 .map_err(|_| FipsEndpointError::Closed)?;
331 return Ok(());
332 }
333
334 let remote = self.resolve_peer_identity(&remote_npub)?;
335
336 self.endpoint_commands
341 .send(NodeEndpointCommand::SendOneway {
342 remote,
343 payload: data,
344 queued_at: crate::perf_profile::stamp(),
345 })
346 .await
347 .map_err(|_| FipsEndpointError::Closed)?;
348 Ok(())
349 }
350
351 fn resolve_peer_identity(&self, remote_npub: &str) -> Result<PeerIdentity, FipsEndpointError> {
352 if let Ok(cache) = self.peer_identity_cache.lock()
355 && let Some(remote) = cache.get(remote_npub)
356 {
357 return Ok(*remote);
358 }
359
360 let remote = PeerIdentity::from_npub(remote_npub).map_err(|error| {
361 FipsEndpointError::InvalidRemoteNpub {
362 npub: remote_npub.to_string(),
363 reason: error.to_string(),
364 }
365 })?;
366
367 if let Ok(mut cache) = self.peer_identity_cache.lock() {
368 cache.entry(remote_npub.to_string()).or_insert(remote);
369 }
370 Ok(remote)
371 }
372
373 pub async fn recv(&self) -> Option<FipsEndpointMessage> {
380 let event = self.inbound_endpoint_rx.lock().await.recv().await?;
381 let NodeEndpointEvent::Data {
382 source_node_addr,
383 source_npub,
384 payload,
385 queued_at,
386 } = event;
387 crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
388 Some(FipsEndpointMessage {
389 source_node_addr,
390 source_npub,
391 data: payload,
392 })
393 }
394
395 pub fn blocking_send(
405 &self,
406 remote_npub: impl Into<String>,
407 data: impl Into<Vec<u8>>,
408 ) -> Result<(), FipsEndpointError> {
409 let remote_npub = remote_npub.into();
410 let data = data.into();
411 if remote_npub == self.npub {
412 self.inbound_endpoint_tx
413 .send(NodeEndpointEvent::Data {
414 source_node_addr: self.node_addr,
415 source_npub: Some(self.npub.clone()),
416 payload: data,
417 queued_at: crate::perf_profile::stamp(),
418 })
419 .map_err(|_| FipsEndpointError::Closed)?;
420 return Ok(());
421 }
422 let remote = self.resolve_peer_identity(&remote_npub)?;
423 let (response_tx, _response_rx) = oneshot::channel();
424 self.endpoint_commands
425 .blocking_send(NodeEndpointCommand::Send {
426 remote,
427 payload: data,
428 queued_at: crate::perf_profile::stamp(),
429 response_tx,
430 })
431 .map_err(|_| FipsEndpointError::Closed)?;
432 Ok(())
433 }
434
435 pub fn blocking_recv(&self) -> Option<FipsEndpointMessage> {
451 let mut rx = self.inbound_endpoint_rx.blocking_lock();
452 let event = rx.blocking_recv()?;
453 let NodeEndpointEvent::Data {
454 source_node_addr,
455 source_npub,
456 payload,
457 queued_at,
458 } = event;
459 crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
460 Some(FipsEndpointMessage {
461 source_node_addr,
462 source_npub,
463 data: payload,
464 })
465 }
466
467 pub fn try_recv(&self) -> Option<FipsEndpointMessage> {
487 let mut rx = self.inbound_endpoint_rx.try_lock().ok()?;
488 let event = rx.try_recv().ok()?;
489 let NodeEndpointEvent::Data {
490 source_node_addr,
491 source_npub,
492 payload,
493 queued_at,
494 } = event;
495 crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
496 Some(FipsEndpointMessage {
497 source_node_addr,
498 source_npub,
499 data: payload,
500 })
501 }
502
503 pub async fn update_peers(
513 &self,
514 peers: Vec<crate::config::PeerConfig>,
515 ) -> Result<UpdatePeersOutcome, FipsEndpointError> {
516 let (response_tx, response_rx) = oneshot::channel();
517 self.endpoint_commands
518 .send(NodeEndpointCommand::UpdatePeers { peers, response_tx })
519 .await
520 .map_err(|_| FipsEndpointError::Closed)?;
521
522 match response_rx.await.map_err(|_| FipsEndpointError::Closed)? {
523 Ok(outcome) => Ok(UpdatePeersOutcome::from(outcome)),
524 Err(error) => Err(FipsEndpointError::Node(error)),
525 }
526 }
527
528 pub async fn peers(&self) -> Result<Vec<FipsEndpointPeer>, FipsEndpointError> {
530 let (response_tx, response_rx) = oneshot::channel();
531 self.endpoint_commands
532 .send(NodeEndpointCommand::PeerSnapshot { response_tx })
533 .await
534 .map_err(|_| FipsEndpointError::Closed)?;
535
536 response_rx
537 .await
538 .map(|peers| peers.into_iter().map(FipsEndpointPeer::from).collect())
539 .map_err(|_| FipsEndpointError::Closed)
540 }
541
542 pub async fn send_ip_packet(
544 &self,
545 packet: impl Into<Vec<u8>>,
546 ) -> Result<(), FipsEndpointError> {
547 self.outbound_packets
548 .send(packet.into())
549 .await
550 .map_err(|_| FipsEndpointError::Closed)
551 }
552
553 pub async fn recv_ip_packet(&self) -> Option<NodeDeliveredPacket> {
555 self.delivered_packets.lock().await.recv().await
556 }
557
558 pub async fn shutdown(mut self) -> Result<(), FipsEndpointError> {
560 if let Some(shutdown_tx) = self.shutdown_tx.take() {
561 let _ = shutdown_tx.send(());
562 }
563 self.task.await??;
564 Ok(())
565 }
566}
567
568impl From<NodeEndpointPeer> for FipsEndpointPeer {
569 fn from(peer: NodeEndpointPeer) -> Self {
570 Self {
571 npub: peer.npub,
572 transport_addr: peer.transport_addr,
573 transport_type: peer.transport_type,
574 link_id: peer.link_id,
575 srtt_ms: peer.srtt_ms,
576 packets_sent: peer.packets_sent,
577 packets_recv: peer.packets_recv,
578 bytes_sent: peer.bytes_sent,
579 bytes_recv: peer.bytes_recv,
580 }
581 }
582}
583
584#[cfg(test)]
585mod tests {
586 use super::*;
587 use std::time::Duration;
588
589 #[tokio::test]
590 async fn endpoint_starts_without_system_tun() {
591 let endpoint = FipsEndpoint::builder()
592 .without_system_tun()
593 .bind()
594 .await
595 .expect("endpoint should bind");
596
597 assert!(!endpoint.npub().is_empty());
598 assert!(endpoint.discovery_scope().is_none());
599 endpoint.shutdown().await.expect("shutdown should succeed");
600 }
601
602 #[tokio::test]
603 async fn loopback_endpoint_data_roundtrips() {
604 let endpoint = FipsEndpoint::builder()
605 .without_system_tun()
606 .bind()
607 .await
608 .expect("endpoint should bind");
609
610 endpoint
611 .send(endpoint.npub().to_string(), b"ping".to_vec())
612 .await
613 .expect("loopback send should succeed");
614 let message = tokio::time::timeout(Duration::from_secs(1), endpoint.recv())
615 .await
616 .expect("recv should not time out")
617 .expect("message should arrive");
618 assert_eq!(message.source_node_addr, *endpoint.node_addr());
619 assert_eq!(message.source_npub, Some(endpoint.npub().to_string()));
620 assert_eq!(message.data, b"ping");
621 assert!(endpoint.discovery_scope().is_none());
622
623 endpoint.shutdown().await.expect("shutdown should succeed");
624 }
625
626 #[test]
627 fn discovery_scope_enables_default_scoped_udp_discovery() {
628 let config = FipsEndpoint::builder()
629 .discovery_scope("nostr-vpn:test")
630 .prepared_config();
631
632 assert!(!config.tun.enabled);
633 assert!(!config.dns.enabled);
634 assert!(!config.node.system_files_enabled);
635 assert!(config.node.discovery.nostr.enabled);
636 assert!(config.node.discovery.nostr.advertise);
637 assert_eq!(
638 config.node.discovery.nostr.policy,
639 NostrDiscoveryPolicy::Open
640 );
641 assert!(config.node.discovery.nostr.share_local_candidates);
642 assert_eq!(
643 config.node.discovery.nostr.app,
644 "fips-overlay-v1:nostr-vpn:test"
645 );
646
647 let udp = match config.transports.udp {
648 TransportInstances::Single(udp) => udp,
649 TransportInstances::Named(_) => panic!("expected a default UDP transport"),
650 };
651 assert_eq!(udp.bind_addr(), "0.0.0.0:0");
652 assert!(udp.advertise_on_nostr());
653 assert!(!udp.is_public());
654 assert!(!udp.outbound_only());
655 assert!(udp.accept_connections());
656 }
657
658 #[test]
659 fn discovery_scope_preserves_explicit_connectivity_config() {
660 let mut explicit = Config::new();
661 explicit.node.discovery.nostr.enabled = true;
662 explicit.node.discovery.nostr.app = "custom-app".to_string();
663 explicit.node.discovery.nostr.policy = NostrDiscoveryPolicy::ConfiguredOnly;
664 explicit.node.discovery.nostr.share_local_candidates = false;
665 explicit.transports.udp = TransportInstances::Single(UdpConfig {
666 bind_addr: Some("127.0.0.1:34567".to_string()),
667 advertise_on_nostr: Some(false),
668 outbound_only: Some(true),
669 ..UdpConfig::default()
670 });
671
672 let config = FipsEndpoint::builder()
673 .config(explicit)
674 .discovery_scope("nostr-vpn:test")
675 .prepared_config();
676
677 assert_eq!(config.node.discovery.nostr.app, "custom-app");
678 assert_eq!(
679 config.node.discovery.nostr.policy,
680 NostrDiscoveryPolicy::ConfiguredOnly
681 );
682 assert!(!config.node.discovery.nostr.share_local_candidates);
683 let udp = match config.transports.udp {
684 TransportInstances::Single(udp) => udp,
685 TransportInstances::Named(_) => panic!("expected explicit UDP transport"),
686 };
687 assert_eq!(udp.bind_addr.as_deref(), Some("127.0.0.1:34567"));
688 assert_eq!(udp.bind_addr(), "0.0.0.0:0");
689 assert!(!udp.advertise_on_nostr());
690 assert!(udp.outbound_only());
691 }
692
693 #[tokio::test]
694 async fn invalid_remote_npub_is_rejected() {
695 let endpoint = FipsEndpoint::builder()
696 .without_system_tun()
697 .bind()
698 .await
699 .expect("endpoint should bind");
700
701 let error = endpoint
702 .send("not-an-npub", b"hello".to_vec())
703 .await
704 .expect_err("invalid npub should fail");
705 assert!(matches!(error, FipsEndpointError::InvalidRemoteNpub { .. }));
706
707 endpoint.shutdown().await.expect("shutdown should succeed");
708 }
709
710 #[tokio::test]
711 async fn endpoint_peer_snapshot_starts_empty() {
712 let endpoint = FipsEndpoint::builder()
713 .without_system_tun()
714 .bind()
715 .await
716 .expect("endpoint should bind");
717
718 let peers = endpoint.peers().await.expect("peer snapshot");
719 assert!(peers.is_empty());
720
721 endpoint.shutdown().await.expect("shutdown should succeed");
722 }
723}