scim_server/mcp_integration/
protocol.rs

1//! MCP protocol layer for tool discovery and dispatch
2//!
3//! This module handles the core MCP protocol functionality including tool discovery,
4//! execution dispatch, and protocol communication. It serves as the interface between
5//! AI agents and the SCIM server operations.
6
7use super::core::{ScimMcpServer, ScimToolResult};
8use super::handlers::{system_info, user_crud, user_queries};
9use super::tools::{system_schemas, user_schemas};
10use crate::ResourceProvider;
11use log::{debug, error, info, warn};
12use serde::{Deserialize, Serialize};
13use serde_json::{Value, json};
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15
16/// MCP JSON-RPC request structure
17#[derive(Debug, Deserialize)]
18struct McpRequest {
19    jsonrpc: String,
20    id: Option<Value>,
21    method: String,
22    params: Option<Value>,
23}
24
25/// MCP JSON-RPC response structure
26#[derive(Debug, Serialize)]
27pub struct McpResponse {
28    pub jsonrpc: String,
29    pub id: Option<Value>,
30    pub result: Option<Value>,
31    pub error: Option<Value>,
32}
33
34/// MCP JSON-RPC error structure
35#[derive(Debug, Serialize)]
36struct McpError {
37    code: i32,
38    message: String,
39    data: Option<Value>,
40}
41
42impl McpResponse {
43    /// Create a successful response
44    fn success(id: Option<Value>, result: Value) -> Self {
45        Self {
46            jsonrpc: "2.0".to_string(),
47            id,
48            result: Some(result),
49            error: None,
50        }
51    }
52
53    /// Create an error response
54    fn error(id: Option<Value>, code: i32, message: String) -> Self {
55        Self {
56            jsonrpc: "2.0".to_string(),
57            id,
58            result: None,
59            error: Some(json!(McpError {
60                code,
61                message,
62                data: None
63            })),
64        }
65    }
66}
67
68impl<P: ResourceProvider + Send + Sync + 'static> ScimMcpServer<P> {
69    /// Get the list of available MCP tools as JSON
70    ///
71    /// Returns all tool definitions that AI agents can discover and execute.
72    /// Each tool includes its schema, parameters, and documentation.
73    ///
74    /// # Examples
75    ///
76    /// ```rust
77    /// # #[cfg(feature = "mcp")]
78    /// use scim_server::mcp_integration::ScimMcpServer;
79    /// use scim_server::providers::StandardResourceProvider;
80    /// use scim_server::storage::InMemoryStorage;
81    /// # async fn example(mcp_server: ScimMcpServer<StandardResourceProvider<InMemoryStorage>>) {
82    /// let tools = mcp_server.get_tools();
83    /// println!("Available tools: {}", tools.len());
84    /// # }
85    /// ```
86    pub fn get_tools(&self) -> Vec<Value> {
87        vec![
88            user_schemas::create_user_tool(),
89            user_schemas::get_user_tool(),
90            user_schemas::update_user_tool(),
91            user_schemas::delete_user_tool(),
92            user_schemas::list_users_tool(),
93            user_schemas::search_users_tool(),
94            user_schemas::user_exists_tool(),
95            system_schemas::get_schemas_tool(),
96            system_schemas::get_server_info_tool(),
97        ]
98    }
99
100    /// Execute a tool by name with arguments
101    ///
102    /// This is the main dispatch function that routes tool execution requests
103    /// to the appropriate handler based on the tool name.
104    ///
105    /// # Arguments
106    /// * `tool_name` - The name of the tool to execute
107    /// * `arguments` - JSON arguments for the tool execution
108    ///
109    /// # Returns
110    /// A `ScimToolResult` containing the execution outcome
111    pub async fn execute_tool(&self, tool_name: &str, arguments: Value) -> ScimToolResult {
112        debug!("Executing MCP tool: {} with args: {}", tool_name, arguments);
113
114        match tool_name {
115            // User CRUD operations
116            "scim_create_user" => user_crud::handle_create_user(self, arguments).await,
117            "scim_get_user" => user_crud::handle_get_user(self, arguments).await,
118            "scim_update_user" => user_crud::handle_update_user(self, arguments).await,
119            "scim_delete_user" => user_crud::handle_delete_user(self, arguments).await,
120
121            // User query operations
122            "scim_list_users" => user_queries::handle_list_users(self, arguments).await,
123            "scim_search_users" => user_queries::handle_search_users(self, arguments).await,
124            "scim_user_exists" => user_queries::handle_user_exists(self, arguments).await,
125
126            // System information operations
127            "scim_get_schemas" => system_info::handle_get_schemas(self, arguments).await,
128            "scim_server_info" => system_info::handle_server_info(self, arguments).await,
129
130            // Unknown tool
131            _ => ScimToolResult {
132                success: false,
133                content: json!({
134                    "error": "Unknown tool",
135                    "tool_name": tool_name
136                }),
137                metadata: None,
138            },
139        }
140    }
141
142    /// Run the MCP server using stdio communication
143    ///
144    /// Starts the MCP server and begins listening for tool execution requests
145    /// over standard input/output. This is the standard MCP communication method.
146    ///
147    /// # Examples
148    ///
149    /// ```rust,no_run
150    /// # #[cfg(feature = "mcp")]
151    /// use scim_server::mcp_integration::ScimMcpServer;
152    /// use scim_server::providers::StandardResourceProvider;
153    /// use scim_server::storage::InMemoryStorage;
154    /// # async fn example(mcp_server: ScimMcpServer<StandardResourceProvider<InMemoryStorage>>) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
155    /// // Run MCP server
156    /// mcp_server.run_stdio().await?;
157    /// # Ok(())
158    /// # }
159    /// ```
160    pub async fn run_stdio(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
161        info!("SCIM MCP server starting stdio communication");
162        info!(
163            "Available tools: {:?}",
164            self.get_tools()
165                .iter()
166                .map(|t| t.get("name"))
167                .collect::<Vec<_>>()
168        );
169
170        let stdin = tokio::io::stdin();
171        let mut stdout = tokio::io::stdout();
172        let mut reader = BufReader::new(stdin);
173        let mut line = String::new();
174
175        info!("SCIM MCP server ready - listening on stdio");
176
177        loop {
178            line.clear();
179            match reader.read_line(&mut line).await {
180                Ok(0) => {
181                    debug!("EOF received, shutting down");
182                    break; // EOF
183                }
184                Ok(_) => {
185                    let line_content = line.trim();
186                    if line_content.is_empty() {
187                        continue;
188                    }
189
190                    debug!("Received request: {}", line_content);
191
192                    if let Some(response) = self.handle_mcp_request(line_content).await {
193                        let response_json = match serde_json::to_string(&response) {
194                            Ok(json) => json,
195                            Err(e) => {
196                                error!("Failed to serialize response: {}", e);
197                                continue;
198                            }
199                        };
200
201                        debug!("Sending response: {}", response_json);
202
203                        if let Err(e) = stdout.write_all(response_json.as_bytes()).await {
204                            error!("Failed to write response: {}", e);
205                            break;
206                        }
207                        if let Err(e) = stdout.write_all(b"\n").await {
208                            error!("Failed to write newline: {}", e);
209                            break;
210                        }
211                        if let Err(e) = stdout.flush().await {
212                            error!("Failed to flush stdout: {}", e);
213                            break;
214                        }
215                    }
216                }
217                Err(e) => {
218                    error!("Error reading from stdin: {}", e);
219                    break;
220                }
221            }
222        }
223
224        info!("SCIM MCP server shutting down");
225        Ok(())
226    }
227
228    /// Handle a single MCP request and return the appropriate response
229    pub async fn handle_mcp_request(&self, line: &str) -> Option<McpResponse> {
230        let request: McpRequest = match serde_json::from_str(line) {
231            Ok(req) => req,
232            Err(e) => {
233                warn!("Failed to parse JSON request: {} - Input: {}", e, line);
234                return Some(McpResponse::error(None, -32700, "Parse error".to_string()));
235            }
236        };
237
238        debug!(
239            "Processing method: {} with id: {:?}",
240            request.method, request.id
241        );
242
243        match request.method.as_str() {
244            "initialize" => Some(self.handle_initialize(request.id)),
245            "notifications/initialized" => {
246                debug!("Received initialized notification - handshake complete");
247                None // Notifications don't require responses
248            }
249            "tools/list" => Some(self.handle_tools_list(request.id)),
250            "tools/call" => Some(self.handle_tools_call(request.id, request.params).await),
251            "ping" => Some(self.handle_ping(request.id)),
252            _ => {
253                warn!("Unknown method: {}", request.method);
254                Some(McpResponse::error(
255                    request.id,
256                    -32601,
257                    "Method not found".to_string(),
258                ))
259            }
260        }
261    }
262
263    /// Handle initialize request
264    fn handle_initialize(&self, id: Option<Value>) -> McpResponse {
265        debug!("Handling initialize request");
266
267        let result = json!({
268            "protocolVersion": "2024-11-05",
269            "capabilities": {
270                "tools": {}
271            },
272            "serverInfo": {
273                "name": self.server_info.name,
274                "version": self.server_info.version,
275                "description": self.server_info.description
276            }
277        });
278
279        McpResponse::success(id, result)
280    }
281
282    /// Handle tools/list request
283    fn handle_tools_list(&self, id: Option<Value>) -> McpResponse {
284        debug!("Handling tools/list request");
285
286        let tools = self.get_tools();
287        let result = json!({
288            "tools": tools
289        });
290
291        McpResponse::success(id, result)
292    }
293
294    /// Handle tools/call request
295    async fn handle_tools_call(&self, id: Option<Value>, params: Option<Value>) -> McpResponse {
296        debug!("Handling tools/call request");
297
298        let params = match params {
299            Some(p) => p,
300            None => {
301                return McpResponse::error(
302                    id,
303                    -32602,
304                    "Invalid params: missing parameters".to_string(),
305                );
306            }
307        };
308
309        let tool_name = match params.get("name").and_then(|n| n.as_str()) {
310            Some(name) => name,
311            None => {
312                return McpResponse::error(
313                    id,
314                    -32602,
315                    "Invalid params: missing tool name".to_string(),
316                );
317            }
318        };
319
320        let arguments = params.get("arguments").cloned().unwrap_or(json!({}));
321
322        debug!(
323            "Executing tool: {} with arguments: {}",
324            tool_name, arguments
325        );
326
327        let tool_result = self.execute_tool(tool_name, arguments).await;
328
329        if tool_result.success {
330            let result = json!({
331                "content": [
332                    {
333                        "type": "text",
334                        "text": serde_json::to_string_pretty(&tool_result.content)
335                            .unwrap_or_else(|_| "Error serializing result".to_string())
336                    }
337                ],
338                "_meta": tool_result.metadata
339            });
340
341            McpResponse::success(id, result)
342        } else {
343            McpResponse::error(
344                id,
345                -32000,
346                format!(
347                    "Tool execution failed: {}",
348                    tool_result
349                        .content
350                        .get("error")
351                        .and_then(|e| e.as_str())
352                        .unwrap_or("Unknown error")
353                ),
354            )
355        }
356    }
357
358    /// Handle ping request
359    fn handle_ping(&self, id: Option<Value>) -> McpResponse {
360        debug!("Handling ping request");
361        McpResponse::success(id, json!({}))
362    }
363}