1use std::sync::Arc;
2
3use crate::{
4 error::{McpError, McpResult},
5 protocol::{
6 JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse,
7 MCP_PROTOCOL_VERSION,
8 },
9 types::{
10 messages::{
11 CallToolRequest, CancelledNotification, CompleteRequest, GetPromptRequest,
12 InitializeRequest, InitializeResult, ListPromptsRequest, ListResourcesRequest,
13 ListToolsRequest, ReadResourceRequest, SetLevelRequest, SubscribeRequest,
14 UnsubscribeRequest,
15 },
16 LoggingCapability, PromptsCapability, ResourcesCapability, ServerCapabilities, ServerInfo,
17 ToolsCapability,
18 },
19};
20use serde_json::Value;
21use tracing::{debug, error, info, warn};
22
23use crate::server::{cancellation::CancellationManager, router::Router, session::Session};
24
25#[cfg(feature = "auth")]
26use crate::auth::DynAuthProvider;
27
28#[derive(Clone)]
33pub struct McpServer {
34 pub(crate) info: ServerInfo,
35 pub(crate) instructions: Option<String>,
36 pub(crate) router: Arc<Router>,
37 pub(crate) cancellation: CancellationManager,
38 #[cfg(feature = "auth")]
39 pub(crate) auth_provider: Option<DynAuthProvider>,
40 #[cfg(feature = "auth")]
41 pub(crate) require_auth: bool,
42}
43
44impl McpServer {
45 pub fn builder() -> crate::server::builder::McpServerBuilder {
46 crate::server::builder::McpServerBuilder::new()
47 }
48
49 pub async fn handle_message(
51 &self,
52 msg: JsonRpcMessage,
53 session: &mut Session,
54 ) -> Option<JsonRpcMessage> {
55 match msg {
56 JsonRpcMessage::Request(req) => {
57 let id = req.id.clone();
58 match self.dispatch_request(req, session).await {
59 Ok(result) => Some(JsonRpcMessage::Response(JsonRpcResponse {
60 jsonrpc: "2.0".to_owned(),
61 id,
62 result,
63 })),
64 Err(e) => {
65 error!(error = %e, "Request failed");
66 Some(JsonRpcMessage::Error(JsonRpcError::new(id, e)))
67 }
68 }
69 }
70 JsonRpcMessage::Notification(notif) => {
71 self.handle_notification(notif, session).await;
72 None
73 }
74 _ => None,
75 }
76 }
77
78 async fn dispatch_request(
79 &self,
80 req: JsonRpcRequest,
81 session: &mut Session,
82 ) -> McpResult<Value> {
83 let params = req.params.unwrap_or(Value::Null);
84 debug!(method = %req.method, "Dispatching request");
85
86 match req.method.as_str() {
87 "initialize" => {
88 let init: InitializeRequest = serde_json::from_value(params)
89 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
90 self.handle_initialize(init, session).await
91 }
92 "ping" => Ok(serde_json::json!({})),
93 "tools/list" => {
94 self.require_initialized(session)?;
95 let req: ListToolsRequest = serde_json::from_value(params).unwrap_or_default();
96 Ok(serde_json::to_value(
97 self.router.list_tools(req.cursor.as_deref()),
98 )?)
99 }
100 "tools/call" => {
101 self.require_initialized(session)?;
102 let req: CallToolRequest = serde_json::from_value(params)
103 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
104 #[cfg(feature = "auth")]
105 let result = {
106 let identity = session.identity.clone();
107 crate::server::auth_context::scope(identity, self.router.call_tool(req)).await?
108 };
109 #[cfg(not(feature = "auth"))]
110 let result = self.router.call_tool(req).await?;
111 Ok(serde_json::to_value(result)?)
112 }
113 "resources/list" => {
114 self.require_initialized(session)?;
115 let req: ListResourcesRequest = serde_json::from_value(params).unwrap_or_default();
116 Ok(serde_json::to_value(
117 self.router.list_resources(req.cursor.as_deref()),
118 )?)
119 }
120 "resources/read" => {
121 self.require_initialized(session)?;
122 let req: ReadResourceRequest = serde_json::from_value(params)
123 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
124 #[cfg(feature = "auth")]
125 let result = {
126 let identity = session.identity.clone();
127 crate::server::auth_context::scope(identity, self.router.read_resource(req))
128 .await?
129 };
130 #[cfg(not(feature = "auth"))]
131 let result = self.router.read_resource(req).await?;
132 Ok(serde_json::to_value(result)?)
133 }
134 "resources/subscribe" => {
135 self.require_initialized(session)?;
136 let _req: SubscribeRequest = serde_json::from_value(params)
137 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
138 Ok(serde_json::json!({}))
139 }
140 "resources/unsubscribe" => {
141 self.require_initialized(session)?;
142 let _req: UnsubscribeRequest = serde_json::from_value(params)
143 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
144 Ok(serde_json::json!({}))
145 }
146 "prompts/list" => {
147 self.require_initialized(session)?;
148 let req: ListPromptsRequest = serde_json::from_value(params).unwrap_or_default();
149 Ok(serde_json::to_value(
150 self.router.list_prompts(req.cursor.as_deref()),
151 )?)
152 }
153 "prompts/get" => {
154 self.require_initialized(session)?;
155 let req: GetPromptRequest = serde_json::from_value(params)
156 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
157 #[cfg(feature = "auth")]
158 let result = {
159 let identity = session.identity.clone();
160 crate::server::auth_context::scope(identity, self.router.get_prompt(req))
161 .await?
162 };
163 #[cfg(not(feature = "auth"))]
164 let result = self.router.get_prompt(req).await?;
165 Ok(serde_json::to_value(result)?)
166 }
167 "logging/setLevel" => {
168 self.require_initialized(session)?;
169 let _req: SetLevelRequest = serde_json::from_value(params)
170 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
171 Ok(serde_json::json!({}))
172 }
173 "completion/complete" => {
174 self.require_initialized(session)?;
175 let req: CompleteRequest = serde_json::from_value(params)
176 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
177 #[cfg(feature = "auth")]
178 let result = {
179 let identity = session.identity.clone();
180 crate::server::auth_context::scope(identity, self.router.complete(req)).await?
181 };
182 #[cfg(not(feature = "auth"))]
183 let result = self.router.complete(req).await?;
184 Ok(serde_json::to_value(result)?)
185 }
186 method => Err(McpError::MethodNotFound(method.to_owned())),
187 }
188 }
189
190 async fn handle_initialize(
191 &self,
192 req: InitializeRequest,
193 session: &mut Session,
194 ) -> McpResult<Value> {
195 info!(
196 client = %req.client_info.name,
197 version = %req.client_info.version,
198 "Client initializing"
199 );
200
201 session.client_info = Some(req.client_info);
202 session.client_capabilities = Some(req.capabilities);
203 session.protocol_version = Some(req.protocol_version);
204 session.initialized = true;
205
206 let capabilities = ServerCapabilities {
207 tools: if self.router.has_tools() {
208 Some(ToolsCapability {
209 list_changed: Some(true),
210 })
211 } else {
212 None
213 },
214 resources: if self.router.has_resources() {
215 Some(ResourcesCapability {
216 subscribe: Some(false),
217 list_changed: Some(true),
218 })
219 } else {
220 None
221 },
222 prompts: if self.router.has_prompts() {
223 Some(PromptsCapability {
224 list_changed: Some(true),
225 })
226 } else {
227 None
228 },
229 logging: Some(LoggingCapability {}),
230 experimental: None,
231 };
232
233 let result = InitializeResult {
234 protocol_version: MCP_PROTOCOL_VERSION.to_owned(),
235 capabilities,
236 server_info: self.info.clone(),
237 instructions: self.instructions.clone(),
238 };
239
240 Ok(serde_json::to_value(result)?)
241 }
242
243 async fn handle_notification(&self, notif: JsonRpcNotification, session: &mut Session) {
244 match notif.method.as_str() {
245 "notifications/initialized" => {
246 info!(session = %session.id, "Client sent initialized notification");
247 }
248 "notifications/cancelled" => {
249 if let Some(params) = notif.params {
250 if let Ok(cancelled) = serde_json::from_value::<CancelledNotification>(params) {
251 debug!(request_id = ?cancelled.request_id, reason = ?cancelled.reason, "Request cancelled by client");
252 let was_cancelled = self
253 .cancellation
254 .cancel(&session.id, &cancelled.request_id)
255 .await;
256 if was_cancelled {
257 info!(request_id = ?cancelled.request_id, "Request successfully cancelled");
258 } else {
259 debug!(request_id = ?cancelled.request_id, "Request not found or already completed");
260 }
261 }
262 }
263 }
264 "notifications/roots/list_changed" => {
265 debug!("Client roots list changed");
266 }
268 method => {
269 warn!(method, "Received unknown notification");
270 }
271 }
272 }
273
274 fn require_initialized(&self, session: &Session) -> McpResult<()> {
275 if !session.initialized {
276 Err(McpError::InvalidRequest(
277 "Server not initialized. Send 'initialize' first.".to_owned(),
278 ))
279 } else {
280 Ok(())
281 }
282 }
283
284 pub fn info(&self) -> &ServerInfo {
285 &self.info
286 }
287}