1use std::sync::Arc;
2
3use crate::{
4 error::{McpError, McpResult},
5 protocol::{
6 JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse,
7 MCP_PROTOCOL_VERSION,
8 },
9 types::{
10 messages::{
11 CallToolRequest, CompleteRequest, GetPromptRequest, InitializeRequest,
12 InitializeResult, ListPromptsRequest, ListResourcesRequest, ListToolsRequest,
13 ReadResourceRequest, SetLevelRequest, SubscribeRequest, UnsubscribeRequest,
14 },
15 LoggingCapability, PromptsCapability, ResourcesCapability, ServerCapabilities, ServerInfo,
16 ToolsCapability,
17 },
18};
19use serde_json::Value;
20use tracing::{debug, error, info, warn};
21
22use crate::server::{router::Router, session::Session};
23
24#[derive(Clone)]
29pub struct McpServer {
30 pub(crate) info: ServerInfo,
31 pub(crate) instructions: Option<String>,
32 pub(crate) router: Arc<Router>,
33}
34
35impl McpServer {
36 pub fn builder() -> crate::server::builder::McpServerBuilder {
37 crate::server::builder::McpServerBuilder::new()
38 }
39
40 pub async fn handle_message(
42 &self,
43 msg: JsonRpcMessage,
44 session: &mut Session,
45 ) -> Option<JsonRpcMessage> {
46 match msg {
47 JsonRpcMessage::Request(req) => {
48 let id = req.id.clone();
49 match self.dispatch_request(req, session).await {
50 Ok(result) => Some(JsonRpcMessage::Response(JsonRpcResponse {
51 jsonrpc: "2.0".to_owned(),
52 id,
53 result,
54 })),
55 Err(e) => {
56 error!(error = %e, "Request failed");
57 Some(JsonRpcMessage::Error(JsonRpcError::new(id, e)))
58 }
59 }
60 }
61 JsonRpcMessage::Notification(notif) => {
62 self.handle_notification(notif, session).await;
63 None
64 }
65 _ => None,
66 }
67 }
68
69 async fn dispatch_request(
70 &self,
71 req: JsonRpcRequest,
72 session: &mut Session,
73 ) -> McpResult<Value> {
74 let params = req.params.unwrap_or(Value::Null);
75 debug!(method = %req.method, "Dispatching request");
76
77 match req.method.as_str() {
78 "initialize" => {
79 let init: InitializeRequest = serde_json::from_value(params)
80 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
81 self.handle_initialize(init, session).await
82 }
83 "ping" => Ok(serde_json::json!({})),
84 "tools/list" => {
85 self.require_initialized(session)?;
86 let req: ListToolsRequest = serde_json::from_value(params).unwrap_or_default();
87 Ok(serde_json::to_value(
88 self.router.list_tools(req.cursor.as_deref()),
89 )?)
90 }
91 "tools/call" => {
92 self.require_initialized(session)?;
93 let req: CallToolRequest = serde_json::from_value(params)
94 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
95 Ok(serde_json::to_value(self.router.call_tool(req).await?)?)
96 }
97 "resources/list" => {
98 self.require_initialized(session)?;
99 let req: ListResourcesRequest = serde_json::from_value(params).unwrap_or_default();
100 Ok(serde_json::to_value(
101 self.router.list_resources(req.cursor.as_deref()),
102 )?)
103 }
104 "resources/read" => {
105 self.require_initialized(session)?;
106 let req: ReadResourceRequest = serde_json::from_value(params)
107 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
108 Ok(serde_json::to_value(self.router.read_resource(req).await?)?)
109 }
110 "resources/subscribe" => {
111 self.require_initialized(session)?;
112 let _req: SubscribeRequest = serde_json::from_value(params)
113 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
114 Ok(serde_json::json!({}))
115 }
116 "resources/unsubscribe" => {
117 self.require_initialized(session)?;
118 let _req: UnsubscribeRequest = serde_json::from_value(params)
119 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
120 Ok(serde_json::json!({}))
121 }
122 "prompts/list" => {
123 self.require_initialized(session)?;
124 let req: ListPromptsRequest = serde_json::from_value(params).unwrap_or_default();
125 Ok(serde_json::to_value(
126 self.router.list_prompts(req.cursor.as_deref()),
127 )?)
128 }
129 "prompts/get" => {
130 self.require_initialized(session)?;
131 let req: GetPromptRequest = serde_json::from_value(params)
132 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
133 Ok(serde_json::to_value(self.router.get_prompt(req).await?)?)
134 }
135 "logging/setLevel" => {
136 self.require_initialized(session)?;
137 let _req: SetLevelRequest = serde_json::from_value(params)
138 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
139 Ok(serde_json::json!({}))
140 }
141 "completion/complete" => {
142 self.require_initialized(session)?;
143 let _req: CompleteRequest = serde_json::from_value(params)
144 .map_err(|e| McpError::InvalidParams(e.to_string()))?;
145 Ok(serde_json::json!({ "completion": { "values": [], "hasMore": false } }))
146 }
147 method => Err(McpError::MethodNotFound(method.to_owned())),
148 }
149 }
150
151 async fn handle_initialize(
152 &self,
153 req: InitializeRequest,
154 session: &mut Session,
155 ) -> McpResult<Value> {
156 info!(
157 client = %req.client_info.name,
158 version = %req.client_info.version,
159 "Client initializing"
160 );
161
162 session.client_info = Some(req.client_info);
163 session.protocol_version = Some(req.protocol_version);
164 session.initialized = true;
165
166 let capabilities = ServerCapabilities {
167 tools: if self.router.has_tools() {
168 Some(ToolsCapability {
169 list_changed: Some(true),
170 })
171 } else {
172 None
173 },
174 resources: if self.router.has_resources() {
175 Some(ResourcesCapability {
176 subscribe: Some(false),
177 list_changed: Some(true),
178 })
179 } else {
180 None
181 },
182 prompts: if self.router.has_prompts() {
183 Some(PromptsCapability {
184 list_changed: Some(true),
185 })
186 } else {
187 None
188 },
189 logging: Some(LoggingCapability {}),
190 experimental: None,
191 };
192
193 let result = InitializeResult {
194 protocol_version: MCP_PROTOCOL_VERSION.to_owned(),
195 capabilities,
196 server_info: self.info.clone(),
197 instructions: self.instructions.clone(),
198 };
199
200 Ok(serde_json::to_value(result)?)
201 }
202
203 async fn handle_notification(&self, notif: JsonRpcNotification, session: &mut Session) {
204 match notif.method.as_str() {
205 "notifications/initialized" => {
206 info!(session = %session.id, "Client sent initialized notification");
207 }
208 "notifications/cancelled" => {
209 debug!("Client cancelled a request");
210 }
211 method => {
212 warn!(method, "Received unknown notification");
213 }
214 }
215 }
216
217 fn require_initialized(&self, session: &Session) -> McpResult<()> {
218 if !session.initialized {
219 Err(McpError::InvalidRequest(
220 "Server not initialized. Send 'initialize' first.".to_owned(),
221 ))
222 } else {
223 Ok(())
224 }
225 }
226
227 pub fn info(&self) -> &ServerInfo {
228 &self.info
229 }
230}