airsprotocols_mcpserver_filesystem/mcp/
message_handler.rs1use std::sync::Arc;
8
9use async_trait::async_trait;
11use serde_json::json;
12use tokio::io::{stdout, AsyncWriteExt};
13use tracing::{error, info};
14
15use airsprotocols_mcp::protocol::{
18 constants::methods as mcp_methods, InitializeResponse, JsonRpcMessage, JsonRpcNotification,
19 JsonRpcRequest, JsonRpcResponse, MessageContext, MessageHandler, ProtocolVersion, ServerInfo,
20 TransportError,
21};
22use airsprotocols_mcp::providers::ToolProvider;
23
24use crate::mcp::handlers::{DirectoryOperations, FileOperations};
26use crate::mcp::server::FilesystemMcpServer;
27
28#[derive(Debug)]
34pub struct FilesystemMessageHandler<F, D>
35where
36 F: FileOperations,
37 D: DirectoryOperations,
38{
39 server: Arc<FilesystemMcpServer<F, D>>,
40}
41
42impl<F, D> FilesystemMessageHandler<F, D>
43where
44 F: FileOperations,
45 D: DirectoryOperations,
46{
47 pub fn new(server: Arc<FilesystemMcpServer<F, D>>) -> Self {
49 Self { server }
50 }
51
52 async fn process_mcp_notification(&self, notification: &JsonRpcNotification) {
54 info!("Processing MCP notification: {}", notification.method);
55
56 match notification.method.as_str() {
57 mcp_methods::INITIALIZED => self.handle_initialized_notification(notification).await,
59
60 _ => {
62 info!("Unknown notification method: {}", notification.method);
63 }
64 }
65 }
66
67 async fn process_mcp_request(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
69 info!("Processing MCP request: {}", request.method);
70
71 match request.method.as_str() {
72 mcp_methods::INITIALIZE => self.handle_initialize(request).await,
74
75 mcp_methods::TOOLS_LIST => self.handle_tools_list(request).await,
77 mcp_methods::TOOLS_CALL => self.handle_tools_call(request).await,
78
79 mcp_methods::PING => self.handle_ping(request).await,
81
82 _ => self.create_method_not_found_response(
84 request,
85 &format!("Unknown method: {}", request.method),
86 ),
87 }
88 }
89
90 async fn handle_initialize(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
92 info!("Handling initialize request");
93
94 let capabilities_json = json!({
96 "experimental": {},
97 "tools": {
98 "list_changed": false
99 }
100 });
102
103 let protocol_version = ProtocolVersion::current();
105
106 let response = InitializeResponse {
107 protocol_version,
108 capabilities: capabilities_json,
109 server_info: ServerInfo {
110 name: "airsprotocols-mcpserver-filesystem".to_string(),
111 version: env!("CARGO_PKG_VERSION").to_string(),
112 },
113 };
114
115 JsonRpcResponse {
116 jsonrpc: "2.0".to_string(),
117 result: Some(serde_json::to_value(response).unwrap_or(json!({}))),
118 error: None,
119 id: Some(request.id.clone()),
120 }
121 }
122
123 async fn handle_initialized_notification(&self, _notification: &JsonRpcNotification) {
125 info!("MCP initialization handshake completed successfully");
126 info!("Server is now ready to handle tool calls");
127 }
129
130 async fn handle_tools_list(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
132 info!("Handling tools/list request");
133
134 match self.server.list_tools().await {
135 Ok(tools) => JsonRpcResponse {
136 jsonrpc: "2.0".to_string(),
137 result: Some(json!({ "tools": tools })),
138 error: None,
139 id: Some(request.id.clone()),
140 },
141 Err(e) => self.create_error_response(request, -32603, &format!("Internal error: {e}")),
142 }
143 }
144
145 async fn handle_tools_call(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
147 info!("Handling tools/call request");
148
149 let params = request.params.clone().unwrap_or(json!({}));
151
152 let tool_name = match params.get("name") {
154 Some(name) => name.as_str().unwrap_or(""),
155 None => {
156 return self.create_error_response(
157 request,
158 -32602,
159 "Invalid params: missing 'name' field",
160 );
161 }
162 };
163
164 let arguments = params.get("arguments").cloned().unwrap_or(json!({}));
165
166 match self.server.call_tool(tool_name, arguments).await {
168 Ok(result) => JsonRpcResponse {
169 jsonrpc: "2.0".to_string(),
170 result: Some(json!({ "content": result })),
171 error: None,
172 id: Some(request.id.clone()),
173 },
174 Err(e) => {
175 let error_message = e.to_string();
177 if error_message.contains("Tool not found")
178 || error_message.contains("Unknown tool")
179 {
180 self.create_error_response(request, -32602, &format!("Invalid params: {e}"))
182 } else {
183 self.create_error_response(request, -32603, &format!("Internal error: {e}"))
185 }
186 }
187 }
188 }
189
190 async fn handle_ping(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
192 info!("Handling ping request");
193
194 JsonRpcResponse {
195 jsonrpc: "2.0".to_string(),
196 result: Some(json!("pong")),
197 error: None,
198 id: Some(request.id.clone()),
199 }
200 }
201
202 fn create_error_response(
204 &self,
205 request: &JsonRpcRequest,
206 code: i32,
207 message: &str,
208 ) -> JsonRpcResponse {
209 JsonRpcResponse {
210 jsonrpc: "2.0".to_string(),
211 result: None,
212 error: Some(json!({
213 "code": code,
214 "message": message
215 })),
216 id: Some(request.id.clone()),
217 }
218 }
219
220 fn create_method_not_found_response(
222 &self,
223 request: &JsonRpcRequest,
224 message: &str,
225 ) -> JsonRpcResponse {
226 self.create_error_response(request, -32601, message)
227 }
228}
229
230#[async_trait]
231impl<F, D> MessageHandler<()> for FilesystemMessageHandler<F, D>
232where
233 F: FileOperations,
234 D: DirectoryOperations,
235{
236 async fn handle_message(&self, message: JsonRpcMessage, _context: MessageContext<()>) {
238 match message {
239 JsonRpcMessage::Request(request) => {
240 info!("Processing MCP request: {}", request.method);
241 let response = self.process_mcp_request(&request).await;
242
243 let response_json = serde_json::to_string(&JsonRpcMessage::Response(response))
245 .unwrap_or_else(|e| {
246 error!("Failed to serialize response: {}", e);
247 r#"{"jsonrpc":"2.0","error":{"code":-32603,"message":"Serialization error"},"id":null}"#.to_string()
248 });
249
250 info!("Sending JSON response: {}", response_json);
251
252 if let Err(e) = stdout().write_all(response_json.as_bytes()).await {
254 error!("Failed to write response to stdout: {}", e);
255 }
256 if let Err(e) = stdout().write_all(b"\n").await {
257 error!("Failed to write newline to stdout: {}", e);
258 }
259 if let Err(e) = stdout().flush().await {
260 error!("Failed to flush stdout: {}", e);
261 }
262
263 info!("Response sent successfully");
264
265 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
268 }
269 JsonRpcMessage::Notification(notification) => {
270 info!("Processing MCP notification: {}", notification.method);
271 self.process_mcp_notification(¬ification).await;
272 }
273 JsonRpcMessage::Response(_) => {
274 info!("Received response message, ignoring");
276 }
277 }
278 }
279
280 async fn handle_error(&self, error: TransportError) {
282 error!("Transport error: {}", error);
283 }
284
285 async fn handle_close(&self) {
287 info!("Transport connection closed");
288 }
289}