turbomcp_client/client/
protocol.rs1use std::sync::atomic::{AtomicU64, Ordering};
7
8use turbomcp_protocol::jsonrpc::{JsonRpcRequest, JsonRpcResponse, JsonRpcVersion};
9use turbomcp_protocol::{Error, Result};
10use turbomcp_transport::{Transport, TransportMessage};
11
12#[derive(Debug)]
17pub(super) struct ProtocolClient<T: Transport> {
18 transport: T,
19 next_id: AtomicU64,
20}
21
22impl<T: Transport> ProtocolClient<T> {
23 pub(super) fn new(transport: T) -> Self {
24 Self {
25 transport,
26 next_id: AtomicU64::new(1),
27 }
28 }
29
30 pub(super) async fn request<R: serde::de::DeserializeOwned>(
32 &self,
33 method: &str,
34 params: Option<serde_json::Value>,
35 ) -> Result<R> {
36 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
37 let request = JsonRpcRequest {
38 jsonrpc: JsonRpcVersion,
39 id: turbomcp_protocol::MessageId::from(id.to_string()),
40 method: method.to_string(),
41 params,
42 };
43
44 let payload = serde_json::to_vec(&request)
46 .map_err(|e| Error::protocol(format!("Failed to serialize request: {e}")))?;
47
48 let message = TransportMessage::new(
49 turbomcp_protocol::MessageId::from(format!("req-{id}")),
50 payload.into(),
51 );
52 self.transport
53 .send(message)
54 .await
55 .map_err(|e| Error::transport(format!("Transport send failed: {e}")))?;
56
57 let response_msg = self
59 .transport
60 .receive()
61 .await
62 .map_err(|e| Error::transport(format!("Transport receive failed: {e}")))?
63 .ok_or_else(|| Error::transport("No response received".to_string()))?;
64
65 let response: JsonRpcResponse = serde_json::from_slice(&response_msg.payload)
66 .map_err(|e| Error::protocol(format!("Invalid JSON-RPC response: {e}")))?;
67
68 if let Some(error) = response.error() {
69 return Err(Error::rpc(error.code, &error.message));
70 }
71
72 serde_json::from_value(response.result().unwrap_or_default().clone())
73 .map_err(|e| Error::protocol(format!("Failed to deserialize response: {e}")))
74 }
75
76 pub(super) async fn notify(
78 &self,
79 method: &str,
80 params: Option<serde_json::Value>,
81 ) -> Result<()> {
82 let request = serde_json::json!({
83 "jsonrpc": "2.0",
84 "method": method,
85 "params": params
86 });
87
88 let payload = serde_json::to_vec(&request)
89 .map_err(|e| Error::protocol(format!("Failed to serialize notification: {e}")))?;
90
91 let message = TransportMessage::new(
92 turbomcp_protocol::MessageId::from("notification"),
93 payload.into(),
94 );
95
96 self.transport
97 .send(message)
98 .await
99 .map_err(|e| Error::transport(format!("Transport send failed: {e}")))
100 }
101
102 #[allow(dead_code)] pub(super) async fn connect(&self) -> Result<()> {
105 self.transport
106 .connect()
107 .await
108 .map_err(|e| Error::transport(format!("Transport connect failed: {e}")))
109 }
110
111 #[allow(dead_code)] pub(super) async fn disconnect(&self) -> Result<()> {
114 self.transport
115 .disconnect()
116 .await
117 .map_err(|e| Error::transport(format!("Transport disconnect failed: {e}")))
118 }
119
120 pub(super) fn transport(&self) -> &T {
122 &self.transport
123 }
124}