1pub mod correlation_store;
7
8pub use correlation_store::ClientCorrelationStore;
9
10use std::num::NonZeroUsize;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::{Arc, Mutex};
13use std::time::Duration;
14
15use lru::LruCache;
16use nostr_sdk::prelude::*;
17use tokio_util::sync::CancellationToken;
18
19use crate::core::constants::*;
20use crate::core::error::{Error, Result};
21use crate::core::serializers;
22use crate::core::types::*;
23use crate::core::validation;
24use crate::encryption;
25use crate::relay::{RelayPool, RelayPoolTrait};
26use crate::transport::base::BaseTransport;
27use crate::transport::discovery_tags::{parse_discovered_peer_capabilities, PeerCapabilities};
28
/// Tracing target attached to the log events emitted by this module.
const LOG_TARGET: &str = "contextvm_sdk::transport::client";
30
/// Configuration for [`NostrClientTransport`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct NostrClientTransportConfig {
    /// Relay URLs the transport connects to when it is started.
    pub relay_urls: Vec<String>,
    /// Public key of the target server, hex-encoded (parsed with `PublicKey::from_hex`).
    pub server_pubkey: String,
    /// Encryption policy; handed to the base transport and used to reject
    /// incoming events that violate it.
    pub encryption_mode: EncryptionMode,
    /// Gift-wrap policy; selects the outbound gift-wrap kind and filters
    /// inbound gift-wrap kinds.
    pub gift_wrap_mode: GiftWrapMode,
    /// When `true`, `initialize` requests are answered locally with an emulated
    /// response and `notifications/initialized` is not published.
    pub is_stateless: bool,
    /// Age after which pending requests are swept from the correlation store;
    /// also determines how often the sweep runs.
    pub timeout: Duration,
}
52
53impl Default for NostrClientTransportConfig {
54 fn default() -> Self {
55 Self {
56 relay_urls: vec!["wss://relay.damus.io".to_string()],
57 server_pubkey: String::new(),
58 encryption_mode: EncryptionMode::Optional,
59 gift_wrap_mode: GiftWrapMode::Optional,
60 is_stateless: false,
61 timeout: Duration::from_secs(30),
62 }
63 }
64}
65
66impl NostrClientTransportConfig {
67 pub fn with_server_pubkey(mut self, pubkey: impl Into<String>) -> Self {
69 self.server_pubkey = pubkey.into();
70 self
71 }
72 pub fn with_encryption_mode(mut self, mode: EncryptionMode) -> Self {
74 self.encryption_mode = mode;
75 self
76 }
77 pub fn with_gift_wrap_mode(mut self, mode: GiftWrapMode) -> Self {
79 self.gift_wrap_mode = mode;
80 self
81 }
82 pub fn with_stateless(mut self, stateless: bool) -> Self {
84 self.is_stateless = stateless;
85 self
86 }
87 pub fn with_relay_urls(mut self, urls: Vec<String>) -> Self {
89 self.relay_urls = urls;
90 self
91 }
92 pub fn with_timeout(mut self, timeout: Duration) -> Self {
94 self.timeout = timeout;
95 self
96 }
97}
98
/// Client side of the transport: publishes JSON-RPC messages to a single server
/// over Nostr relays and forwards the server's messages through an in-process channel.
pub struct NostrClientTransport {
    /// Shared transport state: relay pool handle, encryption mode and connection flag.
    base: BaseTransport,
    /// Configuration the transport was created with.
    config: NostrClientTransportConfig,
    /// Server public key parsed from `config.server_pubkey`.
    server_pubkey: PublicKey,
    /// Correlation store keyed by the hex id of each published request event.
    pending_requests: ClientCorrelationStore,
    /// Set once a request carrying the client discovery tags has been published.
    has_sent_discovery_tags: AtomicBool,
    /// Capabilities learned from server events; flags are only ever OR-ed in.
    discovered_server_capabilities: Arc<Mutex<PeerCapabilities>>,
    /// First server event that carried discovery tags.
    server_initialize_event: Arc<Mutex<Option<Event>>>,
    /// Set when the server used an ephemeral gift wrap or advertised support for it.
    server_supports_ephemeral: Arc<AtomicBool>,
    /// LRU of outer gift-wrap event ids that were already decrypted and verified.
    seen_gift_wrap_ids: Arc<Mutex<LruCache<EventId, ()>>>,
    /// Sender half of the incoming-message channel; dropped by `close()`.
    message_tx: Option<tokio::sync::mpsc::UnboundedSender<JsonRpcMessage>>,
    /// Receiver half of the incoming-message channel until `take_message_receiver()` moves it out.
    message_rx: Option<tokio::sync::mpsc::UnboundedReceiver<JsonRpcMessage>>,
    /// Token cancelled by `close()`; the event loop runs on a child of it.
    cancellation_token: CancellationToken,
    /// Join handle of the spawned event loop, present after `start()`.
    event_loop_handle: Option<tokio::task::JoinHandle<()>>,
}
126
127impl NostrClientTransport {
128 pub async fn new<T>(signer: T, config: NostrClientTransportConfig) -> Result<Self>
130 where
131 T: IntoNostrSigner,
132 {
133 let server_pubkey = PublicKey::from_hex(&config.server_pubkey).map_err(|error| {
134 tracing::error!(
135 target: LOG_TARGET,
136 error = %error,
137 server_pubkey = %config.server_pubkey,
138 "Invalid server pubkey"
139 );
140 Error::Other(format!("Invalid server pubkey: {error}"))
141 })?;
142
143 let relay_pool: Arc<dyn RelayPoolTrait> =
144 Arc::new(RelayPool::new(signer).await.map_err(|error| {
145 tracing::error!(
146 target: LOG_TARGET,
147 error = %error,
148 "Failed to initialize relay pool for client transport"
149 );
150 error
151 })?);
152 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
153 let seen_gift_wrap_ids = Arc::new(Mutex::new(LruCache::new(
154 NonZeroUsize::new(DEFAULT_LRU_SIZE).expect("DEFAULT_LRU_SIZE must be non-zero"),
155 )));
156
157 tracing::info!(
158 target: LOG_TARGET,
159 relay_count = config.relay_urls.len(),
160 stateless = config.is_stateless,
161 encryption_mode = ?config.encryption_mode,
162 "Created client transport"
163 );
164 Ok(Self {
165 base: BaseTransport {
166 relay_pool,
167 encryption_mode: config.encryption_mode,
168 is_connected: false,
169 },
170 config,
171 server_pubkey,
172 pending_requests: ClientCorrelationStore::new(),
173 has_sent_discovery_tags: AtomicBool::new(false),
174 discovered_server_capabilities: Arc::new(Mutex::new(PeerCapabilities::default())),
175 server_initialize_event: Arc::new(Mutex::new(None)),
176 server_supports_ephemeral: Arc::new(AtomicBool::new(false)),
177 seen_gift_wrap_ids,
178 message_tx: Some(tx),
179 message_rx: Some(rx),
180 cancellation_token: CancellationToken::new(),
181 event_loop_handle: None,
182 })
183 }
184
185 pub async fn with_relay_pool(
187 config: NostrClientTransportConfig,
188 relay_pool: Arc<dyn RelayPoolTrait>,
189 ) -> Result<Self> {
190 let server_pubkey = PublicKey::from_hex(&config.server_pubkey).map_err(|error| {
191 tracing::error!(
192 target: LOG_TARGET,
193 error = %error,
194 server_pubkey = %config.server_pubkey,
195 "Invalid server pubkey"
196 );
197 Error::Other(format!("Invalid server pubkey: {error}"))
198 })?;
199
200 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
201 let seen_gift_wrap_ids = Arc::new(Mutex::new(LruCache::new(
202 NonZeroUsize::new(DEFAULT_LRU_SIZE).expect("DEFAULT_LRU_SIZE must be non-zero"),
203 )));
204
205 tracing::info!(
206 target: LOG_TARGET,
207 relay_count = config.relay_urls.len(),
208 stateless = config.is_stateless,
209 encryption_mode = ?config.encryption_mode,
210 "Created client transport (with_relay_pool)"
211 );
212 Ok(Self {
213 base: BaseTransport {
214 relay_pool,
215 encryption_mode: config.encryption_mode,
216 is_connected: false,
217 },
218 config,
219 server_pubkey,
220 pending_requests: ClientCorrelationStore::new(),
221 has_sent_discovery_tags: AtomicBool::new(false),
222 discovered_server_capabilities: Arc::new(Mutex::new(PeerCapabilities::default())),
223 server_initialize_event: Arc::new(Mutex::new(None)),
224 server_supports_ephemeral: Arc::new(AtomicBool::new(false)),
225 seen_gift_wrap_ids,
226 message_tx: Some(tx),
227 message_rx: Some(rx),
228 cancellation_token: CancellationToken::new(),
229 event_loop_handle: None,
230 })
231 }
232
233 pub async fn start(&mut self) -> Result<()> {
235 self.base
236 .connect(&self.config.relay_urls)
237 .await
238 .map_err(|error| {
239 tracing::error!(
240 target: LOG_TARGET,
241 error = %error,
242 "Failed to connect client transport to relays"
243 );
244 error
245 })?;
246
247 let pubkey = self.base.get_public_key().await.map_err(|error| {
248 tracing::error!(
249 target: LOG_TARGET,
250 error = %error,
251 "Failed to fetch client transport public key"
252 );
253 error
254 })?;
255 tracing::info!(
256 target: LOG_TARGET,
257 pubkey = %pubkey.to_hex(),
258 "Client transport started"
259 );
260
261 self.base
262 .subscribe_for_pubkey(&pubkey)
263 .await
264 .map_err(|error| {
265 tracing::error!(
266 target: LOG_TARGET,
267 error = %error,
268 pubkey = %pubkey.to_hex(),
269 "Failed to subscribe client transport for pubkey"
270 );
271 error
272 })?;
273
274 let relay_pool = Arc::clone(&self.base.relay_pool);
276 let pending = self.pending_requests.clone();
277 let server_pubkey = self.server_pubkey;
278 let tx = self
279 .message_tx
280 .as_ref()
281 .expect("message_tx must exist before start()")
282 .clone();
283 let encryption_mode = self.config.encryption_mode;
284 let gift_wrap_mode = self.config.gift_wrap_mode;
285 let discovered_caps = self.discovered_server_capabilities.clone();
286 let init_event = self.server_initialize_event.clone();
287 let server_supports_ephemeral = self.server_supports_ephemeral.clone();
288 let seen_gift_wrap_ids = self.seen_gift_wrap_ids.clone();
289 let timeout = self.config.timeout;
290 let token = self.cancellation_token.child_token();
291
292 self.event_loop_handle = Some(tokio::spawn(async move {
293 Self::event_loop(
294 relay_pool,
295 pending,
296 server_pubkey,
297 tx,
298 encryption_mode,
299 gift_wrap_mode,
300 discovered_caps,
301 init_event,
302 server_supports_ephemeral,
303 seen_gift_wrap_ids,
304 timeout,
305 token,
306 )
307 .await;
308 }));
309
310 tracing::info!(
311 target: LOG_TARGET,
312 relay_count = self.config.relay_urls.len(),
313 "Client transport event loop spawned"
314 );
315 Ok(())
316 }
317
318 pub async fn close(&mut self) -> Result<()> {
320 self.cancellation_token.cancel();
321 if let Some(handle) = self.event_loop_handle.take() {
322 let _ = handle.await;
323 }
324 self.message_tx.take();
325 self.base.disconnect().await
326 }
327
328 pub async fn send(&self, message: &JsonRpcMessage) -> Result<()> {
330 if self.config.is_stateless {
332 if let JsonRpcMessage::Request(ref req) = message {
333 if req.method == "initialize" {
334 self.emulate_initialize_response(&req.id);
335 return Ok(());
336 }
337 }
338 if let JsonRpcMessage::Notification(ref n) = message {
339 if n.method == "notifications/initialized" {
340 return Ok(());
341 }
342 }
343 }
344
345 let is_request = message.is_request();
346 let base_tags = BaseTransport::create_recipient_tags(&self.server_pubkey);
347 let discovery_tags = if is_request {
348 self.get_pending_client_discovery_tags()
349 } else {
350 vec![]
351 };
352 let tags = BaseTransport::compose_outbound_tags(&base_tags, &discovery_tags, &[]);
353
354 let (event_id, publishable_event) = self
355 .base
356 .prepare_mcp_message(
357 message,
358 &self.server_pubkey,
359 CTXVM_MESSAGES_KIND,
360 tags,
361 None,
362 Some(self.choose_outbound_gift_wrap_kind()),
363 )
364 .await
365 .map_err(|error| {
366 tracing::error!(
367 target: LOG_TARGET,
368 error = %error,
369 server_pubkey = %self.server_pubkey.to_hex(),
370 method = ?message.method(),
371 "Failed to prepare client message"
372 );
373 error
374 })?;
375
376 if let JsonRpcMessage::Request(ref req) = message {
377 let is_initialize = req.method == INITIALIZE_METHOD;
378 self.pending_requests
379 .register(event_id.to_hex(), req.id.clone(), is_initialize)
380 .await;
381 }
382
383 if let Err(error) = self.base.relay_pool.publish_event(&publishable_event).await {
384 self.pending_requests.remove(&event_id.to_hex()).await;
385 tracing::error!(
386 target: LOG_TARGET,
387 error = %error,
388 server_pubkey = %self.server_pubkey.to_hex(),
389 method = ?message.method(),
390 "Failed to publish client message"
391 );
392 return Err(error);
393 }
394
395 if is_request && !discovery_tags.is_empty() {
397 self.has_sent_discovery_tags.store(true, Ordering::Relaxed);
398 }
399
400 tracing::debug!(
401 target: LOG_TARGET,
402 event_id = %event_id.to_hex(),
403 method = ?message.method(),
404 "Sent client message"
405 );
406 Ok(())
407 }
408
409 pub fn take_message_receiver(
411 &mut self,
412 ) -> Option<tokio::sync::mpsc::UnboundedReceiver<JsonRpcMessage>> {
413 self.message_rx.take()
414 }
415
416 fn emulate_initialize_response(&self, request_id: &serde_json::Value) {
417 let response = JsonRpcMessage::Response(JsonRpcResponse {
418 jsonrpc: "2.0".to_string(),
419 id: request_id.clone(),
420 result: serde_json::json!({
421 "protocolVersion": crate::core::constants::mcp_protocol_version(),
422 "serverInfo": {
423 "name": "Emulated-Stateless-Server",
424 "version": "1.0.0"
425 },
426 "capabilities": {
427 "tools": { "listChanged": true },
428 "prompts": { "listChanged": true },
429 "resources": { "subscribe": true, "listChanged": true }
430 }
431 }),
432 });
433 if let Some(ref tx) = self.message_tx {
434 let _ = tx.send(response);
435 }
436 }
437
438 #[allow(clippy::too_many_arguments)]
439 async fn event_loop(
440 relay_pool: Arc<dyn RelayPoolTrait>,
441 pending: ClientCorrelationStore,
442 server_pubkey: PublicKey,
443 tx: tokio::sync::mpsc::UnboundedSender<JsonRpcMessage>,
444 encryption_mode: EncryptionMode,
445 gift_wrap_mode: GiftWrapMode,
446 discovered_caps: Arc<Mutex<PeerCapabilities>>,
447 init_event: Arc<Mutex<Option<Event>>>,
448 server_supports_ephemeral: Arc<AtomicBool>,
449 seen_gift_wrap_ids: Arc<Mutex<LruCache<EventId, ()>>>,
450 timeout: Duration,
451 cancel: CancellationToken,
452 ) {
453 let mut notifications = relay_pool.notifications();
454 let sweep_interval = (timeout / 2).clamp(Duration::from_secs(1), Duration::from_secs(30));
456 let mut sweep_timer =
457 tokio::time::interval_at(tokio::time::Instant::now() + sweep_interval, sweep_interval);
458
459 loop {
460 tokio::select! {
461 _ = cancel.cancelled() => {
462 tracing::info!(
463 target: LOG_TARGET,
464 "Client event loop cancelled"
465 );
466 break;
467 }
468 result = notifications.recv() => {
469 let notification = match result {
470 Ok(n) => n,
471 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
472 tracing::warn!(
473 target: LOG_TARGET,
474 skipped = n,
475 "Relay broadcast lagged, skipping missed events"
476 );
477 continue;
478 }
479 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
480 };
481 Self::handle_notification(
482 ¬ification,
483 &pending,
484 server_pubkey,
485 &tx,
486 encryption_mode,
487 gift_wrap_mode,
488 &discovered_caps,
489 &init_event,
490 &server_supports_ephemeral,
491 &seen_gift_wrap_ids,
492 &relay_pool,
493 )
494 .await;
495 }
496 _ = sweep_timer.tick() => {
497 let swept = pending.sweep_expired(timeout).await;
498 if swept > 0 {
499 tracing::warn!(
500 target: LOG_TARGET,
501 swept,
502 timeout_ms = timeout.as_millis() as u64,
503 "Swept stale pending requests (rmcp handles timeout errors)"
504 );
505 }
506 }
507 }
508 }
509 }
510
511 fn get_client_capability_tags(&self) -> Vec<Tag> {
515 let mut tags = Vec::new();
516 if self.config.encryption_mode != EncryptionMode::Disabled {
517 tags.push(Tag::custom(
518 TagKind::Custom(tags::SUPPORT_ENCRYPTION.into()),
519 Vec::<String>::new(),
520 ));
521 if self.config.gift_wrap_mode != GiftWrapMode::Persistent {
522 tags.push(Tag::custom(
523 TagKind::Custom(tags::SUPPORT_ENCRYPTION_EPHEMERAL.into()),
524 Vec::<String>::new(),
525 ));
526 }
527 }
528 tags
529 }
530
531 fn get_pending_client_discovery_tags(&self) -> Vec<Tag> {
533 if self.has_sent_discovery_tags.load(Ordering::Relaxed) {
534 vec![]
535 } else {
536 self.get_client_capability_tags()
537 }
538 }
539
540 fn learn_server_discovery(
542 discovered_caps: &Mutex<PeerCapabilities>,
543 init_event: &Mutex<Option<Event>>,
544 event: &Event,
545 ) {
546 let tag_vec: Vec<Tag> = event.tags.clone().to_vec();
547 let discovered = parse_discovered_peer_capabilities(&tag_vec);
548 if discovered.discovery_tags.is_empty() {
549 return;
550 }
551
552 {
553 let mut caps = match discovered_caps.lock() {
554 Ok(g) => g,
555 Err(p) => p.into_inner(),
556 };
557 caps.supports_encryption |= discovered.capabilities.supports_encryption;
558 caps.supports_ephemeral_encryption |=
559 discovered.capabilities.supports_ephemeral_encryption;
560 caps.supports_oversized_transfer |= discovered.capabilities.supports_oversized_transfer;
561 }
562
563 let mut stored = match init_event.lock() {
564 Ok(g) => g,
565 Err(p) => p.into_inner(),
566 };
567 if stored.is_none() {
568 *stored = Some(event.clone());
569 }
570 }
574
575 pub fn get_server_initialize_event(&self) -> Option<Event> {
577 let guard = match self.server_initialize_event.lock() {
578 Ok(g) => g,
579 Err(p) => p.into_inner(),
580 };
581 guard.clone()
582 }
583
584 pub fn discovered_server_capabilities(&self) -> PeerCapabilities {
586 let guard = match self.discovered_server_capabilities.lock() {
587 Ok(g) => g,
588 Err(p) => p.into_inner(),
589 };
590 *guard
591 }
592
593 #[allow(clippy::too_many_arguments)]
594 async fn handle_notification(
595 notification: &RelayPoolNotification,
596 pending: &ClientCorrelationStore,
597 server_pubkey: PublicKey,
598 tx: &tokio::sync::mpsc::UnboundedSender<JsonRpcMessage>,
599 encryption_mode: EncryptionMode,
600 gift_wrap_mode: GiftWrapMode,
601 discovered_caps: &Arc<Mutex<PeerCapabilities>>,
602 init_event: &Arc<Mutex<Option<Event>>>,
603 server_supports_ephemeral: &Arc<AtomicBool>,
604 seen_gift_wrap_ids: &Arc<Mutex<LruCache<EventId, ()>>>,
605 relay_pool: &Arc<dyn RelayPoolTrait>,
606 ) {
607 let event = match notification {
608 RelayPoolNotification::Event { event, .. } => event,
609 _ => return,
610 };
611
612 let is_gift_wrap = is_gift_wrap_kind(&event.kind);
613 let outer_kind = event.kind.as_u16();
614
615 if violates_encryption_policy(&event.kind, &encryption_mode) {
617 if is_gift_wrap {
618 tracing::warn!(
619 target: LOG_TARGET,
620 event_id = %event.id.to_hex(),
621 event_kind = outer_kind,
622 configured_mode = ?gift_wrap_mode,
623 "Skipping encrypted response because client encryption is disabled"
624 );
625 } else {
626 tracing::warn!(
627 target: LOG_TARGET,
628 event_id = %event.id.to_hex(),
629 "Skipping plaintext response because client encryption is required"
630 );
631 }
632 return;
633 }
634
635 if is_gift_wrap && !gift_wrap_mode.allows_kind(outer_kind) {
637 tracing::warn!(
638 target: LOG_TARGET,
639 event_id = %event.id.to_hex(),
640 event_kind = outer_kind,
641 configured_mode = ?gift_wrap_mode,
642 "Skipping gift wrap due to CEP-19 policy"
643 );
644 return;
645 }
646
647 let (actual_event_content, actual_pubkey, e_tag, verified_tags, source_event) =
649 if is_gift_wrap {
650 {
651 let guard = match seen_gift_wrap_ids.lock() {
652 Ok(g) => g,
653 Err(poisoned) => poisoned.into_inner(),
654 };
655 if guard.contains(&event.id) {
656 tracing::debug!(
657 target: LOG_TARGET,
658 event_id = %event.id.to_hex(),
659 "Skipping duplicate gift-wrap (outer id)"
660 );
661 return;
662 }
663 }
664 let signer = match relay_pool.signer().await {
666 Ok(s) => s,
667 Err(error) => {
668 tracing::error!(
669 target: LOG_TARGET,
670 error = %error,
671 "Failed to get signer"
672 );
673 return;
674 }
675 };
676 match encryption::decrypt_gift_wrap_single_layer(&signer, event).await {
677 Ok(decrypted_json) => match serde_json::from_str::<Event>(&decrypted_json) {
678 Ok(inner) => {
679 if let Err(e) = inner.verify() {
680 tracing::warn!("Inner event signature verification failed: {e}");
681 return;
682 }
683 {
684 let mut guard = match seen_gift_wrap_ids.lock() {
685 Ok(g) => g,
686 Err(poisoned) => poisoned.into_inner(),
687 };
688 guard.put(event.id, ());
689 }
690 let e_tag = serializers::get_tag_value(&inner.tags, "e");
691 let inner_clone = inner.clone();
692 (inner.content, inner.pubkey, e_tag, inner.tags, inner_clone)
693 }
694 Err(error) => {
695 tracing::error!(
696 target: LOG_TARGET,
697 error = %error,
698 "Failed to parse inner event"
699 );
700 return;
701 }
702 },
703 Err(error) => {
704 tracing::error!(
705 target: LOG_TARGET,
706 error = %error,
707 "Failed to decrypt gift wrap"
708 );
709 return;
710 }
711 }
712 } else {
713 let e_tag = serializers::get_tag_value(&event.tags, "e");
714 let event_clone: Event = (**event).clone();
715 (
716 event.content.clone(),
717 event.pubkey,
718 e_tag,
719 event.tags.clone(),
720 event_clone,
721 )
722 };
723
724 if actual_pubkey != server_pubkey {
726 tracing::debug!(
727 target: LOG_TARGET,
728 event_pubkey = %actual_pubkey.to_hex(),
729 expected_pubkey = %server_pubkey.to_hex(),
730 "Skipping event from unexpected pubkey"
731 );
732 return;
733 }
734
735 Self::learn_server_discovery(discovered_caps, init_event, &source_event);
737
738 if Self::should_learn_ephemeral_support(
740 actual_pubkey,
741 server_pubkey,
742 if is_gift_wrap { Some(outer_kind) } else { None },
743 &verified_tags,
744 ) {
745 server_supports_ephemeral.store(true, Ordering::Relaxed);
746 }
747
748 if let Some(ref correlated_id) = e_tag {
750 let is_pending = pending.contains(correlated_id.as_str()).await;
751 if !is_pending {
752 tracing::warn!(
753 target: LOG_TARGET,
754 correlated_event_id = %correlated_id,
755 "Response for unknown request"
756 );
757 return;
758 }
759 }
760
761 if let Some(mcp_msg) = validation::validate_and_parse(&actual_event_content) {
763 match &mcp_msg {
765 JsonRpcMessage::Response(_) | JsonRpcMessage::ErrorResponse(_)
766 if e_tag.is_none() =>
767 {
768 tracing::warn!(
769 target: LOG_TARGET,
770 "Dropping response/error without correlation `e` tag"
771 );
772 return;
773 }
774 JsonRpcMessage::Request(_) => {
775 tracing::warn!(
776 target: LOG_TARGET,
777 method = ?mcp_msg.method(),
778 "Dropping server-to-client request (invalid in MCP)"
779 );
780 return;
781 }
782 _ => {}
783 }
784
785 if let Some(ref correlated_id) = e_tag {
787 pending.remove(correlated_id.as_str()).await;
788 }
789 let _ = tx.send(mcp_msg);
790 }
791 }
792
793 fn choose_outbound_gift_wrap_kind(&self) -> u16 {
794 match self.config.gift_wrap_mode {
795 GiftWrapMode::Persistent => GIFT_WRAP_KIND,
796 GiftWrapMode::Ephemeral => EPHEMERAL_GIFT_WRAP_KIND,
797 GiftWrapMode::Optional => {
798 if self.server_supports_ephemeral.load(Ordering::Relaxed) {
799 EPHEMERAL_GIFT_WRAP_KIND
800 } else {
801 GIFT_WRAP_KIND
802 }
803 }
804 }
805 }
806
807 fn has_support_ephemeral_tag(tags: &Tags) -> bool {
808 tags.iter().any(|tag| {
809 tag.kind()
810 == TagKind::Custom(
811 crate::core::constants::tags::SUPPORT_ENCRYPTION_EPHEMERAL.into(),
812 )
813 })
814 }
815
816 fn should_learn_ephemeral_support(
817 actual_pubkey: PublicKey,
818 server_pubkey: PublicKey,
819 event_kind: Option<u16>,
820 tags: &Tags,
821 ) -> bool {
822 actual_pubkey == server_pubkey
823 && (event_kind == Some(EPHEMERAL_GIFT_WRAP_KIND)
824 || Self::has_support_ephemeral_tag(tags))
825 }
826
827 pub fn server_supports_ephemeral_encryption(&self) -> bool {
829 self.server_supports_ephemeral.load(Ordering::Relaxed)
830 }
831}
832
833#[inline]
834fn is_gift_wrap_kind(kind: &Kind) -> bool {
835 *kind == Kind::Custom(GIFT_WRAP_KIND) || *kind == Kind::Custom(EPHEMERAL_GIFT_WRAP_KIND)
836}
837
838#[inline]
841fn violates_encryption_policy(kind: &Kind, mode: &EncryptionMode) -> bool {
842 let is_gift_wrap = is_gift_wrap_kind(kind);
843 (is_gift_wrap && *mode == EncryptionMode::Disabled)
844 || (!is_gift_wrap && *mode == EncryptionMode::Required)
845}
846
/// Unit tests for the client transport configuration, the encryption and
/// gift-wrap policy helpers, and the capability discovery logic.
#[cfg(test)]
mod tests {
    use super::*;

    // --- configuration -----------------------------------------------------

    #[test]
    fn test_config_defaults() {
        let config = NostrClientTransportConfig::default();
        assert_eq!(config.relay_urls, vec!["wss://relay.damus.io".to_string()]);
        assert!(config.server_pubkey.is_empty());
        assert_eq!(config.encryption_mode, EncryptionMode::Optional);
        assert_eq!(config.gift_wrap_mode, GiftWrapMode::Optional);
        assert!(!config.is_stateless);
        assert_eq!(config.timeout, Duration::from_secs(30));
    }

    #[test]
    fn test_stateless_config() {
        let config = NostrClientTransportConfig {
            is_stateless: true,
            ..Default::default()
        };
        assert!(config.is_stateless);
    }

    #[test]
    fn test_custom_timeout_config() {
        let config = NostrClientTransportConfig {
            timeout: Duration::from_secs(60),
            ..Default::default()
        };
        assert_eq!(config.timeout, Duration::from_secs(60));
    }

    // --- ephemeral gift-wrap support detection -------------------------------

    #[test]
    fn test_has_support_ephemeral_tag_detects_capability() {
        let tags = Tags::from_list(vec![Tag::custom(
            TagKind::Custom(crate::core::constants::tags::SUPPORT_ENCRYPTION_EPHEMERAL.into()),
            Vec::<String>::new(),
        )]);
        assert!(NostrClientTransport::has_support_ephemeral_tag(&tags));
    }

    #[test]
    fn test_has_support_ephemeral_tag_absent() {
        let tags = Tags::from_list(vec![Tag::custom(
            TagKind::Custom(crate::core::constants::tags::SUPPORT_ENCRYPTION.into()),
            Vec::<String>::new(),
        )]);
        assert!(!NostrClientTransport::has_support_ephemeral_tag(&tags));
    }

    #[test]
    fn test_should_learn_ephemeral_support_requires_matching_server_pubkey() {
        let server_keys = Keys::generate();
        let other_keys = Keys::generate();
        let tags = Tags::from_list(vec![Tag::custom(
            TagKind::Custom(crate::core::constants::tags::SUPPORT_ENCRYPTION_EPHEMERAL.into()),
            Vec::<String>::new(),
        )]);

        assert!(!NostrClientTransport::should_learn_ephemeral_support(
            other_keys.public_key(),
            server_keys.public_key(),
            Some(EPHEMERAL_GIFT_WRAP_KIND),
            &tags,
        ));
        assert!(NostrClientTransport::should_learn_ephemeral_support(
            server_keys.public_key(),
            server_keys.public_key(),
            Some(EPHEMERAL_GIFT_WRAP_KIND),
            &tags,
        ));
    }

    #[test]
    fn test_should_learn_from_ephemeral_kind_even_without_tag() {
        let server_keys = Keys::generate();
        let empty_tags = Tags::from_list(vec![]);

        assert!(NostrClientTransport::should_learn_ephemeral_support(
            server_keys.public_key(),
            server_keys.public_key(),
            Some(EPHEMERAL_GIFT_WRAP_KIND),
            &empty_tags,
        ));
    }

    #[test]
    fn test_should_learn_from_tag_without_ephemeral_kind() {
        let server_keys = Keys::generate();
        let tags = Tags::from_list(vec![Tag::custom(
            TagKind::Custom(crate::core::constants::tags::SUPPORT_ENCRYPTION_EPHEMERAL.into()),
            Vec::<String>::new(),
        )]);

        assert!(NostrClientTransport::should_learn_ephemeral_support(
            server_keys.public_key(),
            server_keys.public_key(),
            Some(GIFT_WRAP_KIND),
            &tags,
        ));
    }

    // --- stateless mode ------------------------------------------------------

    #[test]
    fn test_stateless_emulated_initialize_response_shape() {
        let request_id = serde_json::json!(1);
        let response = JsonRpcMessage::Response(JsonRpcResponse {
            jsonrpc: "2.0".to_string(),
            id: request_id.clone(),
            result: serde_json::json!({
                "protocolVersion": crate::core::constants::mcp_protocol_version(),
                "serverInfo": {
                    "name": "Emulated-Stateless-Server",
                    "version": "1.0.0"
                },
                "capabilities": {
                    "tools": { "listChanged": true },
                    "prompts": { "listChanged": true },
                    "resources": { "subscribe": true, "listChanged": true }
                }
            }),
        });
        assert!(response.is_response());
        assert_eq!(response.id(), Some(&serde_json::json!(1)));

        if let JsonRpcMessage::Response(r) = &response {
            assert!(r.result.get("capabilities").is_some());
            assert!(r.result.get("serverInfo").is_some());
            let server_info = r.result.get("serverInfo").unwrap();
            assert_eq!(
                server_info.get("name").unwrap().as_str().unwrap(),
                "Emulated-Stateless-Server"
            );
        }
    }

    #[test]
    fn test_stateless_mode_initialize_request_detection() {
        let init_req = JsonRpcMessage::Request(JsonRpcRequest {
            jsonrpc: "2.0".to_string(),
            id: serde_json::json!(1),
            method: "initialize".to_string(),
            params: None,
        });
        assert_eq!(init_req.method(), Some("initialize"));

        let init_notif = JsonRpcMessage::Notification(JsonRpcNotification {
            jsonrpc: "2.0".to_string(),
            method: "notifications/initialized".to_string(),
            params: None,
        });
        assert_eq!(init_notif.method(), Some("notifications/initialized"));
    }

    // --- encryption policy ---------------------------------------------------

    #[test]
    fn test_gift_wrap_kind_detection() {
        assert!(is_gift_wrap_kind(&Kind::Custom(GIFT_WRAP_KIND)));
        assert!(is_gift_wrap_kind(&Kind::Custom(EPHEMERAL_GIFT_WRAP_KIND)));
        assert!(!is_gift_wrap_kind(&Kind::Custom(CTXVM_MESSAGES_KIND)));
    }

    #[test]
    fn test_required_mode_drops_plaintext() {
        let plaintext_kind = Kind::Custom(CTXVM_MESSAGES_KIND);
        assert!(
            violates_encryption_policy(&plaintext_kind, &EncryptionMode::Required),
            "Required mode must reject plaintext (non-gift-wrap) events"
        );
    }

    #[test]
    fn test_disabled_mode_drops_encrypted() {
        assert!(
            violates_encryption_policy(&Kind::Custom(GIFT_WRAP_KIND), &EncryptionMode::Disabled),
            "Disabled mode must reject gift-wrap events"
        );
        assert!(
            violates_encryption_policy(
                &Kind::Custom(EPHEMERAL_GIFT_WRAP_KIND),
                &EncryptionMode::Disabled
            ),
            "Disabled mode must reject ephemeral gift-wrap events"
        );
    }

    #[test]
    fn test_optional_mode_accepts_all() {
        let plaintext = Kind::Custom(CTXVM_MESSAGES_KIND);
        let gift_wrap = Kind::Custom(GIFT_WRAP_KIND);
        let ephemeral = Kind::Custom(EPHEMERAL_GIFT_WRAP_KIND);
        assert!(!violates_encryption_policy(
            &plaintext,
            &EncryptionMode::Optional
        ));
        assert!(!violates_encryption_policy(
            &gift_wrap,
            &EncryptionMode::Optional
        ));
        assert!(!violates_encryption_policy(
            &ephemeral,
            &EncryptionMode::Optional
        ));
    }

    #[test]
    fn test_required_mode_accepts_encrypted() {
        assert!(
            !violates_encryption_policy(&Kind::Custom(GIFT_WRAP_KIND), &EncryptionMode::Required),
            "Required mode must accept gift-wrap events"
        );
        assert!(
            !violates_encryption_policy(
                &Kind::Custom(EPHEMERAL_GIFT_WRAP_KIND),
                &EncryptionMode::Required
            ),
            "Required mode must accept ephemeral gift-wrap events"
        );
    }

    #[test]
    fn test_disabled_mode_accepts_plaintext() {
        let plaintext = Kind::Custom(CTXVM_MESSAGES_KIND);
        assert!(
            !violates_encryption_policy(&plaintext, &EncryptionMode::Disabled),
            "Disabled mode must accept plaintext events"
        );
    }

    // --- capability tag fixtures and tests -----------------------------------

    /// Builds a transport on a mock relay pool with the given modes, without
    /// connecting or starting an event loop.
    fn make_transport_for_tags(
        encryption_mode: EncryptionMode,
        gift_wrap_mode: GiftWrapMode,
    ) -> NostrClientTransport {
        // A single key backs both the config string and the parsed field so
        // the fixture stays internally consistent.
        let keys = Keys::generate();
        NostrClientTransport {
            base: BaseTransport {
                relay_pool: Arc::new(crate::relay::mock::MockRelayPool::new()),
                encryption_mode,
                is_connected: false,
            },
            config: NostrClientTransportConfig {
                encryption_mode,
                gift_wrap_mode,
                server_pubkey: keys.public_key().to_hex(),
                ..Default::default()
            },
            server_pubkey: keys.public_key(),
            pending_requests: ClientCorrelationStore::new(),
            has_sent_discovery_tags: AtomicBool::new(false),
            discovered_server_capabilities: Arc::new(Mutex::new(PeerCapabilities::default())),
            server_initialize_event: Arc::new(Mutex::new(None)),
            server_supports_ephemeral: Arc::new(AtomicBool::new(false)),
            seen_gift_wrap_ids: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(10).unwrap()))),
            message_tx: Some(tokio::sync::mpsc::unbounded_channel().0),
            message_rx: None,
            cancellation_token: CancellationToken::new(),
            event_loop_handle: None,
        }
    }

    /// Builds a custom tag whose name is `parts[0]` and whose values are the
    /// remaining elements.
    fn make_tag(parts: &[&str]) -> Tag {
        let kind = TagKind::Custom(parts[0].into());
        let values: Vec<String> = parts[1..].iter().map(|s| s.to_string()).collect();
        Tag::custom(kind, values)
    }

    /// Returns the first element (the name) of every tag.
    fn tag_names(tags: &[Tag]) -> Vec<String> {
        tags.iter().map(|t| t.clone().to_vec()[0].clone()).collect()
    }

    #[test]
    fn client_capability_tags_encryption_optional() {
        let t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Optional);
        let tags = t.get_client_capability_tags();
        let names = tag_names(&tags);
        assert_eq!(
            names,
            vec!["support_encryption", "support_encryption_ephemeral"]
        );
    }

    #[test]
    fn client_capability_tags_encryption_disabled() {
        let t = make_transport_for_tags(EncryptionMode::Disabled, GiftWrapMode::Optional);
        let tags = t.get_client_capability_tags();
        assert!(tags.is_empty());
    }

    #[test]
    fn client_capability_tags_persistent_gift_wrap() {
        let t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Persistent);
        let tags = t.get_client_capability_tags();
        let names = tag_names(&tags);
        assert_eq!(names, vec!["support_encryption"]);
    }

    #[test]
    fn client_discovery_tags_sent_once() {
        let t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Optional);
        let first = t.get_pending_client_discovery_tags();
        assert!(!first.is_empty());

        t.has_sent_discovery_tags.store(true, Ordering::Relaxed);
        let second = t.get_pending_client_discovery_tags();
        assert!(second.is_empty());
    }

    // --- server discovery learning -------------------------------------------

    /// Builds a signed event of the messages kind carrying the given tags,
    /// authored by a freshly generated key.
    fn make_event_with_tags(tag_parts: &[&[&str]]) -> Event {
        let keys = Keys::generate();
        let tags: Vec<Tag> = tag_parts.iter().map(|p| make_tag(p)).collect();
        let builder = EventBuilder::new(Kind::Custom(CTXVM_MESSAGES_KIND), "{}").tags(tags);
        let unsigned = builder.build(keys.public_key());
        unsigned.sign_with_keys(&keys).unwrap()
    }

    #[test]
    fn client_learn_server_discovery_sets_baseline() {
        let caps = Mutex::new(PeerCapabilities::default());
        let init = Mutex::new(None);
        let event = make_event_with_tags(&[&["support_encryption"], &["name", "TestServer"]]);

        NostrClientTransport::learn_server_discovery(&caps, &init, &event);

        let c = caps.lock().unwrap();
        assert!(c.supports_encryption);
        assert!(!c.supports_ephemeral_encryption);

        let stored = init.lock().unwrap();
        assert!(stored.is_some());
        assert_eq!(stored.as_ref().unwrap().id, event.id);
    }

    #[test]
    fn client_learn_server_discovery_or_assigns() {
        let caps = Mutex::new(PeerCapabilities::default());
        let init = Mutex::new(None);

        let event1 = make_event_with_tags(&[&["support_encryption"]]);
        NostrClientTransport::learn_server_discovery(&caps, &init, &event1);

        // The second event lacks `support_encryption`; the flag must survive.
        let event2 = make_event_with_tags(&[&["support_encryption_ephemeral"]]);
        NostrClientTransport::learn_server_discovery(&caps, &init, &event2);

        let c = caps.lock().unwrap();
        assert!(c.supports_encryption, "must not downgrade");
        assert!(c.supports_ephemeral_encryption, "must learn new cap");
    }

    #[test]
    fn client_baseline_not_replaced_on_later_events() {
        let caps = Mutex::new(PeerCapabilities::default());
        let init = Mutex::new(None);

        let event1 = make_event_with_tags(&[&["support_encryption"], &["name", "First"]]);
        NostrClientTransport::learn_server_discovery(&caps, &init, &event1);
        let first_id = event1.id;

        let event2 =
            make_event_with_tags(&[&["support_encryption_ephemeral"], &["name", "Second"]]);
        NostrClientTransport::learn_server_discovery(&caps, &init, &event2);

        let stored = init.lock().unwrap();
        assert_eq!(
            stored.as_ref().unwrap().id,
            first_id,
            "baseline must not be replaced"
        );
    }
}