turbomcp_client/client/
protocol.rs

1//! Protocol client for JSON-RPC communication
2//!
3//! This module provides the ProtocolClient which handles the low-level
4//! JSON-RPC protocol communication with MCP servers.
5
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use turbomcp_protocol::jsonrpc::{JsonRpcRequest, JsonRpcResponse, JsonRpcVersion};
9use turbomcp_protocol::{Error, Result};
10use turbomcp_transport::{Transport, TransportMessage};
11
12/// JSON-RPC protocol handler for MCP communication
13///
14/// Handles request/response correlation, serialization, and protocol-level concerns.
15/// This is the missing abstraction layer between raw Transport and high-level Client APIs.
16#[derive(Debug)]
17pub(super) struct ProtocolClient<T: Transport> {
18    transport: T,
19    next_id: AtomicU64,
20}
21
22impl<T: Transport> ProtocolClient<T> {
23    pub(super) fn new(transport: T) -> Self {
24        Self {
25            transport,
26            next_id: AtomicU64::new(1),
27        }
28    }
29
30    /// Send JSON-RPC request and await typed response
31    pub(super) async fn request<R: serde::de::DeserializeOwned>(
32        &self,
33        method: &str,
34        params: Option<serde_json::Value>,
35    ) -> Result<R> {
36        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
37        let request = JsonRpcRequest {
38            jsonrpc: JsonRpcVersion,
39            id: turbomcp_protocol::MessageId::from(id.to_string()),
40            method: method.to_string(),
41            params,
42        };
43
44        // Serialize and send
45        let payload = serde_json::to_vec(&request)
46            .map_err(|e| Error::protocol(format!("Failed to serialize request: {e}")))?;
47
48        let message = TransportMessage::new(
49            turbomcp_protocol::MessageId::from(format!("req-{id}")),
50            payload.into(),
51        );
52        self.transport
53            .send(message)
54            .await
55            .map_err(|e| Error::transport(format!("Transport send failed: {e}")))?;
56
57        // Receive and deserialize response
58        let response_msg = self
59            .transport
60            .receive()
61            .await
62            .map_err(|e| Error::transport(format!("Transport receive failed: {e}")))?
63            .ok_or_else(|| Error::transport("No response received".to_string()))?;
64
65        let response: JsonRpcResponse = serde_json::from_slice(&response_msg.payload)
66            .map_err(|e| Error::protocol(format!("Invalid JSON-RPC response: {e}")))?;
67
68        if let Some(error) = response.error() {
69            return Err(Error::rpc(error.code, &error.message));
70        }
71
72        serde_json::from_value(response.result().unwrap_or_default().clone())
73            .map_err(|e| Error::protocol(format!("Failed to deserialize response: {e}")))
74    }
75
76    /// Send JSON-RPC notification (no response expected)
77    pub(super) async fn notify(
78        &self,
79        method: &str,
80        params: Option<serde_json::Value>,
81    ) -> Result<()> {
82        let request = serde_json::json!({
83            "jsonrpc": "2.0",
84            "method": method,
85            "params": params
86        });
87
88        let payload = serde_json::to_vec(&request)
89            .map_err(|e| Error::protocol(format!("Failed to serialize notification: {e}")))?;
90
91        let message = TransportMessage::new(
92            turbomcp_protocol::MessageId::from("notification"),
93            payload.into(),
94        );
95
96        self.transport
97            .send(message)
98            .await
99            .map_err(|e| Error::transport(format!("Transport send failed: {e}")))
100    }
101
102    /// Connect the transport
103    #[allow(dead_code)] // Reserved for future use
104    pub(super) async fn connect(&self) -> Result<()> {
105        self.transport
106            .connect()
107            .await
108            .map_err(|e| Error::transport(format!("Transport connect failed: {e}")))
109    }
110
111    /// Disconnect the transport
112    #[allow(dead_code)] // Reserved for future use
113    pub(super) async fn disconnect(&self) -> Result<()> {
114        self.transport
115            .disconnect()
116            .await
117            .map_err(|e| Error::transport(format!("Transport disconnect failed: {e}")))
118    }
119
120    /// Get transport reference
121    pub(super) fn transport(&self) -> &T {
122        &self.transport
123    }
124}