airsprotocols_mcpserver_filesystem/mcp/
message_handler.rs

1//! MessageHandler implementation for airsprotocols-mcpserver-filesystem
2//!
3//! This module provides the MessageHandler<()> implementation that integrates
4//! the existing ToolProvider business logic with the new airsprotocols-mcp architecture.
5
6// Layer 1: Standard library imports
7use std::sync::Arc;
8
9// Layer 2: Third-party crate imports
10use async_trait::async_trait;
11use serde_json::json;
12use tokio::io::{stdout, AsyncWriteExt};
13use tracing::{error, info};
14
15// Layer 3: Internal module imports
16// Layer 3a: AIRS foundation crates (prioritized)
17use airsprotocols_mcp::protocol::{
18    constants::methods as mcp_methods, InitializeResponse, JsonRpcMessage, JsonRpcNotification,
19    JsonRpcRequest, JsonRpcResponse, MessageContext, MessageHandler, ProtocolVersion, ServerInfo,
20    TransportError,
21};
22use airsprotocols_mcp::providers::ToolProvider;
23
24// Layer 3b: Local crate modules
25use crate::mcp::handlers::{DirectoryOperations, FileOperations};
26use crate::mcp::server::FilesystemMcpServer;
27
28/// MCP Message Handler for airsprotocols-mcpserver-filesystem STDIO Transport
29///
30/// This handler wraps the existing FilesystemMcpServer (ToolProvider) and provides
31/// the MessageHandler<()> interface required by the new airsprotocols-mcp architecture.
32/// It preserves all existing business logic while enabling proper transport integration.
33#[derive(Debug)]
34pub struct FilesystemMessageHandler<F, D>
35where
36    F: FileOperations,
37    D: DirectoryOperations,
38{
39    server: Arc<FilesystemMcpServer<F, D>>,
40}
41
42impl<F, D> FilesystemMessageHandler<F, D>
43where
44    F: FileOperations,
45    D: DirectoryOperations,
46{
47    /// Create a new message handler wrapping the existing server
48    pub fn new(server: Arc<FilesystemMcpServer<F, D>>) -> Self {
49        Self { server }
50    }
51
52    /// Process MCP JSON-RPC notifications
53    async fn process_mcp_notification(&self, notification: &JsonRpcNotification) {
54        info!("Processing MCP notification: {}", notification.method);
55
56        match notification.method.as_str() {
57            // Protocol initialization complete
58            mcp_methods::INITIALIZED => self.handle_initialized_notification(notification).await,
59
60            // Unknown notifications
61            _ => {
62                info!("Unknown notification method: {}", notification.method);
63            }
64        }
65    }
66
67    /// Process MCP JSON-RPC requests using existing ToolProvider logic
68    async fn process_mcp_request(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
69        info!("Processing MCP request: {}", request.method);
70
71        match request.method.as_str() {
72            // Protocol initialization
73            mcp_methods::INITIALIZE => self.handle_initialize(request).await,
74
75            // Tool management methods (delegated to existing ToolProvider)
76            mcp_methods::TOOLS_LIST => self.handle_tools_list(request).await,
77            mcp_methods::TOOLS_CALL => self.handle_tools_call(request).await,
78
79            // Ping/pong for connectivity testing
80            mcp_methods::PING => self.handle_ping(request).await,
81
82            // Unknown methods
83            _ => self.create_method_not_found_response(
84                request,
85                &format!("Unknown method: {}", request.method),
86            ),
87        }
88    }
89
90    /// Handle initialize request
91    async fn handle_initialize(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
92        info!("Handling initialize request");
93
94        // Manually construct capabilities JSON - only tools and experimental
95        let capabilities_json = json!({
96            "experimental": {},
97            "tools": {
98                "list_changed": false
99            }
100            // Only filesystem tools - no resources, prompts, or logging
101        });
102
103        // Use the current supported protocol version instead of hardcoded old version
104        let protocol_version = ProtocolVersion::current();
105
106        let response = InitializeResponse {
107            protocol_version,
108            capabilities: capabilities_json,
109            server_info: ServerInfo {
110                name: "airsprotocols-mcpserver-filesystem".to_string(),
111                version: env!("CARGO_PKG_VERSION").to_string(),
112            },
113        };
114
115        JsonRpcResponse {
116            jsonrpc: "2.0".to_string(),
117            result: Some(serde_json::to_value(response).unwrap_or(json!({}))),
118            error: None,
119            id: Some(request.id.clone()),
120        }
121    }
122
123    /// Handle initialized notification (completes the MCP handshake)
124    async fn handle_initialized_notification(&self, _notification: &JsonRpcNotification) {
125        info!("MCP initialization handshake completed successfully");
126        info!("Server is now ready to handle tool calls");
127        // No response needed for notifications
128    }
129
130    /// Handle tools/list request - delegates to existing ToolProvider
131    async fn handle_tools_list(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
132        info!("Handling tools/list request");
133
134        match self.server.list_tools().await {
135            Ok(tools) => JsonRpcResponse {
136                jsonrpc: "2.0".to_string(),
137                result: Some(json!({ "tools": tools })),
138                error: None,
139                id: Some(request.id.clone()),
140            },
141            Err(e) => self.create_error_response(request, -32603, &format!("Internal error: {e}")),
142        }
143    }
144
145    /// Handle tools/call request - delegates to existing ToolProvider  
146    async fn handle_tools_call(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
147        info!("Handling tools/call request");
148
149        // Parse call_tool request parameters
150        let params = request.params.clone().unwrap_or(json!({}));
151
152        // Extract tool name and arguments according to MCP spec
153        let tool_name = match params.get("name") {
154            Some(name) => name.as_str().unwrap_or(""),
155            None => {
156                return self.create_error_response(
157                    request,
158                    -32602,
159                    "Invalid params: missing 'name' field",
160                );
161            }
162        };
163
164        let arguments = params.get("arguments").cloned().unwrap_or(json!({}));
165
166        // Delegate to existing ToolProvider implementation
167        match self.server.call_tool(tool_name, arguments).await {
168            Ok(result) => JsonRpcResponse {
169                jsonrpc: "2.0".to_string(),
170                result: Some(json!({ "content": result })),
171                error: None,
172                id: Some(request.id.clone()),
173            },
174            Err(e) => {
175                // Map tool provider errors to appropriate JSON-RPC error codes
176                let error_message = e.to_string();
177                if error_message.contains("Tool not found")
178                    || error_message.contains("Unknown tool")
179                {
180                    // Tool not found should be treated as invalid params (tool name is invalid)
181                    self.create_error_response(request, -32602, &format!("Invalid params: {e}"))
182                } else {
183                    // Other tool errors are internal errors (execution failures)
184                    self.create_error_response(request, -32603, &format!("Internal error: {e}"))
185                }
186            }
187        }
188    }
189
190    /// Handle ping request
191    async fn handle_ping(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
192        info!("Handling ping request");
193
194        JsonRpcResponse {
195            jsonrpc: "2.0".to_string(),
196            result: Some(json!("pong")),
197            error: None,
198            id: Some(request.id.clone()),
199        }
200    }
201
202    /// Create standardized error response
203    fn create_error_response(
204        &self,
205        request: &JsonRpcRequest,
206        code: i32,
207        message: &str,
208    ) -> JsonRpcResponse {
209        JsonRpcResponse {
210            jsonrpc: "2.0".to_string(),
211            result: None,
212            error: Some(json!({
213                "code": code,
214                "message": message
215            })),
216            id: Some(request.id.clone()),
217        }
218    }
219
220    /// Create method not found response
221    fn create_method_not_found_response(
222        &self,
223        request: &JsonRpcRequest,
224        message: &str,
225    ) -> JsonRpcResponse {
226        self.create_error_response(request, -32601, message)
227    }
228}
229
230#[async_trait]
231impl<F, D> MessageHandler<()> for FilesystemMessageHandler<F, D>
232where
233    F: FileOperations,
234    D: DirectoryOperations,
235{
236    /// Handle incoming JSON-RPC messages from the transport layer
237    async fn handle_message(&self, message: JsonRpcMessage, _context: MessageContext<()>) {
238        match message {
239            JsonRpcMessage::Request(request) => {
240                info!("Processing MCP request: {}", request.method);
241                let response = self.process_mcp_request(&request).await;
242
243                // Send response via stdout (STDIO transport integration)
244                let response_json = serde_json::to_string(&JsonRpcMessage::Response(response))
245                    .unwrap_or_else(|e| {
246                        error!("Failed to serialize response: {}", e);
247                        r#"{"jsonrpc":"2.0","error":{"code":-32603,"message":"Serialization error"},"id":null}"#.to_string()
248                    });
249
250                info!("Sending JSON response: {}", response_json);
251
252                // Write to stdout
253                if let Err(e) = stdout().write_all(response_json.as_bytes()).await {
254                    error!("Failed to write response to stdout: {}", e);
255                }
256                if let Err(e) = stdout().write_all(b"\n").await {
257                    error!("Failed to write newline to stdout: {}", e);
258                }
259                if let Err(e) = stdout().flush().await {
260                    error!("Failed to flush stdout: {}", e);
261                }
262                
263                info!("Response sent successfully");
264                
265                // Ensure we don't immediately close the connection after sending response
266                // The transport should stay alive for subsequent messages (like 'initialized' notification)
267                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
268            }
269            JsonRpcMessage::Notification(notification) => {
270                info!("Processing MCP notification: {}", notification.method);
271                self.process_mcp_notification(&notification).await;
272            }
273            JsonRpcMessage::Response(_) => {
274                // STDIO servers typically don't receive responses from clients
275                info!("Received response message, ignoring");
276            }
277        }
278    }
279
280    /// Handle transport-level errors
281    async fn handle_error(&self, error: TransportError) {
282        error!("Transport error: {}", error);
283    }
284
285    /// Handle transport close events
286    async fn handle_close(&self) {
287        info!("Transport connection closed");
288    }
289}