Skip to main content

mcp_kit/server/
core.rs

1use std::sync::Arc;
2
3use crate::{
4    error::{McpError, McpResult},
5    protocol::{
6        JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse,
7        MCP_PROTOCOL_VERSION,
8    },
9    types::{
10        messages::{
11            CallToolRequest, CompleteRequest, GetPromptRequest, InitializeRequest,
12            InitializeResult, ListPromptsRequest, ListResourcesRequest, ListToolsRequest,
13            ReadResourceRequest, SetLevelRequest, SubscribeRequest, UnsubscribeRequest,
14        },
15        LoggingCapability, PromptsCapability, ResourcesCapability, ServerCapabilities, ServerInfo,
16        ToolsCapability,
17    },
18};
19use serde_json::Value;
20use tracing::{debug, error, info, warn};
21
22use crate::server::{router::Router, session::Session};
23
24/// The core MCP server — holds configuration and the routing table.
25///
26/// Create one with `McpServer::builder()` then call `.serve_stdio()` or
27/// another transport extension method to start accepting connections.
28#[derive(Clone)]
29pub struct McpServer {
30    pub(crate) info: ServerInfo,
31    pub(crate) instructions: Option<String>,
32    pub(crate) router: Arc<Router>,
33}
34
35impl McpServer {
36    pub fn builder() -> crate::server::builder::McpServerBuilder {
37        crate::server::builder::McpServerBuilder::new()
38    }
39
40    /// Handle a single incoming JSON-RPC message, returning the response (if any).
41    pub async fn handle_message(
42        &self,
43        msg: JsonRpcMessage,
44        session: &mut Session,
45    ) -> Option<JsonRpcMessage> {
46        match msg {
47            JsonRpcMessage::Request(req) => {
48                let id = req.id.clone();
49                match self.dispatch_request(req, session).await {
50                    Ok(result) => Some(JsonRpcMessage::Response(JsonRpcResponse {
51                        jsonrpc: "2.0".to_owned(),
52                        id,
53                        result,
54                    })),
55                    Err(e) => {
56                        error!(error = %e, "Request failed");
57                        Some(JsonRpcMessage::Error(JsonRpcError::new(id, e)))
58                    }
59                }
60            }
61            JsonRpcMessage::Notification(notif) => {
62                self.handle_notification(notif, session).await;
63                None
64            }
65            _ => None,
66        }
67    }
68
69    async fn dispatch_request(
70        &self,
71        req: JsonRpcRequest,
72        session: &mut Session,
73    ) -> McpResult<Value> {
74        let params = req.params.unwrap_or(Value::Null);
75        debug!(method = %req.method, "Dispatching request");
76
77        match req.method.as_str() {
78            "initialize" => {
79                let init: InitializeRequest = serde_json::from_value(params)
80                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
81                self.handle_initialize(init, session).await
82            }
83            "ping" => Ok(serde_json::json!({})),
84            "tools/list" => {
85                self.require_initialized(session)?;
86                let req: ListToolsRequest = serde_json::from_value(params).unwrap_or_default();
87                Ok(serde_json::to_value(
88                    self.router.list_tools(req.cursor.as_deref()),
89                )?)
90            }
91            "tools/call" => {
92                self.require_initialized(session)?;
93                let req: CallToolRequest = serde_json::from_value(params)
94                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
95                Ok(serde_json::to_value(self.router.call_tool(req).await?)?)
96            }
97            "resources/list" => {
98                self.require_initialized(session)?;
99                let req: ListResourcesRequest = serde_json::from_value(params).unwrap_or_default();
100                Ok(serde_json::to_value(
101                    self.router.list_resources(req.cursor.as_deref()),
102                )?)
103            }
104            "resources/read" => {
105                self.require_initialized(session)?;
106                let req: ReadResourceRequest = serde_json::from_value(params)
107                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
108                Ok(serde_json::to_value(self.router.read_resource(req).await?)?)
109            }
110            "resources/subscribe" => {
111                self.require_initialized(session)?;
112                let _req: SubscribeRequest = serde_json::from_value(params)
113                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
114                Ok(serde_json::json!({}))
115            }
116            "resources/unsubscribe" => {
117                self.require_initialized(session)?;
118                let _req: UnsubscribeRequest = serde_json::from_value(params)
119                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
120                Ok(serde_json::json!({}))
121            }
122            "prompts/list" => {
123                self.require_initialized(session)?;
124                let req: ListPromptsRequest = serde_json::from_value(params).unwrap_or_default();
125                Ok(serde_json::to_value(
126                    self.router.list_prompts(req.cursor.as_deref()),
127                )?)
128            }
129            "prompts/get" => {
130                self.require_initialized(session)?;
131                let req: GetPromptRequest = serde_json::from_value(params)
132                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
133                Ok(serde_json::to_value(self.router.get_prompt(req).await?)?)
134            }
135            "logging/setLevel" => {
136                self.require_initialized(session)?;
137                let _req: SetLevelRequest = serde_json::from_value(params)
138                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
139                Ok(serde_json::json!({}))
140            }
141            "completion/complete" => {
142                self.require_initialized(session)?;
143                let _req: CompleteRequest = serde_json::from_value(params)
144                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
145                Ok(serde_json::json!({ "completion": { "values": [], "hasMore": false } }))
146            }
147            method => Err(McpError::MethodNotFound(method.to_owned())),
148        }
149    }
150
151    async fn handle_initialize(
152        &self,
153        req: InitializeRequest,
154        session: &mut Session,
155    ) -> McpResult<Value> {
156        info!(
157            client = %req.client_info.name,
158            version = %req.client_info.version,
159            "Client initializing"
160        );
161
162        session.client_info = Some(req.client_info);
163        session.protocol_version = Some(req.protocol_version);
164        session.initialized = true;
165
166        let capabilities = ServerCapabilities {
167            tools: if self.router.has_tools() {
168                Some(ToolsCapability {
169                    list_changed: Some(true),
170                })
171            } else {
172                None
173            },
174            resources: if self.router.has_resources() {
175                Some(ResourcesCapability {
176                    subscribe: Some(false),
177                    list_changed: Some(true),
178                })
179            } else {
180                None
181            },
182            prompts: if self.router.has_prompts() {
183                Some(PromptsCapability {
184                    list_changed: Some(true),
185                })
186            } else {
187                None
188            },
189            logging: Some(LoggingCapability {}),
190            experimental: None,
191        };
192
193        let result = InitializeResult {
194            protocol_version: MCP_PROTOCOL_VERSION.to_owned(),
195            capabilities,
196            server_info: self.info.clone(),
197            instructions: self.instructions.clone(),
198        };
199
200        Ok(serde_json::to_value(result)?)
201    }
202
203    async fn handle_notification(&self, notif: JsonRpcNotification, session: &mut Session) {
204        match notif.method.as_str() {
205            "notifications/initialized" => {
206                info!(session = %session.id, "Client sent initialized notification");
207            }
208            "notifications/cancelled" => {
209                debug!("Client cancelled a request");
210            }
211            method => {
212                warn!(method, "Received unknown notification");
213            }
214        }
215    }
216
217    fn require_initialized(&self, session: &Session) -> McpResult<()> {
218        if !session.initialized {
219            Err(McpError::InvalidRequest(
220                "Server not initialized. Send 'initialize' first.".to_owned(),
221            ))
222        } else {
223            Ok(())
224        }
225    }
226
227    pub fn info(&self) -> &ServerInfo {
228        &self.info
229    }
230}