rust_mcp_sdk/mcp_runtimes/client_runtime/
mcp_client_runtime.rs1use std::sync::Arc;
2
3use crate::schema::{
4 schema_utils::{
5 ClientMessage, ClientMessages, MessageFromClient, NotificationFromServer,
6 RequestFromServer, ResultFromClient, ServerMessage, ServerMessages,
7 },
8 InitializeRequestParams, RpcError, ServerNotification, ServerRequest,
9};
10use async_trait::async_trait;
11
12#[cfg(feature = "streamable-http")]
13use rust_mcp_transport::StreamableTransportOptions;
14use rust_mcp_transport::TransportDispatcher;
15
16use crate::{error::SdkResult, mcp_client::ClientHandler, mcp_traits::McpClientHandler, McpClient};
17
18use super::ClientRuntime;
19
20pub fn create_client(
42 client_details: InitializeRequestParams,
43 transport: impl TransportDispatcher<
44 ServerMessages,
45 MessageFromClient,
46 ServerMessage,
47 ClientMessages,
48 ClientMessage,
49 >,
50 handler: impl ClientHandler,
51) -> Arc<ClientRuntime> {
52 Arc::new(ClientRuntime::new(
53 client_details,
54 Arc::new(transport),
55 Box::new(ClientInternalHandler::new(Box::new(handler))),
56 ))
57}
58
/// Builds a shared [`ClientRuntime`] from client details, streamable transport
/// options and a handler.
///
/// Only compiled when the `streamable-http` feature is enabled. The `handler`
/// is boxed and wrapped in a `ClientInternalHandler`, then handed to
/// `ClientRuntime::new_instance` along with `client_details` and
/// `transport_options`.
///
/// # Returns
/// The newly constructed runtime wrapped in an [`Arc`].
#[cfg(feature = "streamable-http")]
pub fn with_transport_options(
    client_details: InitializeRequestParams,
    transport_options: StreamableTransportOptions,
    handler: impl ClientHandler,
) -> Arc<ClientRuntime> {
    // Wrap the caller's handler in the internal adapter the runtime is given.
    let internal_handler = ClientInternalHandler::new(Box::new(handler));

    let runtime = ClientRuntime::new_instance(
        client_details,
        transport_options,
        Box::new(internal_handler),
    );

    Arc::new(runtime)
}
71
72struct ClientInternalHandler<H> {
75 handler: H,
76}
77impl ClientInternalHandler<Box<dyn ClientHandler>> {
78 pub fn new(handler: Box<dyn ClientHandler>) -> Self {
79 Self { handler }
80 }
81}
82
83#[async_trait]
86impl McpClientHandler for ClientInternalHandler<Box<dyn ClientHandler>> {
87 async fn handle_request(
89 &self,
90 server_jsonrpc_request: RequestFromServer,
91 runtime: &dyn McpClient,
92 ) -> std::result::Result<ResultFromClient, RpcError> {
93 match server_jsonrpc_request {
94 RequestFromServer::ServerRequest(request) => match request {
95 ServerRequest::PingRequest(ping_request) => self
96 .handler
97 .handle_ping_request(ping_request, runtime)
98 .await
99 .map(|value| value.into()),
100 ServerRequest::CreateMessageRequest(create_message_request) => self
101 .handler
102 .handle_create_message_request(create_message_request, runtime)
103 .await
104 .map(|value| value.into()),
105 ServerRequest::ListRootsRequest(list_roots_request) => self
106 .handler
107 .handle_list_roots_request(list_roots_request, runtime)
108 .await
109 .map(|value| value.into()),
110 #[cfg(feature = "2025_06_18")]
111 ServerRequest::ElicitRequest(elicit_request) => self
112 .handler
113 .handle_elicit_request(elicit_request, runtime)
114 .await
115 .map(|value| value.into()),
116 },
117 RequestFromServer::CustomRequest(custom_request) => self
119 .handler
120 .handle_custom_request(custom_request, runtime)
121 .await
122 .map(|value| value.into()),
123 }
124 }
125
126 async fn handle_error(
128 &self,
129 jsonrpc_error: &RpcError,
130 runtime: &dyn McpClient,
131 ) -> SdkResult<()> {
132 self.handler.handle_error(jsonrpc_error, runtime).await?;
133 Ok(())
134 }
135
136 async fn handle_notification(
138 &self,
139 server_jsonrpc_notification: NotificationFromServer,
140 runtime: &dyn McpClient,
141 ) -> SdkResult<()> {
142 match server_jsonrpc_notification {
143 NotificationFromServer::ServerNotification(server_notification) => {
144 match server_notification {
145 ServerNotification::CancelledNotification(cancelled_notification) => {
146 self.handler
147 .handle_cancelled_notification(cancelled_notification, runtime)
148 .await?;
149 }
150 ServerNotification::ProgressNotification(progress_notification) => {
151 self.handler
152 .handle_progress_notification(progress_notification, runtime)
153 .await?;
154 }
155 ServerNotification::ResourceListChangedNotification(
156 resource_list_changed_notification,
157 ) => {
158 self.handler
159 .handle_resource_list_changed_notification(
160 resource_list_changed_notification,
161 runtime,
162 )
163 .await?;
164 }
165 ServerNotification::ResourceUpdatedNotification(
166 resource_updated_notification,
167 ) => {
168 self.handler
169 .handle_resource_updated_notification(
170 resource_updated_notification,
171 runtime,
172 )
173 .await?;
174 }
175 ServerNotification::PromptListChangedNotification(
176 prompt_list_changed_notification,
177 ) => {
178 self.handler
179 .handle_prompt_list_changed_notification(
180 prompt_list_changed_notification,
181 runtime,
182 )
183 .await?;
184 }
185 ServerNotification::ToolListChangedNotification(
186 tool_list_changed_notification,
187 ) => {
188 self.handler
189 .handle_tool_list_changed_notification(
190 tool_list_changed_notification,
191 runtime,
192 )
193 .await?;
194 }
195 ServerNotification::LoggingMessageNotification(
196 logging_message_notification,
197 ) => {
198 self.handler
199 .handle_logging_message_notification(
200 logging_message_notification,
201 runtime,
202 )
203 .await?;
204 }
205 }
206 }
207 NotificationFromServer::CustomNotification(custom_notification) => {
209 self.handler
210 .handle_custom_notification(custom_notification, runtime)
211 .await?;
212 }
213 }
214 Ok(())
215 }
216
217 async fn handle_process_error(
219 &self,
220 error_message: String,
221 runtime: &dyn McpClient,
222 ) -> SdkResult<()> {
223 self.handler
224 .handle_process_error(error_message, runtime)
225 .await
226 .map_err(|err| err.into())
227 }
228}