1use std::sync::Arc;
2
3use crate::{
4 error::{McpError, McpResult},
5 protocol::{
6 JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse,
7 MCP_PROTOCOL_VERSION,
8 },
9 types::{
10 messages::{
11 CallToolRequest, CompleteRequest, GetPromptRequest, InitializeRequest,
12 InitializeResult, ListPromptsRequest, ListResourcesRequest, ListToolsRequest,
13 ReadResourceRequest, SetLevelRequest, SubscribeRequest, UnsubscribeRequest,
14 },
15 LoggingCapability, PromptsCapability, ResourcesCapability, ServerCapabilities, ServerInfo,
16 ToolsCapability,
17 },
18};
19use serde_json::Value;
20use tracing::{debug, error, info, warn};
21
22use crate::server::{router::Router, session::Session};
23
24#[cfg(feature = "auth")]
25use crate::auth::DynAuthProvider;
26
27#[derive(Clone)]
32pub struct McpServer {
33 pub(crate) info: ServerInfo,
34 pub(crate) instructions: Option<String>,
35 pub(crate) router: Arc<Router>,
36 #[cfg(feature = "auth")]
37 pub(crate) auth_provider: Option<DynAuthProvider>,
38 #[cfg(feature = "auth")]
39 pub(crate) require_auth: bool,
40}
41
42impl McpServer {
43 pub fn builder() -> crate::server::builder::McpServerBuilder {
44 crate::server::builder::McpServerBuilder::new()
45 }
46
47 pub async fn handle_message(
49 &self,
50 msg: JsonRpcMessage,
51 session: &mut Session,
52 ) -> Option<JsonRpcMessage> {
53 match msg {
54 JsonRpcMessage::Request(req) => {
55 let id = req.id.clone();
56 match self.dispatch_request(req, session).await {
57 Ok(result) => Some(JsonRpcMessage::Response(JsonRpcResponse {
58 jsonrpc: "2.0".to_owned(),
59 id,
60 result,
61 })),
62 Err(e) => {
63 error!(error = %e, "Request failed");
64 Some(JsonRpcMessage::Error(JsonRpcError::new(id, e)))
65 }
66 }
67 }
68 JsonRpcMessage::Notification(notif) => {
69 self.handle_notification(notif, session).await;
70 None
71 }
72 _ => None,
73 }
74 }
75
76 async fn dispatch_request(
77 &self,
78 req: JsonRpcRequest,
79 session: &mut Session,
80 ) -> McpResult<Value> {
81 let params = req.params.unwrap_or(Value::Null);
82 debug!(method = %req.method, "Dispatching request");
83
84 match req.method.as_str() {
85 "initialize" => {
86 let init: InitializeRequest = serde_json::from_value(params)
87 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
88 self.handle_initialize(init, session).await
89 }
90 "ping" => Ok(serde_json::json!({})),
91 "tools/list" => {
92 self.require_initialized(session)?;
93 let req: ListToolsRequest = serde_json::from_value(params).unwrap_or_default();
94 Ok(serde_json::to_value(
95 self.router.list_tools(req.cursor.as_deref()),
96 )?)
97 }
98 "tools/call" => {
99 self.require_initialized(session)?;
100 let req: CallToolRequest = serde_json::from_value(params)
101 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
102 #[cfg(feature = "auth")]
103 let result = {
104 let identity = session.identity.clone();
105 crate::server::auth_context::scope(identity, self.router.call_tool(req)).await?
106 };
107 #[cfg(not(feature = "auth"))]
108 let result = self.router.call_tool(req).await?;
109 Ok(serde_json::to_value(result)?)
110 }
111 "resources/list" => {
112 self.require_initialized(session)?;
113 let req: ListResourcesRequest = serde_json::from_value(params).unwrap_or_default();
114 Ok(serde_json::to_value(
115 self.router.list_resources(req.cursor.as_deref()),
116 )?)
117 }
118 "resources/read" => {
119 self.require_initialized(session)?;
120 let req: ReadResourceRequest = serde_json::from_value(params)
121 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
122 #[cfg(feature = "auth")]
123 let result = {
124 let identity = session.identity.clone();
125 crate::server::auth_context::scope(identity, self.router.read_resource(req))
126 .await?
127 };
128 #[cfg(not(feature = "auth"))]
129 let result = self.router.read_resource(req).await?;
130 Ok(serde_json::to_value(result)?)
131 }
132 "resources/subscribe" => {
133 self.require_initialized(session)?;
134 let _req: SubscribeRequest = serde_json::from_value(params)
135 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
136 Ok(serde_json::json!({}))
137 }
138 "resources/unsubscribe" => {
139 self.require_initialized(session)?;
140 let _req: UnsubscribeRequest = serde_json::from_value(params)
141 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
142 Ok(serde_json::json!({}))
143 }
144 "prompts/list" => {
145 self.require_initialized(session)?;
146 let req: ListPromptsRequest = serde_json::from_value(params).unwrap_or_default();
147 Ok(serde_json::to_value(
148 self.router.list_prompts(req.cursor.as_deref()),
149 )?)
150 }
151 "prompts/get" => {
152 self.require_initialized(session)?;
153 let req: GetPromptRequest = serde_json::from_value(params)
154 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
155 #[cfg(feature = "auth")]
156 let result = {
157 let identity = session.identity.clone();
158 crate::server::auth_context::scope(identity, self.router.get_prompt(req))
159 .await?
160 };
161 #[cfg(not(feature = "auth"))]
162 let result = self.router.get_prompt(req).await?;
163 Ok(serde_json::to_value(result)?)
164 }
165 "logging/setLevel" => {
166 self.require_initialized(session)?;
167 let _req: SetLevelRequest = serde_json::from_value(params)
168 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
169 Ok(serde_json::json!({}))
170 }
171 "completion/complete" => {
172 self.require_initialized(session)?;
173 let _req: CompleteRequest = serde_json::from_value(params)
174 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
175 Ok(serde_json::json!({ "completion": { "values": [], "hasMore": false } }))
176 }
177 method => Err(McpError::MethodNotFound(method.to_owned())),
178 }
179 }
180
181 async fn handle_initialize(
182 &self,
183 req: InitializeRequest,
184 session: &mut Session,
185 ) -> McpResult<Value> {
186 info!(
187 client = %req.client_info.name,
188 version = %req.client_info.version,
189 "Client initializing"
190 );
191
192 session.client_info = Some(req.client_info);
193 session.protocol_version = Some(req.protocol_version);
194 session.initialized = true;
195
196 let capabilities = ServerCapabilities {
197 tools: if self.router.has_tools() {
198 Some(ToolsCapability {
199 list_changed: Some(true),
200 })
201 } else {
202 None
203 },
204 resources: if self.router.has_resources() {
205 Some(ResourcesCapability {
206 subscribe: Some(false),
207 list_changed: Some(true),
208 })
209 } else {
210 None
211 },
212 prompts: if self.router.has_prompts() {
213 Some(PromptsCapability {
214 list_changed: Some(true),
215 })
216 } else {
217 None
218 },
219 logging: Some(LoggingCapability {}),
220 experimental: None,
221 };
222
223 let result = InitializeResult {
224 protocol_version: MCP_PROTOCOL_VERSION.to_owned(),
225 capabilities,
226 server_info: self.info.clone(),
227 instructions: self.instructions.clone(),
228 };
229
230 Ok(serde_json::to_value(result)?)
231 }
232
233 async fn handle_notification(&self, notif: JsonRpcNotification, session: &mut Session) {
234 match notif.method.as_str() {
235 "notifications/initialized" => {
236 info!(session = %session.id, "Client sent initialized notification");
237 }
238 "notifications/cancelled" => {
239 debug!("Client cancelled a request");
240 }
241 method => {
242 warn!(method, "Received unknown notification");
243 }
244 }
245 }
246
247 fn require_initialized(&self, session: &Session) -> McpResult<()> {
248 if !session.initialized {
249 Err(McpError::InvalidRequest(
250 "Server not initialized. Send 'initialize' first.".to_owned(),
251 ))
252 } else {
253 Ok(())
254 }
255 }
256
257 pub fn info(&self) -> &ServerInfo {
258 &self.info
259 }
260}