turbomcp_client/client/protocol.rs

//! Protocol client for JSON-RPC communication
//!
//! This module provides the ProtocolClient which handles the low-level
//! JSON-RPC protocol communication with MCP servers.
//!
//! ## Bidirectional Communication Architecture
//!
//! The ProtocolClient uses a MessageDispatcher to solve the bidirectional
//! communication problem. Instead of directly calling `transport.receive()`,
//! which created race conditions when multiple code paths tried to receive,
//! we now use a centralized message routing layer:
//!
//! ```text
//! ProtocolClient::request()
//!     ↓
//!   1. Register oneshot channel with dispatcher
//!   2. Send request via transport
//!   3. Wait on oneshot channel
//!     ↓
//! MessageDispatcher (background task)
//!     ↓
//!   Continuously reads transport.receive()
//!   Routes responses → oneshot channels
//!   Routes requests → Client handlers
//! ```
//!
//! This ensures there's only ONE consumer of `transport.receive()`,
//! eliminating the race condition.
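//!
//! A condensed sketch of the request path described above (method names match
//! this module; the surrounding setup and error mapping are elided):
//!
//! ```rust,ignore
//! // 1. Register a oneshot receiver for this request ID *before* sending,
//! //    so the dispatcher can route the response the moment it arrives.
//! let rx = dispatcher.wait_for_response(request_id.clone());
//! // 2. Send the serialized request over the transport.
//! transport.send(message).await?;
//! // 3. Await the response; the dispatcher's background task fulfils it.
//! let response = rx.await?;
//! ```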

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use turbomcp_protocol::jsonrpc::{JsonRpcRequest, JsonRpcVersion};
use turbomcp_protocol::{Error, Result};
use turbomcp_transport::{Transport, TransportMessage};

use super::dispatcher::MessageDispatcher;

/// JSON-RPC protocol handler for MCP communication
///
/// Handles request/response correlation, serialization, and protocol-level concerns.
/// This is the abstraction layer between raw Transport and high-level Client APIs.
///
/// ## Architecture
///
/// The ProtocolClient now uses a MessageDispatcher to handle bidirectional
/// communication correctly. The dispatcher runs a background task that:
/// - Reads ALL messages from the transport
/// - Routes responses to waiting request() calls
/// - Routes incoming requests to registered handlers
///
/// This eliminates race conditions by centralizing all message routing
/// in a single background task.
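///
/// A minimal construction sketch (the `StdioTransport` type and its `new()`
/// constructor are assumed here purely for illustration; any `Transport`
/// implementation works the same way):
///
/// ```rust,ignore
/// // Creating the protocol client also starts the dispatcher's
/// // background routing task.
/// let protocol = ProtocolClient::new(StdioTransport::new());
///
/// // The owning Client registers its request/notification handlers
/// // through the shared dispatcher.
/// let dispatcher = protocol.dispatcher();
/// ```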
#[derive(Debug)]
pub(super) struct ProtocolClient<T: Transport> {
    transport: Arc<T>,
    dispatcher: Arc<MessageDispatcher>,
    next_id: AtomicU64,
}

impl<T: Transport + 'static> ProtocolClient<T> {
    /// Create a new protocol client with message dispatcher
    ///
    /// This automatically starts the message routing background task.
    pub(super) fn new(transport: T) -> Self {
        let transport = Arc::new(transport);
        let dispatcher = MessageDispatcher::new(transport.clone());

        Self {
            transport,
            dispatcher,
            next_id: AtomicU64::new(1),
        }
    }

    /// Get the message dispatcher for handler registration
    ///
    /// This allows the Client to register request/notification handlers
    /// with the dispatcher.
    pub(super) fn dispatcher(&self) -> &Arc<MessageDispatcher> {
        &self.dispatcher
    }

    /// Send JSON-RPC request and await typed response
    ///
    /// ## New Architecture (v2.0+)
    ///
    /// Instead of calling `transport.receive()` directly (which created the
    /// race condition), this method now:
    ///
    /// 1. Registers a oneshot channel with the dispatcher BEFORE sending
    /// 2. Sends the request via transport
    /// 3. Waits on the oneshot channel for the response
    ///
    /// The dispatcher's background task receives the response and routes it
    /// to the oneshot channel. This ensures responses always reach the right
    /// request() call, even when the server sends requests (elicitation, etc.)
    /// in between.
    ///
    /// ## Example Flow with Elicitation
    ///
    /// ```text
    /// Client: call_tool("test") → request(id=1)
    ///   1. Register oneshot channel for id=1
    ///   2. Send tools/call request
    ///   3. Wait on channel...
    ///
    /// Server: Sends elicitation/create request (id=2)
    ///   → Dispatcher routes to request handler
    ///   → Client processes elicitation
    ///   → Client sends elicitation response
    ///
    /// Server: Sends tools/call response (id=1)
    ///   → Dispatcher routes to oneshot channel for id=1
    ///   → request() receives response ✓
    /// ```
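    ///
    /// A minimal calling sketch (`tools/list` is a standard MCP method name;
    /// `ListToolsResult` is assumed here as the deserialization target for
    /// illustration):
    ///
    /// ```rust,ignore
    /// // Deserializes the JSON-RPC `result` field into the requested type,
    /// // or returns the server's JSON-RPC error as `Error::rpc`.
    /// let tools: ListToolsResult = protocol.request("tools/list", None).await?;
    /// ```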
    pub(super) async fn request<R: serde::de::DeserializeOwned>(
        &self,
        method: &str,
        params: Option<serde_json::Value>,
    ) -> Result<R> {
        // Generate unique request ID
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let request_id = turbomcp_protocol::MessageId::from(id.to_string());

        // Build JSON-RPC request
        let request = JsonRpcRequest {
            jsonrpc: JsonRpcVersion,
            id: request_id.clone(),
            method: method.to_string(),
            params,
        };

        // Step 1: Register oneshot channel BEFORE sending request
        // This ensures the dispatcher can route the response when it arrives
        let response_receiver = self.dispatcher.wait_for_response(request_id.clone());

        // Step 2: Serialize and send request
        let payload = serde_json::to_vec(&request)
            .map_err(|e| Error::protocol(format!("Failed to serialize request: {e}")))?;

        let message = TransportMessage::new(
            turbomcp_protocol::MessageId::from(format!("req-{id}")),
            payload.into(),
        );

        self.transport
            .send(message)
            .await
            .map_err(|e| Error::transport(format!("Transport send failed: {e}")))?;

        // Step 3: Wait for response via oneshot channel
        // The dispatcher's background task will send the response when it arrives
        let response = response_receiver
            .await
            .map_err(|_| Error::transport("Response channel closed".to_string()))?;
        // Handle JSON-RPC errors by surfacing them as typed errors
        if let Some(error) = response.error() {
            tracing::debug!(
                "Received JSON-RPC error - code: {}, message: {}",
                error.code,
                error.message
            );
            return Err(Error::rpc(error.code, &error.message));
        }

        // Deserialize result
        serde_json::from_value(response.result().unwrap_or_default().clone())
            .map_err(|e| Error::protocol(format!("Failed to deserialize response: {e}")))
    }

    /// Send JSON-RPC notification (no response expected)
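    ///
    /// A minimal calling sketch (`notifications/initialized` is a standard
    /// MCP notification method, shown here purely for illustration):
    ///
    /// ```rust,ignore
    /// // Fire-and-forget: the message is sent and no response is awaited.
    /// protocol.notify("notifications/initialized", None).await?;
    /// ```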
    pub(super) async fn notify(
        &self,
        method: &str,
        params: Option<serde_json::Value>,
    ) -> Result<()> {
        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "method": method,
            "params": params
        });

        let payload = serde_json::to_vec(&request)
            .map_err(|e| Error::protocol(format!("Failed to serialize notification: {e}")))?;

        let message = TransportMessage::new(
            turbomcp_protocol::MessageId::from("notification"),
            payload.into(),
        );

        self.transport
            .send(message)
            .await
            .map_err(|e| Error::transport(format!("Transport send failed: {e}")))
    }

    /// Connect the transport
    #[allow(dead_code)] // Reserved for future use
    pub(super) async fn connect(&self) -> Result<()> {
        self.transport
            .connect()
            .await
            .map_err(|e| Error::transport(format!("Transport connect failed: {e}")))
    }

    /// Disconnect the transport
    #[allow(dead_code)] // Reserved for future use
    pub(super) async fn disconnect(&self) -> Result<()> {
        self.transport
            .disconnect()
            .await
            .map_err(|e| Error::transport(format!("Transport disconnect failed: {e}")))
    }

    /// Get transport reference
    ///
    /// Returns an Arc reference to the transport, allowing it to be shared
    /// with other components (like the message dispatcher).
    pub(super) fn transport(&self) -> &Arc<T> {
        &self.transport
    }
}