// mcp_kit/server/core.rs

1use std::sync::Arc;
2
3use crate::{
4    error::{McpError, McpResult},
5    protocol::{
6        JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse,
7        MCP_PROTOCOL_VERSION,
8    },
9    types::{
10        messages::{
11            CallToolRequest, CancelledNotification, CompleteRequest, GetPromptRequest,
12            InitializeRequest, InitializeResult, ListPromptsRequest, ListResourcesRequest,
13            ListToolsRequest, ReadResourceRequest, SetLevelRequest, SubscribeRequest,
14            UnsubscribeRequest,
15        },
16        LoggingCapability, PromptsCapability, ResourcesCapability, ServerCapabilities, ServerInfo,
17        ToolsCapability,
18    },
19};
20use serde_json::Value;
21use tracing::{debug, error, info, warn};
22
23use crate::server::{cancellation::CancellationManager, router::Router, session::Session};
24
25#[cfg(feature = "auth")]
26use crate::auth::DynAuthProvider;
27
28/// The core MCP server — holds configuration and the routing table.
29///
30/// Create one with `McpServer::builder()` then call `.serve_stdio()` or
31/// another transport extension method to start accepting connections.
32#[derive(Clone)]
33pub struct McpServer {
34    pub(crate) info: ServerInfo,
35    pub(crate) instructions: Option<String>,
36    pub(crate) router: Arc<Router>,
37    pub(crate) cancellation: CancellationManager,
38    #[cfg(feature = "auth")]
39    pub(crate) auth_provider: Option<DynAuthProvider>,
40    #[cfg(feature = "auth")]
41    pub(crate) require_auth: bool,
42}
43
44impl McpServer {
45    pub fn builder() -> crate::server::builder::McpServerBuilder {
46        crate::server::builder::McpServerBuilder::new()
47    }
48
49    /// Handle a single incoming JSON-RPC message, returning the response (if any).
50    pub async fn handle_message(
51        &self,
52        msg: JsonRpcMessage,
53        session: &mut Session,
54    ) -> Option<JsonRpcMessage> {
55        match msg {
56            JsonRpcMessage::Request(req) => {
57                let id = req.id.clone();
58                match self.dispatch_request(req, session).await {
59                    Ok(result) => Some(JsonRpcMessage::Response(JsonRpcResponse {
60                        jsonrpc: "2.0".to_owned(),
61                        id,
62                        result,
63                    })),
64                    Err(e) => {
65                        error!(error = %e, "Request failed");
66                        Some(JsonRpcMessage::Error(JsonRpcError::new(id, e)))
67                    }
68                }
69            }
70            JsonRpcMessage::Notification(notif) => {
71                self.handle_notification(notif, session).await;
72                None
73            }
74            _ => None,
75        }
76    }
77
78    async fn dispatch_request(
79        &self,
80        req: JsonRpcRequest,
81        session: &mut Session,
82    ) -> McpResult<Value> {
83        let params = req.params.unwrap_or(Value::Null);
84        debug!(method = %req.method, "Dispatching request");
85
86        match req.method.as_str() {
87            "initialize" => {
88                let init: InitializeRequest = serde_json::from_value(params)
89                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
90                self.handle_initialize(init, session).await
91            }
92            "ping" => Ok(serde_json::json!({})),
93            "tools/list" => {
94                self.require_initialized(session)?;
95                let req: ListToolsRequest = serde_json::from_value(params).unwrap_or_default();
96                Ok(serde_json::to_value(
97                    self.router.list_tools(req.cursor.as_deref()),
98                )?)
99            }
100            "tools/call" => {
101                self.require_initialized(session)?;
102                let req: CallToolRequest = serde_json::from_value(params)
103                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
104                #[cfg(feature = "auth")]
105                let result = {
106                    let identity = session.identity.clone();
107                    crate::server::auth_context::scope(identity, self.router.call_tool(req)).await?
108                };
109                #[cfg(not(feature = "auth"))]
110                let result = self.router.call_tool(req).await?;
111                Ok(serde_json::to_value(result)?)
112            }
113            "resources/list" => {
114                self.require_initialized(session)?;
115                let req: ListResourcesRequest = serde_json::from_value(params).unwrap_or_default();
116                Ok(serde_json::to_value(
117                    self.router.list_resources(req.cursor.as_deref()),
118                )?)
119            }
120            "resources/read" => {
121                self.require_initialized(session)?;
122                let req: ReadResourceRequest = serde_json::from_value(params)
123                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
124                #[cfg(feature = "auth")]
125                let result = {
126                    let identity = session.identity.clone();
127                    crate::server::auth_context::scope(identity, self.router.read_resource(req))
128                        .await?
129                };
130                #[cfg(not(feature = "auth"))]
131                let result = self.router.read_resource(req).await?;
132                Ok(serde_json::to_value(result)?)
133            }
134            "resources/subscribe" => {
135                self.require_initialized(session)?;
136                let _req: SubscribeRequest = serde_json::from_value(params)
137                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
138                Ok(serde_json::json!({}))
139            }
140            "resources/unsubscribe" => {
141                self.require_initialized(session)?;
142                let _req: UnsubscribeRequest = serde_json::from_value(params)
143                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
144                Ok(serde_json::json!({}))
145            }
146            "prompts/list" => {
147                self.require_initialized(session)?;
148                let req: ListPromptsRequest = serde_json::from_value(params).unwrap_or_default();
149                Ok(serde_json::to_value(
150                    self.router.list_prompts(req.cursor.as_deref()),
151                )?)
152            }
153            "prompts/get" => {
154                self.require_initialized(session)?;
155                let req: GetPromptRequest = serde_json::from_value(params)
156                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
157                #[cfg(feature = "auth")]
158                let result = {
159                    let identity = session.identity.clone();
160                    crate::server::auth_context::scope(identity, self.router.get_prompt(req))
161                        .await?
162                };
163                #[cfg(not(feature = "auth"))]
164                let result = self.router.get_prompt(req).await?;
165                Ok(serde_json::to_value(result)?)
166            }
167            "logging/setLevel" => {
168                self.require_initialized(session)?;
169                let _req: SetLevelRequest = serde_json::from_value(params)
170                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
171                Ok(serde_json::json!({}))
172            }
173            "completion/complete" => {
174                self.require_initialized(session)?;
175                let req: CompleteRequest = serde_json::from_value(params)
176                    .map_err(|e| McpError::InvalidParams(e.to_string()))?;
177                #[cfg(feature = "auth")]
178                let result = {
179                    let identity = session.identity.clone();
180                    crate::server::auth_context::scope(identity, self.router.complete(req)).await?
181                };
182                #[cfg(not(feature = "auth"))]
183                let result = self.router.complete(req).await?;
184                Ok(serde_json::to_value(result)?)
185            }
186            method => Err(McpError::MethodNotFound(method.to_owned())),
187        }
188    }
189
190    async fn handle_initialize(
191        &self,
192        req: InitializeRequest,
193        session: &mut Session,
194    ) -> McpResult<Value> {
195        info!(
196            client = %req.client_info.name,
197            version = %req.client_info.version,
198            "Client initializing"
199        );
200
201        session.client_info = Some(req.client_info);
202        session.client_capabilities = Some(req.capabilities);
203        session.protocol_version = Some(req.protocol_version);
204        session.initialized = true;
205
206        let capabilities = ServerCapabilities {
207            tools: if self.router.has_tools() {
208                Some(ToolsCapability {
209                    list_changed: Some(true),
210                })
211            } else {
212                None
213            },
214            resources: if self.router.has_resources() {
215                Some(ResourcesCapability {
216                    subscribe: Some(false),
217                    list_changed: Some(true),
218                })
219            } else {
220                None
221            },
222            prompts: if self.router.has_prompts() {
223                Some(PromptsCapability {
224                    list_changed: Some(true),
225                })
226            } else {
227                None
228            },
229            logging: Some(LoggingCapability {}),
230            experimental: None,
231        };
232
233        let result = InitializeResult {
234            protocol_version: MCP_PROTOCOL_VERSION.to_owned(),
235            capabilities,
236            server_info: self.info.clone(),
237            instructions: self.instructions.clone(),
238        };
239
240        Ok(serde_json::to_value(result)?)
241    }
242
243    async fn handle_notification(&self, notif: JsonRpcNotification, session: &mut Session) {
244        match notif.method.as_str() {
245            "notifications/initialized" => {
246                info!(session = %session.id, "Client sent initialized notification");
247            }
248            "notifications/cancelled" => {
249                if let Some(params) = notif.params {
250                    if let Ok(cancelled) = serde_json::from_value::<CancelledNotification>(params) {
251                        debug!(request_id = ?cancelled.request_id, reason = ?cancelled.reason, "Request cancelled by client");
252                        let was_cancelled = self
253                            .cancellation
254                            .cancel(&session.id, &cancelled.request_id)
255                            .await;
256                        if was_cancelled {
257                            info!(request_id = ?cancelled.request_id, "Request successfully cancelled");
258                        } else {
259                            debug!(request_id = ?cancelled.request_id, "Request not found or already completed");
260                        }
261                    }
262                }
263            }
264            "notifications/roots/list_changed" => {
265                debug!("Client roots list changed");
266                // Client will send roots/list if we request it
267            }
268            method => {
269                warn!(method, "Received unknown notification");
270            }
271        }
272    }
273
274    fn require_initialized(&self, session: &Session) -> McpResult<()> {
275        if !session.initialized {
276            Err(McpError::InvalidRequest(
277                "Server not initialized. Send 'initialize' first.".to_owned(),
278            ))
279        } else {
280            Ok(())
281        }
282    }
283
284    pub fn info(&self) -> &ServerInfo {
285        &self.info
286    }
287}