1pub mod inventory;
2pub mod primitives;
3
4use std::{borrow::Cow, collections::BTreeMap};
5
6use crate::{
7 inventory::ToolRegistration,
8 primitives::tool::{BoxedTool, Tool},
9};
10use anyhow::{Context as _, anyhow};
11use mmcp_protocol::{
12 ProtocolVersion,
13 consts::error_codes,
14 mcp::{
15 self, CallToolRequest, CallToolRequestParams, CallToolResult, CallToolResultContent,
16 Implementation, InitializeRequest, InitializeResult, JSONRPCBatchRequest, JSONRPCError,
17 JSONRPCMessage, JSONRPCRequest, JSONRPCResponse, JsonrpcBatchResponseItem,
18 JsonrpcErrorError, RequestId, ServerCapabilities, ServerCapabilitiesPrompts,
19 ServerCapabilitiesResources, ServerCapabilitiesTools, TextContent,
20 },
21 port::{RPCPort, RPCSink},
22};
23
/// An MCP server: identity metadata, optional instructions, and the set of
/// tools it exposes, keyed by tool name.
pub struct MCPServer {
    /// Server name reported to the client in the `initialize` response.
    name: String,
    /// Server version reported to the client in the `initialize` response.
    version: String,
    /// Registered tools, ordered and looked up by their name.
    tools: BTreeMap<Cow<'static, str>, BoxedTool>,
    /// Optional instructions forwarded to the client in the `initialize` response.
    instructions: Option<String>,
}
30
31impl MCPServer {
32 pub fn new(name: impl Into<String>, version: impl Into<String>) -> Self {
33 Self {
34 name: name.into(),
35 version: version.into(),
36 tools: Default::default(),
37 instructions: None,
38 }
39 }
40
41 pub fn with_tools_from_inventory(mut self) -> Self {
42 for tool in inventory::iter::<ToolRegistration> {
43 let tool = tool.tool();
44 self.tools.insert(tool.name(), tool);
45 }
46 self
47 }
48
49 pub fn add_tool(&mut self, tool: impl Tool + Send + Sync + 'static) {
50 self.tools.insert(tool.name(), Box::new(tool));
51 }
52
    /// Looks up a registered tool by name, returning `None` when no tool with
    /// that name has been registered.
    pub fn get_tool(&self, name: &str) -> Option<&BoxedTool> {
        self.tools.get(name)
    }
56
    /// Iterates over all registered tools in the order of the underlying
    /// `BTreeMap`, i.e. sorted by tool name.
    pub fn list_tools(&self) -> impl Iterator<Item = &BoxedTool> {
        self.tools.values()
    }
60
61 pub fn list_resources(&self) -> impl Iterator<Item = mcp::Resource> {
63 std::iter::empty()
65 }
66
67 pub fn list_prompts(&self) -> impl Iterator<Item = mcp::Prompt> {
69 std::iter::empty()
71 }
72
73 pub fn with_instructions(mut self, instructions: impl Into<String>) -> Self {
75 self.instructions = Some(instructions.into());
76 self
77 }
78
79 pub async fn start<P: RPCPort>(self, mut port: P) -> anyhow::Result<()> {
81 let mut sink = port.sink();
83
84 let queued_messages = self.initialize(&mut port, &mut sink).await?;
86
87 for message in queued_messages {
89 self.handle_message(&mut sink, message).await?;
90 }
91
92 while let Ok(Some(message)) = port.progress().await {
94 self.handle_message(&mut sink, message).await?;
95 }
96
97 Ok(())
98 }
99
100 async fn initialize<P: RPCPort, S: RPCSink>(
102 &self,
103 port: &mut P,
104 sink: &mut S,
105 ) -> anyhow::Result<Vec<JSONRPCMessage>> {
106 let mut queued_messages = Vec::new();
107
108 let (init_request_id, init_request) = loop {
110 let message = port
111 .progress()
112 .await?
113 .ok_or_else(|| anyhow!("connection closed during initialization"))?;
114
115 match message {
116 JSONRPCMessage::JSONRPCRequest(request) if request.method == "initialize" => {
117 let request_value = serde_json::to_value(&request)
119 .map_err(|e| anyhow!("failed to serialize request: {}", e))?;
120 let initialize_request: InitializeRequest =
121 serde_json::from_value(request_value)
122 .map_err(|e| anyhow!("failed to parse initialize request: {}", e))?;
123
124 break (request.id.clone(), initialize_request);
126 }
127 _ => queued_messages.push(message),
129 }
130 };
131
132 self.send_initialize_response(sink, init_request_id, &init_request)
134 .await?;
135
136 loop {
138 let message = port
139 .progress()
140 .await?
141 .ok_or_else(|| anyhow!("connection closed during initialization"))?;
142
143 match message {
144 JSONRPCMessage::JSONRPCNotification(notification)
145 if notification.method == "notifications/initialized" =>
146 {
147 break;
149 }
150 _ => queued_messages.push(message),
152 }
153 }
154
155 Ok(queued_messages)
156 }
157
158 async fn send_initialize_response<S: RPCSink>(
160 &self,
161 sink: &mut S,
162 id: RequestId,
163 init_request: &InitializeRequest,
164 ) -> anyhow::Result<()> {
165 let protocol_version = init_request
166 .params
167 .protocol_version
168 .parse::<ProtocolVersion>()
169 .context("failed to parse protocol version")?;
170 let response = InitializeResult {
171 meta: None,
172 capabilities: ServerCapabilities {
173 tools: Some(ServerCapabilitiesTools {
174 list_changed: Some(true),
175 extra: Default::default(),
176 }),
177 resources: Some(ServerCapabilitiesResources {
178 list_changed: Some(true),
179 subscribe: Some(false),
180 extra: Default::default(),
181 }),
182 prompts: Some(ServerCapabilitiesPrompts {
183 list_changed: Some(true),
184 extra: Default::default(),
185 }),
186 ..Default::default()
187 },
188 instructions: self.instructions.clone(),
189 protocol_version: protocol_version.to_string(),
190 server_info: Implementation {
191 name: self.name.clone(),
192 version: self.version.clone(),
193 extra: Default::default(),
194 },
195 extra: Default::default(),
196 };
197
198 sink.send_response(id, response).await?;
200
201 Ok(())
202 }
203
204 async fn handle_message<S: RPCSink>(
206 &self,
207 sink: &mut S,
208 message: JSONRPCMessage,
209 ) -> anyhow::Result<()> {
210 match message {
211 JSONRPCMessage::JSONRPCRequest(request) => {
212 let response = self.handle_request(request).await?;
213 match response {
214 JsonrpcBatchResponseItem::JSONRPCResponse(response) => {
215 sink.send_message(JSONRPCMessage::JSONRPCResponse(response))
216 .await?;
217 }
218 JsonrpcBatchResponseItem::JSONRPCError(error) => {
219 sink.send_message(JSONRPCMessage::JSONRPCError(error))
220 .await?;
221 }
222 }
223 }
224 JSONRPCMessage::JSONRPCNotification(_notification) => {
225 }
227 JSONRPCMessage::JSONRPCBatchRequest(batch) => {
228 self.handle_batch_request(sink, batch).await?;
229 }
230 _ => {}
231 }
232
233 Ok(())
234 }
235
236 async fn handle_request(
237 &self,
238 request: JSONRPCRequest,
239 ) -> anyhow::Result<JsonrpcBatchResponseItem> {
240 match request.method.as_str() {
241 "tools/call" => {
242 let Some(params) = request.params else {
243 return Ok(JsonrpcBatchResponseItem::JSONRPCError(JSONRPCError {
244 error: JsonrpcErrorError {
245 message: "No parameters provided".to_string(),
246 code: error_codes::INVALID_PARAMS,
247 data: None,
248 extra: Default::default(),
249 },
250 id: request.id,
251 jsonrpc: Default::default(),
252 extra: Default::default(),
253 }));
254 };
255
256 let tool_request = match serde_json::from_value::<CallToolRequestParams>(
257 serde_json::Value::Object(params.extra),
258 ) {
259 Ok(req) => req,
260 Err(e) => {
261 let result = CallToolResult {
262 extra: Default::default(),
263 meta: Default::default(),
264 content: vec![CallToolResultContent::TextContent(TextContent {
265 text: format!("Failed to parse tool call request: {}", e),
266 r#type: Default::default(),
267 annotations: Default::default(),
268 extra: Default::default(),
269 })],
270 is_error: Some(true),
271 };
272 return Ok(JsonrpcBatchResponseItem::JSONRPCResponse(JSONRPCResponse {
273 id: request.id,
274 jsonrpc: Default::default(),
275 result: serialize_tool_call_result(result)?,
276 extra: Default::default(),
277 }));
278 }
279 };
280
281 let tool_name = tool_request.name.as_str();
283
284 if let Some(tool) = self.get_tool(tool_name) {
286 let result = tool
288 .execute(CallToolRequest {
289 method: Default::default(),
290 params: tool_request,
291 extra: request.extra,
292 })
293 .await;
294 Ok(JsonrpcBatchResponseItem::JSONRPCResponse(JSONRPCResponse {
295 id: request.id,
296 jsonrpc: Default::default(),
297 result: serialize_tool_call_result(result)?,
298 extra: Default::default(),
299 }))
300 } else {
301 Ok(JsonrpcBatchResponseItem::JSONRPCError(JSONRPCError {
303 error: JsonrpcErrorError {
304 message: format!("Tool not found: {}", tool_name),
305 code: error_codes::INVALID_PARAMS,
306 data: None,
307 extra: Default::default(),
308 },
309 id: request.id,
310 jsonrpc: Default::default(),
311 extra: Default::default(),
312 }))
313 }
314 }
315 "tools/list" => {
316 let tools = self
318 .list_tools()
319 .map(|tool| {
320 Ok(serde_json::json!({
321 "name": tool.name(),
322 "description": tool.description(),
323 "inputSchema": serde_json::from_str::<serde_json::Value>(tool.input_schema().as_ref())?,
324 "annotations": tool.annotations()
325 }))
326 })
327 .collect::<anyhow::Result<Vec<_>>>()?;
328
329 Ok(JsonrpcBatchResponseItem::JSONRPCResponse(JSONRPCResponse {
330 id: request.id,
331 jsonrpc: Default::default(),
332 result: mcp::Result {
333 meta: Default::default(),
334 extra: serde_json::json!({"tools": tools})
335 .as_object()
336 .unwrap()
337 .clone(),
338 },
339 extra: Default::default(),
340 }))
341 }
342 "resources/list" => {
343 let resources = self
345 .list_resources()
346 .map(|resource| Ok(serde_json::to_value(resource)?))
347 .collect::<anyhow::Result<Vec<_>>>()?;
348
349 Ok(JsonrpcBatchResponseItem::JSONRPCResponse(JSONRPCResponse {
350 id: request.id,
351 jsonrpc: Default::default(),
352 result: mcp::Result {
353 meta: Default::default(),
354 extra: serde_json::json!({"resources": resources})
355 .as_object()
356 .unwrap()
357 .clone(),
358 },
359 extra: Default::default(),
360 }))
361 }
362 "prompts/list" => {
363 let prompts = self
365 .list_prompts()
366 .map(|prompt| Ok(serde_json::to_value(prompt)?))
367 .collect::<anyhow::Result<Vec<_>>>()?;
368
369 Ok(JsonrpcBatchResponseItem::JSONRPCResponse(JSONRPCResponse {
370 id: request.id,
371 jsonrpc: Default::default(),
372 result: mcp::Result {
373 meta: Default::default(),
374 extra: serde_json::json!({"prompts": prompts})
375 .as_object()
376 .unwrap()
377 .clone(),
378 },
379 extra: Default::default(),
380 }))
381 }
382 _ => {
384 Ok(JsonrpcBatchResponseItem::JSONRPCError(JSONRPCError {
386 error: JsonrpcErrorError {
387 message: format!("Method not supported: {}", request.method),
388 code: error_codes::METHOD_NOT_FOUND,
389 data: None,
390 extra: Default::default(),
391 },
392 id: request.id,
393 jsonrpc: Default::default(),
394 extra: Default::default(),
395 }))
396 }
397 }
398 }
399
    /// Handles a JSON-RPC batch request.
    ///
    /// NOTE(review): this is currently a stub — the batch is dropped, nothing
    /// is written to the sink, and `Ok(())` is returned, so a client sending a
    /// batch receives no responses for it.
    async fn handle_batch_request<S: RPCSink>(
        &self,
        _sink: &mut S,
        _batch: JSONRPCBatchRequest,
    ) -> anyhow::Result<()> {
        Ok(())
    }
408}
409
410fn serialize_tool_call_result(result: CallToolResult) -> anyhow::Result<mcp::Result> {
411 let serde_json::Value::Object(result) = serde_json::to_value(&result)? else {
412 panic!("CallToolResult should be serialized to an object");
413 };
414 Ok(mcp::Result {
415 meta: Default::default(),
416 extra: result,
417 })
418}