mmcp_server/
lib.rs

1pub mod inventory;
2pub mod primitives;
3
4use std::{borrow::Cow, collections::BTreeMap};
5
6use crate::{
7    inventory::ToolRegistration,
8    primitives::tool::{BoxedTool, Tool},
9};
10use anyhow::{Context as _, anyhow};
11use mmcp_protocol::{
12    ProtocolVersion,
13    consts::error_codes,
14    mcp::{
15        self, CallToolRequest, CallToolRequestParams, CallToolResult, CallToolResultContent,
16        Implementation, InitializeRequest, InitializeResult, JSONRPCBatchRequest, JSONRPCError,
17        JSONRPCMessage, JSONRPCRequest, JSONRPCResponse, JsonrpcBatchResponseItem,
18        JsonrpcErrorError, RequestId, ServerCapabilities, ServerCapabilitiesPrompts,
19        ServerCapabilitiesResources, ServerCapabilitiesTools, TextContent,
20    },
21    port::{RPCPort, RPCSink},
22};
23
24pub struct MCPServer {
25    name: String,
26    version: String,
27    tools: BTreeMap<Cow<'static, str>, BoxedTool>,
28    instructions: Option<String>,
29}
30
31impl MCPServer {
32    pub fn new(name: impl Into<String>, version: impl Into<String>) -> Self {
33        Self {
34            name: name.into(),
35            version: version.into(),
36            tools: Default::default(),
37            instructions: None,
38        }
39    }
40
41    pub fn with_tools_from_inventory(mut self) -> Self {
42        for tool in inventory::iter::<ToolRegistration> {
43            let tool = tool.tool();
44            self.tools.insert(tool.name(), tool);
45        }
46        self
47    }
48
49    pub fn add_tool(&mut self, tool: impl Tool + Send + Sync + 'static) {
50        self.tools.insert(tool.name(), Box::new(tool));
51    }
52
53    pub fn get_tool(&self, name: &str) -> Option<&BoxedTool> {
54        self.tools.get(name)
55    }
56
57    pub fn list_tools(&self) -> impl Iterator<Item = &BoxedTool> {
58        self.tools.values()
59    }
60
61    /// List the resources available on this server.
62    pub fn list_resources(&self) -> impl Iterator<Item = mcp::Resource> {
63        // Currently, we don't have any resources, so return an empty iterator
64        std::iter::empty()
65    }
66
67    /// List the prompts available on this server.
68    pub fn list_prompts(&self) -> impl Iterator<Item = mcp::Prompt> {
69        // Currently, we don't have any prompts, so return an empty iterator
70        std::iter::empty()
71    }
72
73    /// Set the instructions for the server which will be sent to the client on initialize.
74    pub fn with_instructions(mut self, instructions: impl Into<String>) -> Self {
75        self.instructions = Some(instructions.into());
76        self
77    }
78
79    /// Start the server by initializing and then processing requests
80    pub async fn start<P: RPCPort>(self, mut port: P) -> anyhow::Result<()> {
81        // Create a single sink to use throughout the lifecycle
82        let mut sink = port.sink();
83
84        // First handle initialization
85        let queued_messages = self.initialize(&mut port, &mut sink).await?;
86
87        // Process any messages queued during initialization
88        for message in queued_messages {
89            self.handle_message(&mut sink, message).await?;
90        }
91
92        // Main message processing loop
93        while let Ok(Some(message)) = port.progress().await {
94            self.handle_message(&mut sink, message).await?;
95        }
96
97        Ok(())
98    }
99
100    /// Handle the initialization process
101    async fn initialize<P: RPCPort, S: RPCSink>(
102        &self,
103        port: &mut P,
104        sink: &mut S,
105    ) -> anyhow::Result<Vec<JSONRPCMessage>> {
106        let mut queued_messages = Vec::new();
107
108        // Step 1: Wait for initialize request
109        let (init_request_id, init_request) = loop {
110            let message = port
111                .progress()
112                .await?
113                .ok_or_else(|| anyhow!("connection closed during initialization"))?;
114
115            match message {
116                JSONRPCMessage::JSONRPCRequest(request) if request.method == "initialize" => {
117                    // Parse the initialize request
118                    let request_value = serde_json::to_value(&request)
119                        .map_err(|e| anyhow!("failed to serialize request: {}", e))?;
120                    let initialize_request: InitializeRequest =
121                        serde_json::from_value(request_value)
122                            .map_err(|e| anyhow!("failed to parse initialize request: {}", e))?;
123
124                    // Return the request ID and request
125                    break (request.id.clone(), initialize_request);
126                }
127                // Queue any other messages to be processed after initialization
128                _ => queued_messages.push(message),
129            }
130        };
131
132        // Step 2: Respond to initialize request
133        self.send_initialize_response(sink, init_request_id, &init_request)
134            .await?;
135
136        // Step 3: Wait for initialized notification
137        loop {
138            let message = port
139                .progress()
140                .await?
141                .ok_or_else(|| anyhow!("connection closed during initialization"))?;
142
143            match message {
144                JSONRPCMessage::JSONRPCNotification(notification)
145                    if notification.method == "notifications/initialized" =>
146                {
147                    // Initialized notification received, initialization is complete
148                    break;
149                }
150                // Queue any other messages to be processed after initialization
151                _ => queued_messages.push(message),
152            }
153        }
154
155        Ok(queued_messages)
156    }
157
158    /// Send the initialize response with server information and capabilities
159    async fn send_initialize_response<S: RPCSink>(
160        &self,
161        sink: &mut S,
162        id: RequestId,
163        init_request: &InitializeRequest,
164    ) -> anyhow::Result<()> {
165        let protocol_version = init_request
166            .params
167            .protocol_version
168            .parse::<ProtocolVersion>()
169            .context("failed to parse protocol version")?;
170        let response = InitializeResult {
171            meta: None,
172            capabilities: ServerCapabilities {
173                tools: Some(ServerCapabilitiesTools {
174                    list_changed: Some(true),
175                    extra: Default::default(),
176                }),
177                resources: Some(ServerCapabilitiesResources {
178                    list_changed: Some(true),
179                    subscribe: Some(false),
180                    extra: Default::default(),
181                }),
182                prompts: Some(ServerCapabilitiesPrompts {
183                    list_changed: Some(true),
184                    extra: Default::default(),
185                }),
186                ..Default::default()
187            },
188            instructions: self.instructions.clone(),
189            protocol_version: protocol_version.to_string(),
190            server_info: Implementation {
191                name: self.name.clone(),
192                version: self.version.clone(),
193                extra: Default::default(),
194            },
195            extra: Default::default(),
196        };
197
198        // Send response
199        sink.send_response(id, response).await?;
200
201        Ok(())
202    }
203
204    /// Handle a single message from the client
205    async fn handle_message<S: RPCSink>(
206        &self,
207        sink: &mut S,
208        message: JSONRPCMessage,
209    ) -> anyhow::Result<()> {
210        match message {
211            JSONRPCMessage::JSONRPCRequest(request) => {
212                let response = self.handle_request(request).await?;
213                match response {
214                    JsonrpcBatchResponseItem::JSONRPCResponse(response) => {
215                        sink.send_message(JSONRPCMessage::JSONRPCResponse(response))
216                            .await?;
217                    }
218                    JsonrpcBatchResponseItem::JSONRPCError(error) => {
219                        sink.send_message(JSONRPCMessage::JSONRPCError(error))
220                            .await?;
221                    }
222                }
223            }
224            JSONRPCMessage::JSONRPCNotification(_notification) => {
225                // TODO
226            }
227            JSONRPCMessage::JSONRPCBatchRequest(batch) => {
228                self.handle_batch_request(sink, batch).await?;
229            }
230            _ => {}
231        }
232
233        Ok(())
234    }
235
236    async fn handle_request(
237        &self,
238        request: JSONRPCRequest,
239    ) -> anyhow::Result<JsonrpcBatchResponseItem> {
240        match request.method.as_str() {
241            "tools/call" => {
242                let Some(params) = request.params else {
243                    return Ok(JsonrpcBatchResponseItem::JSONRPCError(JSONRPCError {
244                        error: JsonrpcErrorError {
245                            message: "No parameters provided".to_string(),
246                            code: error_codes::INVALID_PARAMS,
247                            data: None,
248                            extra: Default::default(),
249                        },
250                        id: request.id,
251                        jsonrpc: Default::default(),
252                        extra: Default::default(),
253                    }));
254                };
255
256                let tool_request = match serde_json::from_value::<CallToolRequestParams>(
257                    serde_json::Value::Object(params.extra),
258                ) {
259                    Ok(req) => req,
260                    Err(e) => {
261                        let result = CallToolResult {
262                            extra: Default::default(),
263                            meta: Default::default(),
264                            content: vec![CallToolResultContent::TextContent(TextContent {
265                                text: format!("Failed to parse tool call request: {}", e),
266                                r#type: Default::default(),
267                                annotations: Default::default(),
268                                extra: Default::default(),
269                            })],
270                            is_error: Some(true),
271                        };
272                        return Ok(JsonrpcBatchResponseItem::JSONRPCResponse(JSONRPCResponse {
273                            id: request.id,
274                            jsonrpc: Default::default(),
275                            result: serialize_tool_call_result(result)?,
276                            extra: Default::default(),
277                        }));
278                    }
279                };
280
281                // Extract tool name from params
282                let tool_name = tool_request.name.as_str();
283
284                // Find the tool and execute it
285                if let Some(tool) = self.get_tool(tool_name) {
286                    // Execute the tool using the tool trait's execute method
287                    let result = tool
288                        .execute(CallToolRequest {
289                            method: Default::default(),
290                            params: tool_request,
291                            extra: request.extra,
292                        })
293                        .await;
294                    Ok(JsonrpcBatchResponseItem::JSONRPCResponse(JSONRPCResponse {
295                        id: request.id,
296                        jsonrpc: Default::default(),
297                        result: serialize_tool_call_result(result)?,
298                        extra: Default::default(),
299                    }))
300                } else {
301                    // Tool not found, send error response
302                    Ok(JsonrpcBatchResponseItem::JSONRPCError(JSONRPCError {
303                        error: JsonrpcErrorError {
304                            message: format!("Tool not found: {}", tool_name),
305                            code: error_codes::INVALID_PARAMS,
306                            data: None,
307                            extra: Default::default(),
308                        },
309                        id: request.id,
310                        jsonrpc: Default::default(),
311                        extra: Default::default(),
312                    }))
313                }
314            }
315            "tools/list" => {
316                // Handle tool listing request
317                let tools = self
318                    .list_tools()
319                    .map(|tool| {
320                        Ok(serde_json::json!({
321                            "name": tool.name(),
322                            "description": tool.description(),
323                            "inputSchema": serde_json::from_str::<serde_json::Value>(tool.input_schema().as_ref())?,
324                            "annotations": tool.annotations()
325                        }))
326                    })
327                    .collect::<anyhow::Result<Vec<_>>>()?;
328
329                Ok(JsonrpcBatchResponseItem::JSONRPCResponse(JSONRPCResponse {
330                    id: request.id,
331                    jsonrpc: Default::default(),
332                    result: mcp::Result {
333                        meta: Default::default(),
334                        extra: serde_json::json!({"tools": tools})
335                            .as_object()
336                            .unwrap()
337                            .clone(),
338                    },
339                    extra: Default::default(),
340                }))
341            }
342            "resources/list" => {
343                // Handle resource listing request
344                let resources = self
345                    .list_resources()
346                    .map(|resource| Ok(serde_json::to_value(resource)?))
347                    .collect::<anyhow::Result<Vec<_>>>()?;
348
349                Ok(JsonrpcBatchResponseItem::JSONRPCResponse(JSONRPCResponse {
350                    id: request.id,
351                    jsonrpc: Default::default(),
352                    result: mcp::Result {
353                        meta: Default::default(),
354                        extra: serde_json::json!({"resources": resources})
355                            .as_object()
356                            .unwrap()
357                            .clone(),
358                    },
359                    extra: Default::default(),
360                }))
361            }
362            "prompts/list" => {
363                // Handle prompt listing request
364                let prompts = self
365                    .list_prompts()
366                    .map(|prompt| Ok(serde_json::to_value(prompt)?))
367                    .collect::<anyhow::Result<Vec<_>>>()?;
368
369                Ok(JsonrpcBatchResponseItem::JSONRPCResponse(JSONRPCResponse {
370                    id: request.id,
371                    jsonrpc: Default::default(),
372                    result: mcp::Result {
373                        meta: Default::default(),
374                        extra: serde_json::json!({"prompts": prompts})
375                            .as_object()
376                            .unwrap()
377                            .clone(),
378                    },
379                    extra: Default::default(),
380                }))
381            }
382            // Handle other request types
383            _ => {
384                // Return method not found error
385                Ok(JsonrpcBatchResponseItem::JSONRPCError(JSONRPCError {
386                    error: JsonrpcErrorError {
387                        message: format!("Method not supported: {}", request.method),
388                        code: error_codes::METHOD_NOT_FOUND,
389                        data: None,
390                        extra: Default::default(),
391                    },
392                    id: request.id,
393                    jsonrpc: Default::default(),
394                    extra: Default::default(),
395                }))
396            }
397        }
398    }
399
400    async fn handle_batch_request<S: RPCSink>(
401        &self,
402        _sink: &mut S,
403        _batch: JSONRPCBatchRequest,
404    ) -> anyhow::Result<()> {
405        // TODO: Implement batch request handling
406        Ok(())
407    }
408}
409
410fn serialize_tool_call_result(result: CallToolResult) -> anyhow::Result<mcp::Result> {
411    let serde_json::Value::Object(result) = serde_json::to_value(&result)? else {
412        panic!("CallToolResult should be serialized to an object");
413    };
414    Ok(mcp::Result {
415        meta: Default::default(),
416        extra: result,
417    })
418}