Skip to main content

contextvm_sdk/transport/client/
mod.rs

1//! Client-side Nostr transport for ContextVM.
2//!
3//! Connects to a remote MCP server over Nostr. Sends JSON-RPC requests as
4//! kind 25910 events, correlates responses via `e` tag.
5
6pub mod correlation_store;
7
8pub use correlation_store::ClientCorrelationStore;
9
10use std::num::NonZeroUsize;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::{Arc, Mutex};
13use std::time::Duration;
14
15use lru::LruCache;
16use nostr_sdk::prelude::*;
17use tokio_util::sync::CancellationToken;
18
19use crate::core::constants::*;
20use crate::core::error::{Error, Result};
21use crate::core::serializers;
22use crate::core::types::*;
23use crate::core::validation;
24use crate::encryption;
25use crate::relay::{RelayPool, RelayPoolTrait};
26use crate::transport::base::BaseTransport;
27use crate::transport::discovery_tags::{parse_discovered_peer_capabilities, PeerCapabilities};
28
29const LOG_TARGET: &str = "contextvm_sdk::transport::client";
30
31/// Configuration for the client transport.
32#[derive(Debug, Clone)]
33#[non_exhaustive]
34pub struct NostrClientTransportConfig {
35    /// Relay URLs to connect to.
36    pub relay_urls: Vec<String>,
37    /// The server's public key (hex).
38    pub server_pubkey: String,
39    /// Encryption mode.
40    pub encryption_mode: EncryptionMode,
41    /// Gift-wrap policy for encrypted messages.
42    pub gift_wrap_mode: GiftWrapMode,
43    /// Stateless mode: emulate initialize response locally.
44    pub is_stateless: bool,
45    /// Correlation-retention TTL for pending client requests (default: 30s).
46    ///
47    /// Stale pending entries older than this are swept from the correlation store.
48    /// This prevents leaks -- rmcp owns actual request timeout and cancellation.
49    /// Keep this value above your rmcp request timeout to avoid premature cleanup.
50    pub timeout: Duration,
51}
52
53impl Default for NostrClientTransportConfig {
54    fn default() -> Self {
55        Self {
56            relay_urls: vec!["wss://relay.damus.io".to_string()],
57            server_pubkey: String::new(),
58            encryption_mode: EncryptionMode::Optional,
59            gift_wrap_mode: GiftWrapMode::Optional,
60            is_stateless: false,
61            timeout: Duration::from_secs(30),
62        }
63    }
64}
65
66impl NostrClientTransportConfig {
67    /// Set the server's public key (hex).
68    pub fn with_server_pubkey(mut self, pubkey: impl Into<String>) -> Self {
69        self.server_pubkey = pubkey.into();
70        self
71    }
72    /// Set the encryption mode.
73    pub fn with_encryption_mode(mut self, mode: EncryptionMode) -> Self {
74        self.encryption_mode = mode;
75        self
76    }
77    /// Set the gift-wrap mode (CEP-19).
78    pub fn with_gift_wrap_mode(mut self, mode: GiftWrapMode) -> Self {
79        self.gift_wrap_mode = mode;
80        self
81    }
82    /// Enable or disable stateless mode.
83    pub fn with_stateless(mut self, stateless: bool) -> Self {
84        self.is_stateless = stateless;
85        self
86    }
87    /// Set the relay URLs to connect to.
88    pub fn with_relay_urls(mut self, urls: Vec<String>) -> Self {
89        self.relay_urls = urls;
90        self
91    }
92    /// Set the correlation-retention TTL.
93    pub fn with_timeout(mut self, timeout: Duration) -> Self {
94        self.timeout = timeout;
95        self
96    }
97}
98
99/// Client-side Nostr transport for sending MCP requests and receiving responses.
100pub struct NostrClientTransport {
101    base: BaseTransport,
102    config: NostrClientTransportConfig,
103    server_pubkey: PublicKey,
104    /// Pending request event IDs awaiting responses.
105    pending_requests: ClientCorrelationStore,
106    /// CEP-35: one-shot flag for client discovery tag emission.
107    has_sent_discovery_tags: AtomicBool,
108    /// CEP-35: learned server capabilities from inbound discovery tags.
109    discovered_server_capabilities: Arc<Mutex<PeerCapabilities>>,
110    /// CEP-35: first inbound event carrying discovery tags (session baseline).
111    server_initialize_event: Arc<Mutex<Option<Event>>>,
112    /// Learned support for server-side ephemeral gift wraps.
113    server_supports_ephemeral: Arc<AtomicBool>,
114    /// Outer gift-wrap event IDs successfully decrypted and verified (inner `verify()`).
115    /// Duplicate outer ids are skipped before decrypt; ids are inserted only after success
116    /// so failed decrypt/verify can be retried on redelivery.
117    seen_gift_wrap_ids: Arc<Mutex<LruCache<EventId, ()>>>,
118    /// Channel for receiving processed MCP messages from the event loop.
119    message_tx: Option<tokio::sync::mpsc::UnboundedSender<JsonRpcMessage>>,
120    message_rx: Option<tokio::sync::mpsc::UnboundedReceiver<JsonRpcMessage>>,
121    /// Token used to cancel the spawned event loop on close().
122    cancellation_token: CancellationToken,
123    /// Handle for the spawned event loop task.
124    event_loop_handle: Option<tokio::task::JoinHandle<()>>,
125}
126
127impl NostrClientTransport {
128    /// Create a new client transport.
129    pub async fn new<T>(signer: T, config: NostrClientTransportConfig) -> Result<Self>
130    where
131        T: IntoNostrSigner,
132    {
133        let server_pubkey = PublicKey::from_hex(&config.server_pubkey).map_err(|error| {
134            tracing::error!(
135                target: LOG_TARGET,
136                error = %error,
137                server_pubkey = %config.server_pubkey,
138                "Invalid server pubkey"
139            );
140            Error::Other(format!("Invalid server pubkey: {error}"))
141        })?;
142
143        let relay_pool: Arc<dyn RelayPoolTrait> =
144            Arc::new(RelayPool::new(signer).await.map_err(|error| {
145                tracing::error!(
146                    target: LOG_TARGET,
147                    error = %error,
148                    "Failed to initialize relay pool for client transport"
149                );
150                error
151            })?);
152        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
153        let seen_gift_wrap_ids = Arc::new(Mutex::new(LruCache::new(
154            NonZeroUsize::new(DEFAULT_LRU_SIZE).expect("DEFAULT_LRU_SIZE must be non-zero"),
155        )));
156
157        tracing::info!(
158            target: LOG_TARGET,
159            relay_count = config.relay_urls.len(),
160            stateless = config.is_stateless,
161            encryption_mode = ?config.encryption_mode,
162            "Created client transport"
163        );
164        Ok(Self {
165            base: BaseTransport {
166                relay_pool,
167                encryption_mode: config.encryption_mode,
168                is_connected: false,
169            },
170            config,
171            server_pubkey,
172            pending_requests: ClientCorrelationStore::new(),
173            has_sent_discovery_tags: AtomicBool::new(false),
174            discovered_server_capabilities: Arc::new(Mutex::new(PeerCapabilities::default())),
175            server_initialize_event: Arc::new(Mutex::new(None)),
176            server_supports_ephemeral: Arc::new(AtomicBool::new(false)),
177            seen_gift_wrap_ids,
178            message_tx: Some(tx),
179            message_rx: Some(rx),
180            cancellation_token: CancellationToken::new(),
181            event_loop_handle: None,
182        })
183    }
184
185    /// Like [`new`](Self::new) but accepts an existing relay pool.
186    pub async fn with_relay_pool(
187        config: NostrClientTransportConfig,
188        relay_pool: Arc<dyn RelayPoolTrait>,
189    ) -> Result<Self> {
190        let server_pubkey = PublicKey::from_hex(&config.server_pubkey).map_err(|error| {
191            tracing::error!(
192                target: LOG_TARGET,
193                error = %error,
194                server_pubkey = %config.server_pubkey,
195                "Invalid server pubkey"
196            );
197            Error::Other(format!("Invalid server pubkey: {error}"))
198        })?;
199
200        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
201        let seen_gift_wrap_ids = Arc::new(Mutex::new(LruCache::new(
202            NonZeroUsize::new(DEFAULT_LRU_SIZE).expect("DEFAULT_LRU_SIZE must be non-zero"),
203        )));
204
205        tracing::info!(
206            target: LOG_TARGET,
207            relay_count = config.relay_urls.len(),
208            stateless = config.is_stateless,
209            encryption_mode = ?config.encryption_mode,
210            "Created client transport (with_relay_pool)"
211        );
212        Ok(Self {
213            base: BaseTransport {
214                relay_pool,
215                encryption_mode: config.encryption_mode,
216                is_connected: false,
217            },
218            config,
219            server_pubkey,
220            pending_requests: ClientCorrelationStore::new(),
221            has_sent_discovery_tags: AtomicBool::new(false),
222            discovered_server_capabilities: Arc::new(Mutex::new(PeerCapabilities::default())),
223            server_initialize_event: Arc::new(Mutex::new(None)),
224            server_supports_ephemeral: Arc::new(AtomicBool::new(false)),
225            seen_gift_wrap_ids,
226            message_tx: Some(tx),
227            message_rx: Some(rx),
228            cancellation_token: CancellationToken::new(),
229            event_loop_handle: None,
230        })
231    }
232
233    /// Connect and start listening for responses.
234    pub async fn start(&mut self) -> Result<()> {
235        self.base
236            .connect(&self.config.relay_urls)
237            .await
238            .map_err(|error| {
239                tracing::error!(
240                    target: LOG_TARGET,
241                    error = %error,
242                    "Failed to connect client transport to relays"
243                );
244                error
245            })?;
246
247        let pubkey = self.base.get_public_key().await.map_err(|error| {
248            tracing::error!(
249                target: LOG_TARGET,
250                error = %error,
251                "Failed to fetch client transport public key"
252            );
253            error
254        })?;
255        tracing::info!(
256            target: LOG_TARGET,
257            pubkey = %pubkey.to_hex(),
258            "Client transport started"
259        );
260
261        self.base
262            .subscribe_for_pubkey(&pubkey)
263            .await
264            .map_err(|error| {
265                tracing::error!(
266                    target: LOG_TARGET,
267                    error = %error,
268                    pubkey = %pubkey.to_hex(),
269                    "Failed to subscribe client transport for pubkey"
270                );
271                error
272            })?;
273
274        // Spawn event loop with cancellation support
275        let relay_pool = Arc::clone(&self.base.relay_pool);
276        let pending = self.pending_requests.clone();
277        let server_pubkey = self.server_pubkey;
278        let tx = self
279            .message_tx
280            .as_ref()
281            .expect("message_tx must exist before start()")
282            .clone();
283        let encryption_mode = self.config.encryption_mode;
284        let gift_wrap_mode = self.config.gift_wrap_mode;
285        let discovered_caps = self.discovered_server_capabilities.clone();
286        let init_event = self.server_initialize_event.clone();
287        let server_supports_ephemeral = self.server_supports_ephemeral.clone();
288        let seen_gift_wrap_ids = self.seen_gift_wrap_ids.clone();
289        let timeout = self.config.timeout;
290        let token = self.cancellation_token.child_token();
291
292        self.event_loop_handle = Some(tokio::spawn(async move {
293            Self::event_loop(
294                relay_pool,
295                pending,
296                server_pubkey,
297                tx,
298                encryption_mode,
299                gift_wrap_mode,
300                discovered_caps,
301                init_event,
302                server_supports_ephemeral,
303                seen_gift_wrap_ids,
304                timeout,
305                token,
306            )
307            .await;
308        }));
309
310        tracing::info!(
311            target: LOG_TARGET,
312            relay_count = self.config.relay_urls.len(),
313            "Client transport event loop spawned"
314        );
315        Ok(())
316    }
317
318    /// Close the transport — cancels the event loop and disconnects from relays.
319    pub async fn close(&mut self) -> Result<()> {
320        self.cancellation_token.cancel();
321        if let Some(handle) = self.event_loop_handle.take() {
322            let _ = handle.await;
323        }
324        self.message_tx.take();
325        self.base.disconnect().await
326    }
327
328    /// Send a JSON-RPC message to the server.
329    pub async fn send(&self, message: &JsonRpcMessage) -> Result<()> {
330        // Stateless mode: emulate initialize response
331        if self.config.is_stateless {
332            if let JsonRpcMessage::Request(ref req) = message {
333                if req.method == "initialize" {
334                    self.emulate_initialize_response(&req.id);
335                    return Ok(());
336                }
337            }
338            if let JsonRpcMessage::Notification(ref n) = message {
339                if n.method == "notifications/initialized" {
340                    return Ok(());
341                }
342            }
343        }
344
345        let is_request = message.is_request();
346        let base_tags = BaseTransport::create_recipient_tags(&self.server_pubkey);
347        let discovery_tags = if is_request {
348            self.get_pending_client_discovery_tags()
349        } else {
350            vec![]
351        };
352        let tags = BaseTransport::compose_outbound_tags(&base_tags, &discovery_tags, &[]);
353
354        let (event_id, publishable_event) = self
355            .base
356            .prepare_mcp_message(
357                message,
358                &self.server_pubkey,
359                CTXVM_MESSAGES_KIND,
360                tags,
361                None,
362                Some(self.choose_outbound_gift_wrap_kind()),
363            )
364            .await
365            .map_err(|error| {
366                tracing::error!(
367                    target: LOG_TARGET,
368                    error = %error,
369                    server_pubkey = %self.server_pubkey.to_hex(),
370                    method = ?message.method(),
371                    "Failed to prepare client message"
372                );
373                error
374            })?;
375
376        if let JsonRpcMessage::Request(ref req) = message {
377            let is_initialize = req.method == INITIALIZE_METHOD;
378            self.pending_requests
379                .register(event_id.to_hex(), req.id.clone(), is_initialize)
380                .await;
381        }
382
383        if let Err(error) = self.base.relay_pool.publish_event(&publishable_event).await {
384            self.pending_requests.remove(&event_id.to_hex()).await;
385            tracing::error!(
386                target: LOG_TARGET,
387                error = %error,
388                server_pubkey = %self.server_pubkey.to_hex(),
389                method = ?message.method(),
390                "Failed to publish client message"
391            );
392            return Err(error);
393        }
394
395        // Flip one-shot flag only after successful publish
396        if is_request && !discovery_tags.is_empty() {
397            self.has_sent_discovery_tags.store(true, Ordering::Relaxed);
398        }
399
400        tracing::debug!(
401            target: LOG_TARGET,
402            event_id = %event_id.to_hex(),
403            method = ?message.method(),
404            "Sent client message"
405        );
406        Ok(())
407    }
408
409    /// Take the message receiver for consuming incoming messages.
410    pub fn take_message_receiver(
411        &mut self,
412    ) -> Option<tokio::sync::mpsc::UnboundedReceiver<JsonRpcMessage>> {
413        self.message_rx.take()
414    }
415
416    fn emulate_initialize_response(&self, request_id: &serde_json::Value) {
417        let response = JsonRpcMessage::Response(JsonRpcResponse {
418            jsonrpc: "2.0".to_string(),
419            id: request_id.clone(),
420            result: serde_json::json!({
421                "protocolVersion": crate::core::constants::mcp_protocol_version(),
422                "serverInfo": {
423                    "name": "Emulated-Stateless-Server",
424                    "version": "1.0.0"
425                },
426                "capabilities": {
427                    "tools": { "listChanged": true },
428                    "prompts": { "listChanged": true },
429                    "resources": { "subscribe": true, "listChanged": true }
430                }
431            }),
432        });
433        if let Some(ref tx) = self.message_tx {
434            let _ = tx.send(response);
435        }
436    }
437
438    #[allow(clippy::too_many_arguments)]
439    async fn event_loop(
440        relay_pool: Arc<dyn RelayPoolTrait>,
441        pending: ClientCorrelationStore,
442        server_pubkey: PublicKey,
443        tx: tokio::sync::mpsc::UnboundedSender<JsonRpcMessage>,
444        encryption_mode: EncryptionMode,
445        gift_wrap_mode: GiftWrapMode,
446        discovered_caps: Arc<Mutex<PeerCapabilities>>,
447        init_event: Arc<Mutex<Option<Event>>>,
448        server_supports_ephemeral: Arc<AtomicBool>,
449        seen_gift_wrap_ids: Arc<Mutex<LruCache<EventId, ()>>>,
450        timeout: Duration,
451        cancel: CancellationToken,
452    ) {
453        let mut notifications = relay_pool.notifications();
454        // Sweep interval: half the timeout, clamped to [1s, 30s].
455        let sweep_interval = (timeout / 2).clamp(Duration::from_secs(1), Duration::from_secs(30));
456        let mut sweep_timer =
457            tokio::time::interval_at(tokio::time::Instant::now() + sweep_interval, sweep_interval);
458
459        loop {
460            tokio::select! {
461                _ = cancel.cancelled() => {
462                    tracing::info!(
463                        target: LOG_TARGET,
464                        "Client event loop cancelled"
465                    );
466                    break;
467                }
468                result = notifications.recv() => {
469                    let notification = match result {
470                        Ok(n) => n,
471                        Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
472                            tracing::warn!(
473                                target: LOG_TARGET,
474                                skipped = n,
475                                "Relay broadcast lagged, skipping missed events"
476                            );
477                            continue;
478                        }
479                        Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
480                    };
481                    Self::handle_notification(
482                        &notification,
483                        &pending,
484                        server_pubkey,
485                        &tx,
486                        encryption_mode,
487                        gift_wrap_mode,
488                        &discovered_caps,
489                        &init_event,
490                        &server_supports_ephemeral,
491                        &seen_gift_wrap_ids,
492                        &relay_pool,
493                    )
494                    .await;
495                }
496                _ = sweep_timer.tick() => {
497                    let swept = pending.sweep_expired(timeout).await;
498                    if swept > 0 {
499                        tracing::warn!(
500                            target: LOG_TARGET,
501                            swept,
502                            timeout_ms = timeout.as_millis() as u64,
503                            "Swept stale pending requests (rmcp handles timeout errors)"
504                        );
505                    }
506                }
507            }
508        }
509    }
510
511    // ── CEP-35 discovery tag helpers ──────────────────────────────
512
513    /// Constructs client capability tags based on config.
514    fn get_client_capability_tags(&self) -> Vec<Tag> {
515        let mut tags = Vec::new();
516        if self.config.encryption_mode != EncryptionMode::Disabled {
517            tags.push(Tag::custom(
518                TagKind::Custom(tags::SUPPORT_ENCRYPTION.into()),
519                Vec::<String>::new(),
520            ));
521            if self.config.gift_wrap_mode != GiftWrapMode::Persistent {
522                tags.push(Tag::custom(
523                    TagKind::Custom(tags::SUPPORT_ENCRYPTION_EPHEMERAL.into()),
524                    Vec::<String>::new(),
525                ));
526            }
527        }
528        tags
529    }
530
531    /// One-shot: returns capability tags if not yet sent, empty otherwise.
532    fn get_pending_client_discovery_tags(&self) -> Vec<Tag> {
533        if self.has_sent_discovery_tags.load(Ordering::Relaxed) {
534            vec![]
535        } else {
536            self.get_client_capability_tags()
537        }
538    }
539
540    /// Parses inbound event tags and updates learned server capabilities.
541    fn learn_server_discovery(
542        discovered_caps: &Mutex<PeerCapabilities>,
543        init_event: &Mutex<Option<Event>>,
544        event: &Event,
545    ) {
546        let tag_vec: Vec<Tag> = event.tags.clone().to_vec();
547        let discovered = parse_discovered_peer_capabilities(&tag_vec);
548        if discovered.discovery_tags.is_empty() {
549            return;
550        }
551
552        {
553            let mut caps = match discovered_caps.lock() {
554                Ok(g) => g,
555                Err(p) => p.into_inner(),
556            };
557            caps.supports_encryption |= discovered.capabilities.supports_encryption;
558            caps.supports_ephemeral_encryption |=
559                discovered.capabilities.supports_ephemeral_encryption;
560            caps.supports_oversized_transfer |= discovered.capabilities.supports_oversized_transfer;
561        }
562
563        let mut stored = match init_event.lock() {
564            Ok(g) => g,
565            Err(p) => p.into_inner(),
566        };
567        if stored.is_none() {
568            *stored = Some(event.clone());
569        }
570        // Note: TS SDK has an upgrade path where a later event with an InitializeResult
571        // replaces a non-initialize baseline. Not implemented here -- edge case only
572        // relevant if the first server message with discovery tags is a notification.
573    }
574
575    /// Returns a clone of the first inbound event that carried server discovery tags.
576    pub fn get_server_initialize_event(&self) -> Option<Event> {
577        let guard = match self.server_initialize_event.lock() {
578            Ok(g) => g,
579            Err(p) => p.into_inner(),
580        };
581        guard.clone()
582    }
583
584    /// Returns a snapshot of the learned server capabilities from discovery tags.
585    pub fn discovered_server_capabilities(&self) -> PeerCapabilities {
586        let guard = match self.discovered_server_capabilities.lock() {
587            Ok(g) => g,
588            Err(p) => p.into_inner(),
589        };
590        *guard
591    }
592
593    #[allow(clippy::too_many_arguments)]
594    async fn handle_notification(
595        notification: &RelayPoolNotification,
596        pending: &ClientCorrelationStore,
597        server_pubkey: PublicKey,
598        tx: &tokio::sync::mpsc::UnboundedSender<JsonRpcMessage>,
599        encryption_mode: EncryptionMode,
600        gift_wrap_mode: GiftWrapMode,
601        discovered_caps: &Arc<Mutex<PeerCapabilities>>,
602        init_event: &Arc<Mutex<Option<Event>>>,
603        server_supports_ephemeral: &Arc<AtomicBool>,
604        seen_gift_wrap_ids: &Arc<Mutex<LruCache<EventId, ()>>>,
605        relay_pool: &Arc<dyn RelayPoolTrait>,
606    ) {
607        let event = match notification {
608            RelayPoolNotification::Event { event, .. } => event,
609            _ => return,
610        };
611
612        let is_gift_wrap = is_gift_wrap_kind(&event.kind);
613        let outer_kind = event.kind.as_u16();
614
615        // Enforce encryption mode before decrypt/parse.
616        if violates_encryption_policy(&event.kind, &encryption_mode) {
617            if is_gift_wrap {
618                tracing::warn!(
619                    target: LOG_TARGET,
620                    event_id = %event.id.to_hex(),
621                    event_kind = outer_kind,
622                    configured_mode = ?gift_wrap_mode,
623                    "Skipping encrypted response because client encryption is disabled"
624                );
625            } else {
626                tracing::warn!(
627                    target: LOG_TARGET,
628                    event_id = %event.id.to_hex(),
629                    "Skipping plaintext response because client encryption is required"
630                );
631            }
632            return;
633        }
634
635        // Enforce CEP-19 gift-wrap-mode policy.
636        if is_gift_wrap && !gift_wrap_mode.allows_kind(outer_kind) {
637            tracing::warn!(
638                target: LOG_TARGET,
639                event_id = %event.id.to_hex(),
640                event_kind = outer_kind,
641                configured_mode = ?gift_wrap_mode,
642                "Skipping gift wrap due to CEP-19 policy"
643            );
644            return;
645        }
646
647        // Handle gift-wrapped events
648        let (actual_event_content, actual_pubkey, e_tag, verified_tags, source_event) =
649            if is_gift_wrap {
650                {
651                    let guard = match seen_gift_wrap_ids.lock() {
652                        Ok(g) => g,
653                        Err(poisoned) => poisoned.into_inner(),
654                    };
655                    if guard.contains(&event.id) {
656                        tracing::debug!(
657                            target: LOG_TARGET,
658                            event_id = %event.id.to_hex(),
659                            "Skipping duplicate gift-wrap (outer id)"
660                        );
661                        return;
662                    }
663                }
664                // Single-layer NIP-44 decrypt (matches JS/TS SDK)
665                let signer = match relay_pool.signer().await {
666                    Ok(s) => s,
667                    Err(error) => {
668                        tracing::error!(
669                            target: LOG_TARGET,
670                            error = %error,
671                            "Failed to get signer"
672                        );
673                        return;
674                    }
675                };
676                match encryption::decrypt_gift_wrap_single_layer(&signer, event).await {
677                    Ok(decrypted_json) => match serde_json::from_str::<Event>(&decrypted_json) {
678                        Ok(inner) => {
679                            if let Err(e) = inner.verify() {
680                                tracing::warn!("Inner event signature verification failed: {e}");
681                                return;
682                            }
683                            {
684                                let mut guard = match seen_gift_wrap_ids.lock() {
685                                    Ok(g) => g,
686                                    Err(poisoned) => poisoned.into_inner(),
687                                };
688                                guard.put(event.id, ());
689                            }
690                            let e_tag = serializers::get_tag_value(&inner.tags, "e");
691                            let inner_clone = inner.clone();
692                            (inner.content, inner.pubkey, e_tag, inner.tags, inner_clone)
693                        }
694                        Err(error) => {
695                            tracing::error!(
696                                target: LOG_TARGET,
697                                error = %error,
698                                "Failed to parse inner event"
699                            );
700                            return;
701                        }
702                    },
703                    Err(error) => {
704                        tracing::error!(
705                            target: LOG_TARGET,
706                            error = %error,
707                            "Failed to decrypt gift wrap"
708                        );
709                        return;
710                    }
711                }
712            } else {
713                let e_tag = serializers::get_tag_value(&event.tags, "e");
714                let event_clone: Event = (**event).clone();
715                (
716                    event.content.clone(),
717                    event.pubkey,
718                    e_tag,
719                    event.tags.clone(),
720                    event_clone,
721                )
722            };
723
724        // Verify it's from our server
725        if actual_pubkey != server_pubkey {
726            tracing::debug!(
727                target: LOG_TARGET,
728                event_pubkey = %actual_pubkey.to_hex(),
729                expected_pubkey = %server_pubkey.to_hex(),
730                "Skipping event from unexpected pubkey"
731            );
732            return;
733        }
734
735        // CEP-35: learn server capabilities from discovery tags
736        Self::learn_server_discovery(discovered_caps, init_event, &source_event);
737
738        // CEP-19: learn ephemeral support from server
739        if Self::should_learn_ephemeral_support(
740            actual_pubkey,
741            server_pubkey,
742            if is_gift_wrap { Some(outer_kind) } else { None },
743            &verified_tags,
744        ) {
745            server_supports_ephemeral.store(true, Ordering::Relaxed);
746        }
747
748        // Correlate response
749        if let Some(ref correlated_id) = e_tag {
750            let is_pending = pending.contains(correlated_id.as_str()).await;
751            if !is_pending {
752                tracing::warn!(
753                    target: LOG_TARGET,
754                    correlated_event_id = %correlated_id,
755                    "Response for unknown request"
756                );
757                return;
758            }
759        }
760
761        // Parse MCP message
762        if let Some(mcp_msg) = validation::validate_and_parse(&actual_event_content) {
763            // Drop uncorrelated responses and server-to-client requests (matches TS SDK).
764            match &mcp_msg {
765                JsonRpcMessage::Response(_) | JsonRpcMessage::ErrorResponse(_)
766                    if e_tag.is_none() =>
767                {
768                    tracing::warn!(
769                        target: LOG_TARGET,
770                        "Dropping response/error without correlation `e` tag"
771                    );
772                    return;
773                }
774                JsonRpcMessage::Request(_) => {
775                    tracing::warn!(
776                        target: LOG_TARGET,
777                        method = ?mcp_msg.method(),
778                        "Dropping server-to-client request (invalid in MCP)"
779                    );
780                    return;
781                }
782                _ => {}
783            }
784
785            // Clean up pending request
786            if let Some(ref correlated_id) = e_tag {
787                pending.remove(correlated_id.as_str()).await;
788            }
789            let _ = tx.send(mcp_msg);
790        }
791    }
792
793    fn choose_outbound_gift_wrap_kind(&self) -> u16 {
794        match self.config.gift_wrap_mode {
795            GiftWrapMode::Persistent => GIFT_WRAP_KIND,
796            GiftWrapMode::Ephemeral => EPHEMERAL_GIFT_WRAP_KIND,
797            GiftWrapMode::Optional => {
798                if self.server_supports_ephemeral.load(Ordering::Relaxed) {
799                    EPHEMERAL_GIFT_WRAP_KIND
800                } else {
801                    GIFT_WRAP_KIND
802                }
803            }
804        }
805    }
806
807    fn has_support_ephemeral_tag(tags: &Tags) -> bool {
808        tags.iter().any(|tag| {
809            tag.kind()
810                == TagKind::Custom(
811                    crate::core::constants::tags::SUPPORT_ENCRYPTION_EPHEMERAL.into(),
812                )
813        })
814    }
815
816    fn should_learn_ephemeral_support(
817        actual_pubkey: PublicKey,
818        server_pubkey: PublicKey,
819        event_kind: Option<u16>,
820        tags: &Tags,
821    ) -> bool {
822        actual_pubkey == server_pubkey
823            && (event_kind == Some(EPHEMERAL_GIFT_WRAP_KIND)
824                || Self::has_support_ephemeral_tag(tags))
825    }
826
827    /// Returns whether the client has learned ephemeral gift-wrap support from the server.
828    pub fn server_supports_ephemeral_encryption(&self) -> bool {
829        self.server_supports_ephemeral.load(Ordering::Relaxed)
830    }
831}
832
833#[inline]
834fn is_gift_wrap_kind(kind: &Kind) -> bool {
835    *kind == Kind::Custom(GIFT_WRAP_KIND) || *kind == Kind::Custom(EPHEMERAL_GIFT_WRAP_KIND)
836}
837
838/// Returns `true` when the inbound event kind violates the configured encryption
839/// policy and must be dropped before any further processing.
840#[inline]
841fn violates_encryption_policy(kind: &Kind, mode: &EncryptionMode) -> bool {
842    let is_gift_wrap = is_gift_wrap_kind(kind);
843    (is_gift_wrap && *mode == EncryptionMode::Disabled)
844        || (!is_gift_wrap && *mode == EncryptionMode::Required)
845}
846
847#[cfg(test)]
848mod tests {
849    use super::*;
850
851    #[test]
852    fn test_config_defaults() {
853        let config = NostrClientTransportConfig::default();
854        assert_eq!(config.relay_urls, vec!["wss://relay.damus.io".to_string()]);
855        assert!(config.server_pubkey.is_empty());
856        assert_eq!(config.encryption_mode, EncryptionMode::Optional);
857        assert_eq!(config.gift_wrap_mode, GiftWrapMode::Optional);
858        assert!(!config.is_stateless);
859        assert_eq!(config.timeout, Duration::from_secs(30));
860    }
861
862    #[test]
863    fn test_stateless_config() {
864        let config = NostrClientTransportConfig {
865            is_stateless: true,
866            ..Default::default()
867        };
868        assert!(config.is_stateless);
869    }
870
871    #[test]
872    fn test_custom_timeout_config() {
873        let config = NostrClientTransportConfig {
874            timeout: Duration::from_secs(60),
875            ..Default::default()
876        };
877        assert_eq!(config.timeout, Duration::from_secs(60));
878    }
879
880    #[test]
881    fn test_has_support_ephemeral_tag_detects_capability() {
882        let tags = Tags::from_list(vec![Tag::custom(
883            TagKind::Custom(crate::core::constants::tags::SUPPORT_ENCRYPTION_EPHEMERAL.into()),
884            Vec::<String>::new(),
885        )]);
886        assert!(NostrClientTransport::has_support_ephemeral_tag(&tags));
887    }
888
889    #[test]
890    fn test_has_support_ephemeral_tag_absent() {
891        let tags = Tags::from_list(vec![Tag::custom(
892            TagKind::Custom(crate::core::constants::tags::SUPPORT_ENCRYPTION.into()),
893            Vec::<String>::new(),
894        )]);
895        assert!(!NostrClientTransport::has_support_ephemeral_tag(&tags));
896    }
897
898    #[test]
899    fn test_should_learn_ephemeral_support_requires_matching_server_pubkey() {
900        let server_keys = Keys::generate();
901        let other_keys = Keys::generate();
902        let tags = Tags::from_list(vec![Tag::custom(
903            TagKind::Custom(crate::core::constants::tags::SUPPORT_ENCRYPTION_EPHEMERAL.into()),
904            Vec::<String>::new(),
905        )]);
906
907        assert!(!NostrClientTransport::should_learn_ephemeral_support(
908            other_keys.public_key(),
909            server_keys.public_key(),
910            Some(EPHEMERAL_GIFT_WRAP_KIND),
911            &tags,
912        ));
913        assert!(NostrClientTransport::should_learn_ephemeral_support(
914            server_keys.public_key(),
915            server_keys.public_key(),
916            Some(EPHEMERAL_GIFT_WRAP_KIND),
917            &tags,
918        ));
919    }
920
921    #[test]
922    fn test_should_learn_from_ephemeral_kind_even_without_tag() {
923        let server_keys = Keys::generate();
924        let empty_tags = Tags::from_list(vec![]);
925
926        assert!(NostrClientTransport::should_learn_ephemeral_support(
927            server_keys.public_key(),
928            server_keys.public_key(),
929            Some(EPHEMERAL_GIFT_WRAP_KIND),
930            &empty_tags,
931        ));
932    }
933
934    #[test]
935    fn test_should_learn_from_tag_without_ephemeral_kind() {
936        let server_keys = Keys::generate();
937        let tags = Tags::from_list(vec![Tag::custom(
938            TagKind::Custom(crate::core::constants::tags::SUPPORT_ENCRYPTION_EPHEMERAL.into()),
939            Vec::<String>::new(),
940        )]);
941
942        assert!(NostrClientTransport::should_learn_ephemeral_support(
943            server_keys.public_key(),
944            server_keys.public_key(),
945            Some(GIFT_WRAP_KIND), // persistent kind, but tag present
946            &tags,
947        ));
948    }
949
950    #[test]
951    fn test_stateless_emulated_initialize_response_shape() {
952        let request_id = serde_json::json!(1);
953        let response = JsonRpcMessage::Response(JsonRpcResponse {
954            jsonrpc: "2.0".to_string(),
955            id: request_id.clone(),
956            result: serde_json::json!({
957                "protocolVersion": crate::core::constants::mcp_protocol_version(),
958                "serverInfo": {
959                    "name": "Emulated-Stateless-Server",
960                    "version": "1.0.0"
961                },
962                "capabilities": {
963                    "tools": { "listChanged": true },
964                    "prompts": { "listChanged": true },
965                    "resources": { "subscribe": true, "listChanged": true }
966                }
967            }),
968        });
969        assert!(response.is_response());
970        assert_eq!(response.id(), Some(&serde_json::json!(1)));
971
972        if let JsonRpcMessage::Response(r) = &response {
973            assert!(r.result.get("capabilities").is_some());
974            assert!(r.result.get("serverInfo").is_some());
975            let server_info = r.result.get("serverInfo").unwrap();
976            assert_eq!(
977                server_info.get("name").unwrap().as_str().unwrap(),
978                "Emulated-Stateless-Server"
979            );
980        }
981    }
982
983    #[test]
984    fn test_stateless_mode_initialize_request_detection() {
985        let init_req = JsonRpcMessage::Request(JsonRpcRequest {
986            jsonrpc: "2.0".to_string(),
987            id: serde_json::json!(1),
988            method: "initialize".to_string(),
989            params: None,
990        });
991        assert_eq!(init_req.method(), Some("initialize"));
992
993        let init_notif = JsonRpcMessage::Notification(JsonRpcNotification {
994            jsonrpc: "2.0".to_string(),
995            method: "notifications/initialized".to_string(),
996            params: None,
997        });
998        assert_eq!(init_notif.method(), Some("notifications/initialized"));
999    }
1000
1001    #[test]
1002    fn test_gift_wrap_kind_detection() {
1003        assert!(is_gift_wrap_kind(&Kind::Custom(GIFT_WRAP_KIND)));
1004        assert!(is_gift_wrap_kind(&Kind::Custom(EPHEMERAL_GIFT_WRAP_KIND)));
1005        assert!(!is_gift_wrap_kind(&Kind::Custom(CTXVM_MESSAGES_KIND)));
1006    }
1007
1008    #[test]
1009    fn test_required_mode_drops_plaintext() {
1010        let plaintext_kind = Kind::Custom(CTXVM_MESSAGES_KIND);
1011        assert!(
1012            violates_encryption_policy(&plaintext_kind, &EncryptionMode::Required),
1013            "Required mode must reject plaintext (non-gift-wrap) events"
1014        );
1015    }
1016
1017    #[test]
1018    fn test_disabled_mode_drops_encrypted() {
1019        assert!(
1020            violates_encryption_policy(&Kind::Custom(GIFT_WRAP_KIND), &EncryptionMode::Disabled),
1021            "Disabled mode must reject gift-wrap events"
1022        );
1023        assert!(
1024            violates_encryption_policy(
1025                &Kind::Custom(EPHEMERAL_GIFT_WRAP_KIND),
1026                &EncryptionMode::Disabled
1027            ),
1028            "Disabled mode must reject ephemeral gift-wrap events"
1029        );
1030    }
1031
1032    #[test]
1033    fn test_optional_mode_accepts_all() {
1034        let plaintext = Kind::Custom(CTXVM_MESSAGES_KIND);
1035        let gift_wrap = Kind::Custom(GIFT_WRAP_KIND);
1036        let ephemeral = Kind::Custom(EPHEMERAL_GIFT_WRAP_KIND);
1037        assert!(!violates_encryption_policy(
1038            &plaintext,
1039            &EncryptionMode::Optional
1040        ));
1041        assert!(!violates_encryption_policy(
1042            &gift_wrap,
1043            &EncryptionMode::Optional
1044        ));
1045        assert!(!violates_encryption_policy(
1046            &ephemeral,
1047            &EncryptionMode::Optional
1048        ));
1049    }
1050
1051    #[test]
1052    fn test_required_mode_accepts_encrypted() {
1053        assert!(
1054            !violates_encryption_policy(&Kind::Custom(GIFT_WRAP_KIND), &EncryptionMode::Required),
1055            "Required mode must accept gift-wrap events"
1056        );
1057        assert!(
1058            !violates_encryption_policy(
1059                &Kind::Custom(EPHEMERAL_GIFT_WRAP_KIND),
1060                &EncryptionMode::Required
1061            ),
1062            "Required mode must accept ephemeral gift-wrap events"
1063        );
1064    }
1065
1066    #[test]
1067    fn test_disabled_mode_accepts_plaintext() {
1068        let plaintext = Kind::Custom(CTXVM_MESSAGES_KIND);
1069        assert!(
1070            !violates_encryption_policy(&plaintext, &EncryptionMode::Disabled),
1071            "Disabled mode must accept plaintext events"
1072        );
1073    }
1074
1075    // ── CEP-35 client discovery tag emission ────────────────────
1076
1077    fn make_transport_for_tags(
1078        encryption_mode: EncryptionMode,
1079        gift_wrap_mode: GiftWrapMode,
1080    ) -> NostrClientTransport {
1081        let keys = Keys::generate();
1082        NostrClientTransport {
1083            base: BaseTransport {
1084                relay_pool: Arc::new(crate::relay::mock::MockRelayPool::new()),
1085                encryption_mode,
1086                is_connected: false,
1087            },
1088            config: NostrClientTransportConfig {
1089                encryption_mode,
1090                gift_wrap_mode,
1091                server_pubkey: Keys::generate().public_key().to_hex(),
1092                ..Default::default()
1093            },
1094            server_pubkey: keys.public_key(),
1095            pending_requests: ClientCorrelationStore::new(),
1096            has_sent_discovery_tags: AtomicBool::new(false),
1097            discovered_server_capabilities: Arc::new(Mutex::new(PeerCapabilities::default())),
1098            server_initialize_event: Arc::new(Mutex::new(None)),
1099            server_supports_ephemeral: Arc::new(AtomicBool::new(false)),
1100            seen_gift_wrap_ids: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(10).unwrap()))),
1101            message_tx: Some(tokio::sync::mpsc::unbounded_channel().0),
1102            message_rx: None,
1103            cancellation_token: CancellationToken::new(),
1104            event_loop_handle: None,
1105        }
1106    }
1107
1108    fn make_tag(parts: &[&str]) -> Tag {
1109        let kind = TagKind::Custom(parts[0].into());
1110        let values: Vec<String> = parts[1..].iter().map(|s| s.to_string()).collect();
1111        Tag::custom(kind, values)
1112    }
1113
1114    fn tag_names(tags: &[Tag]) -> Vec<String> {
1115        tags.iter().map(|t| t.clone().to_vec()[0].clone()).collect()
1116    }
1117
1118    #[test]
1119    fn client_capability_tags_encryption_optional() {
1120        let t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Optional);
1121        let tags = t.get_client_capability_tags();
1122        let names = tag_names(&tags);
1123        assert_eq!(
1124            names,
1125            vec!["support_encryption", "support_encryption_ephemeral"]
1126        );
1127    }
1128
1129    #[test]
1130    fn client_capability_tags_encryption_disabled() {
1131        let t = make_transport_for_tags(EncryptionMode::Disabled, GiftWrapMode::Optional);
1132        let tags = t.get_client_capability_tags();
1133        assert!(tags.is_empty());
1134    }
1135
1136    #[test]
1137    fn client_capability_tags_persistent_gift_wrap() {
1138        let t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Persistent);
1139        let tags = t.get_client_capability_tags();
1140        let names = tag_names(&tags);
1141        assert_eq!(names, vec!["support_encryption"]);
1142    }
1143
1144    #[test]
1145    fn client_discovery_tags_sent_once() {
1146        let t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Optional);
1147        let first = t.get_pending_client_discovery_tags();
1148        assert!(!first.is_empty());
1149
1150        t.has_sent_discovery_tags.store(true, Ordering::Relaxed);
1151        let second = t.get_pending_client_discovery_tags();
1152        assert!(second.is_empty());
1153    }
1154
1155    // ── CEP-35 client capability learning ───────────────────────
1156
1157    fn make_event_with_tags(tag_parts: &[&[&str]]) -> Event {
1158        let keys = Keys::generate();
1159        let tags: Vec<Tag> = tag_parts.iter().map(|p| make_tag(p)).collect();
1160        let builder = EventBuilder::new(Kind::Custom(CTXVM_MESSAGES_KIND), "{}").tags(tags);
1161        let unsigned = builder.build(keys.public_key());
1162        unsigned.sign_with_keys(&keys).unwrap()
1163    }
1164
1165    #[test]
1166    fn client_learn_server_discovery_sets_baseline() {
1167        let caps = Mutex::new(PeerCapabilities::default());
1168        let init = Mutex::new(None);
1169        let event = make_event_with_tags(&[&["support_encryption"], &["name", "TestServer"]]);
1170
1171        NostrClientTransport::learn_server_discovery(&caps, &init, &event);
1172
1173        let c = caps.lock().unwrap();
1174        assert!(c.supports_encryption);
1175        assert!(!c.supports_ephemeral_encryption);
1176
1177        let stored = init.lock().unwrap();
1178        assert!(stored.is_some());
1179        assert_eq!(stored.as_ref().unwrap().id, event.id);
1180    }
1181
1182    #[test]
1183    fn client_learn_server_discovery_or_assigns() {
1184        let caps = Mutex::new(PeerCapabilities::default());
1185        let init = Mutex::new(None);
1186
1187        let event1 = make_event_with_tags(&[&["support_encryption"]]);
1188        NostrClientTransport::learn_server_discovery(&caps, &init, &event1);
1189
1190        // Second event with different caps does NOT downgrade
1191        let event2 = make_event_with_tags(&[&["support_encryption_ephemeral"]]);
1192        NostrClientTransport::learn_server_discovery(&caps, &init, &event2);
1193
1194        let c = caps.lock().unwrap();
1195        assert!(c.supports_encryption, "must not downgrade");
1196        assert!(c.supports_ephemeral_encryption, "must learn new cap");
1197    }
1198
1199    #[test]
1200    fn client_baseline_not_replaced_on_later_events() {
1201        let caps = Mutex::new(PeerCapabilities::default());
1202        let init = Mutex::new(None);
1203
1204        let event1 = make_event_with_tags(&[&["support_encryption"], &["name", "First"]]);
1205        NostrClientTransport::learn_server_discovery(&caps, &init, &event1);
1206        let first_id = event1.id;
1207
1208        let event2 =
1209            make_event_with_tags(&[&["support_encryption_ephemeral"], &["name", "Second"]]);
1210        NostrClientTransport::learn_server_discovery(&caps, &init, &event2);
1211
1212        let stored = init.lock().unwrap();
1213        assert_eq!(
1214            stored.as_ref().unwrap().id,
1215            first_id,
1216            "baseline must not be replaced"
1217        );
1218    }
1219}