// rust_mcp_sdk/mcp_runtimes/server_runtime/mcp_server_runtime.rs
use super::ServerRuntime;
2#[cfg(feature = "hyper-server")]
3use crate::{
4 auth::AuthInfo,
5 task_store::{ClientTaskStore, ServerTaskStore},
6};
7use crate::{
8 error::SdkResult,
9 mcp_handlers::mcp_server_handler::ServerHandler,
10 mcp_traits::{McpServer, McpServerHandler},
11 task_store::TaskCreator,
12 McpObserver,
13};
14use crate::{
15 mcp_runtimes::server_runtime::McpServerOptions,
16 schema::{
17 schema_utils::{
18 CallToolError, ClientMessage, ClientMessages, MessageFromServer, ResultFromServer,
19 ServerMessage, ServerMessages,
20 },
21 CallToolResult, InitializeResult, RpcError,
22 },
23};
24use async_trait::async_trait;
25use rust_mcp_schema::schema_utils::{ClientJsonrpcNotification, ClientJsonrpcRequest};
26#[cfg(feature = "hyper-server")]
27use rust_mcp_transport::SessionId;
28use rust_mcp_transport::TransportDispatcher;
29use std::sync::Arc;
30
31pub fn create_server<T>(options: McpServerOptions<T>) -> Arc<ServerRuntime>
50where
51 T: TransportDispatcher<
52 ClientMessages,
53 MessageFromServer,
54 ClientMessage,
55 ServerMessages,
56 ServerMessage,
57 >,
58{
59 ServerRuntime::new(options)
60}
61
/// Crate-internal constructor for a server runtime bound to a session.
///
/// Only compiled when the `hyper-server` feature is enabled. Every argument
/// is forwarded, in the same order, to [`ServerRuntime::new_instance`]; no
/// additional processing happens here.
///
/// # Arguments
/// * `server_details` - shared [`InitializeResult`] passed through to the runtime.
/// * `handler` - shared [`McpServerHandler`] passed through to the runtime.
/// * `session_id` - the [`SessionId`] passed through to the runtime.
/// * `auth_info` - optional [`AuthInfo`].
/// * `task_store` - optional shared [`ServerTaskStore`].
/// * `client_task_store` - optional shared [`ClientTaskStore`].
/// * `message_observer` - optional shared [`McpObserver`] over client and
///   server messages.
#[cfg(feature = "hyper-server")]
pub(crate) fn create_server_instance(
    server_details: Arc<InitializeResult>,
    handler: Arc<dyn McpServerHandler>,
    session_id: SessionId,
    auth_info: Option<AuthInfo>,
    task_store: Option<Arc<ServerTaskStore>>,
    client_task_store: Option<Arc<ClientTaskStore>>,
    message_observer: Option<Arc<dyn McpObserver<ClientMessage, ServerMessage>>>,
) -> Arc<ServerRuntime> {
    // Pure pass-through: argument order must match `ServerRuntime::new_instance`.
    ServerRuntime::new_instance(
        server_details,
        handler,
        session_id,
        auth_info,
        task_store,
        client_task_store,
        message_observer,
    )
}
82
83pub(crate) struct ServerRuntimeInternalHandler<H> {
84 handler: H,
85}
86impl ServerRuntimeInternalHandler<Box<dyn ServerHandler>> {
87 pub fn new(handler: Box<dyn ServerHandler>) -> Self {
88 Self { handler }
89 }
90}
91
92#[async_trait]
93impl McpServerHandler for ServerRuntimeInternalHandler<Box<dyn ServerHandler>> {
94 async fn handle_request(
95 &self,
96 client_jsonrpc_request: ClientJsonrpcRequest,
97 runtime: Arc<dyn McpServer>,
98 ) -> std::result::Result<ResultFromServer, RpcError> {
99 let task_creator = if client_jsonrpc_request.is_task_augmented() {
101 if !runtime.capabilities().can_run_task_augmented_tools() {
102 return Err(RpcError::invalid_request()
103 .with_message("This MCP server does not support \"tasks\".".to_string()));
104 }
105
106 let Some(task_store) = runtime.task_store() else {
107 return Err(RpcError::invalid_request()
108 .with_message("The server is not configured with a task store.".to_string()));
109 };
110
111 let session_id = {
112 #[cfg(feature = "hyper-server")]
113 {
114 runtime.session_id()
115 }
116 #[cfg(not(feature = "hyper-server"))]
117 {
118 None
119 }
120 };
121
122 Some(TaskCreator {
123 request_id: client_jsonrpc_request.request_id().to_owned(),
124 request: client_jsonrpc_request.clone(),
125 session_id,
126 task_store,
127 })
128 } else {
129 None
130 };
131
132 runtime
133 .capabilities()
134 .can_handle_request(&client_jsonrpc_request)?;
135
136 match client_jsonrpc_request {
137 ClientJsonrpcRequest::InitializeRequest(initialize_request) => self
138 .handler
139 .handle_initialize_request(initialize_request.params, runtime)
140 .await
141 .map(|value| value.into()),
142 ClientJsonrpcRequest::PingRequest(ping_request) => self
143 .handler
144 .handle_ping_request(ping_request.params, runtime)
145 .await
146 .map(|value| value.into()),
147 ClientJsonrpcRequest::ListResourcesRequest(list_resources_request) => self
148 .handler
149 .handle_list_resources_request(list_resources_request.params, runtime)
150 .await
151 .map(|value| value.into()),
152 ClientJsonrpcRequest::ListResourceTemplatesRequest(list_resource_templates_request) => {
153 self.handler
154 .handle_list_resource_templates_request(
155 list_resource_templates_request.params,
156 runtime,
157 )
158 .await
159 .map(|value| value.into())
160 }
161 ClientJsonrpcRequest::ReadResourceRequest(read_resource_request) => self
162 .handler
163 .handle_read_resource_request(read_resource_request.params, runtime)
164 .await
165 .map(|value| value.into()),
166 ClientJsonrpcRequest::SubscribeRequest(subscribe_request) => self
167 .handler
168 .handle_subscribe_request(subscribe_request.params, runtime)
169 .await
170 .map(|value| value.into()),
171 ClientJsonrpcRequest::UnsubscribeRequest(unsubscribe_request) => self
172 .handler
173 .handle_unsubscribe_request(unsubscribe_request.params, runtime)
174 .await
175 .map(|value| value.into()),
176 ClientJsonrpcRequest::ListPromptsRequest(list_prompts_request) => self
177 .handler
178 .handle_list_prompts_request(list_prompts_request.params, runtime)
179 .await
180 .map(|value| value.into()),
181
182 ClientJsonrpcRequest::GetPromptRequest(prompt_request) => self
183 .handler
184 .handle_get_prompt_request(prompt_request.params, runtime)
185 .await
186 .map(|value| value.into()),
187 ClientJsonrpcRequest::ListToolsRequest(list_tools_request) => self
188 .handler
189 .handle_list_tools_request(list_tools_request.params, runtime)
190 .await
191 .map(|value| value.into()),
192 ClientJsonrpcRequest::CallToolRequest(call_tool_request) => {
193 let result = if call_tool_request.is_task_augmented() {
194 let Some(task_creator) = task_creator else {
195 return Err(CallToolError::from_message("Error creating a task!").into());
196 };
197
198 self.handler
199 .handle_task_augmented_tool_call(
200 call_tool_request.params,
201 task_creator,
202 runtime,
203 )
204 .await
205 .map_or_else(
206 |err| {
207 let result: CallToolResult = CallToolError::new(err).into();
208 result.into()
209 },
210 Into::into,
211 )
212 } else {
213 self.handler
214 .handle_call_tool_request(call_tool_request.params, runtime)
215 .await
216 .map_or_else(
217 |err| {
218 let result: CallToolResult = CallToolError::new(err).into();
219 result.into()
220 },
221 Into::into,
222 )
223 };
224 Ok(result)
225 }
226 ClientJsonrpcRequest::SetLevelRequest(set_level_request) => self
227 .handler
228 .handle_set_level_request(set_level_request.params, runtime)
229 .await
230 .map(|value| value.into()),
231 ClientJsonrpcRequest::CompleteRequest(complete_request) => self
232 .handler
233 .handle_complete_request(complete_request.params, runtime)
234 .await
235 .map(|value| value.into()),
236 ClientJsonrpcRequest::GetTaskRequest(get_task_request) => self
237 .handler
238 .handle_get_task_request(get_task_request.params, runtime)
239 .await
240 .map(|value| value.into()),
241 ClientJsonrpcRequest::GetTaskPayloadRequest(get_task_payload_request) => self
242 .handler
243 .handle_get_task_payload_request(get_task_payload_request.params, runtime)
244 .await
245 .map(|value| value.into()),
246 ClientJsonrpcRequest::CancelTaskRequest(cancel_task_request) => self
247 .handler
248 .handle_cancel_task_request(cancel_task_request.params, runtime)
249 .await
250 .map(|value| value.into()),
251 ClientJsonrpcRequest::ListTasksRequest(list_tasks_request) => self
252 .handler
253 .handle_list_task_request(list_tasks_request.params, runtime)
254 .await
255 .map(|value| value.into()),
256 ClientJsonrpcRequest::CustomRequest(custom_request) => self
257 .handler
258 .handle_custom_request(custom_request.into(), runtime)
259 .await
260 .map(|value| value.into()),
261 }
262 }
263
264 async fn handle_error(
265 &self,
266 jsonrpc_error: &RpcError,
267 runtime: Arc<dyn McpServer>,
268 ) -> SdkResult<()> {
269 self.handler.handle_error(jsonrpc_error, runtime).await?;
270 Ok(())
271 }
272
273 async fn handle_notification(
274 &self,
275 client_jsonrpc_notification: ClientJsonrpcNotification,
276 runtime: Arc<dyn McpServer>,
277 ) -> SdkResult<()> {
278 match client_jsonrpc_notification {
279 ClientJsonrpcNotification::CancelledNotification(cancelled_notification) => {
280 self.handler
281 .handle_cancelled_notification(cancelled_notification.params, runtime)
282 .await?;
283 }
284 ClientJsonrpcNotification::InitializedNotification(initialized_notification) => {
285 self.handler
286 .handle_initialized_notification(
287 initialized_notification.params,
288 runtime.clone(),
289 )
290 .await?;
291 self.handler.on_initialized(runtime).await;
292 }
293 ClientJsonrpcNotification::ProgressNotification(progress_notification) => {
294 self.handler
295 .handle_progress_notification(progress_notification.params, runtime)
296 .await?;
297 }
298 ClientJsonrpcNotification::RootsListChangedNotification(
299 roots_list_changed_notification,
300 ) => {
301 self.handler
302 .handle_roots_list_changed_notification(
303 roots_list_changed_notification.params,
304 runtime,
305 )
306 .await?;
307 }
308 ClientJsonrpcNotification::TaskStatusNotification(task_status_notification) => {
309 self.handler
310 .handle_task_status_notification(task_status_notification.params, runtime)
311 .await?;
312 }
313
314 ClientJsonrpcNotification::CustomNotification(value) => {
315 self.handler
316 .handle_custom_notification(value.into())
317 .await?;
318 }
319 }
320 Ok(())
321 }
322}