codex_memory/mcp_server/
mod.rs

1//! Minimal MCP server implementation
2pub mod handlers;
3pub mod tools;
4pub mod transport;
5
6// Re-export for tests
7pub use handlers::MCPHandlers;
8
9use crate::config::Config;
10use crate::error::Result;
11use crate::storage::Storage;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15use tracing::{error, info, warn};
16
17/// Simple MCP server
18pub struct MCPServer {
19    _config: Config,
20    handlers: Arc<MCPHandlers>,
21    start_time: Instant,
22    last_request: Arc<std::sync::Mutex<Instant>>,
23}
24
25impl MCPServer {
26    /// Create a new MCP server
27    pub fn new(config: Config, storage: Arc<Storage>) -> Self {
28        let handlers = Arc::new(MCPHandlers::new(storage));
29        let now = Instant::now();
30        Self {
31            _config: config,
32            handlers,
33            start_time: now,
34            last_request: Arc::new(std::sync::Mutex::new(now)),
35        }
36    }
37
38    /// Check if server should self-terminate due to inactivity
39    fn should_terminate(&self) -> bool {
40        let last_request = *self.last_request.lock().unwrap();
41        let inactive_duration = last_request.elapsed();
42
43        // Terminate if inactive for more than 24 hours (Claude Desktop manages restarts)
44        if inactive_duration > Duration::from_secs(86400) {
45            warn!(
46                "Server inactive for {:?}, initiating shutdown",
47                inactive_duration
48            );
49            return true;
50        }
51
52        false
53    }
54
55    /// Update last request time
56    fn update_last_request(&self) {
57        *self.last_request.lock().unwrap() = Instant::now();
58    }
59
60    /// Log health status periodically
61    async fn health_monitor(&self) {
62        let mut interval = tokio::time::interval(Duration::from_secs(60)); // Every minute
63
64        loop {
65            interval.tick().await;
66
67            if self.should_terminate() {
68                error!("Health monitor detected inactivity timeout, terminating process");
69                std::process::exit(1);
70            }
71
72            let uptime = self.start_time.elapsed();
73            let last_request_ago = self.last_request.lock().unwrap().elapsed();
74
75            info!(
76                "Health check: uptime={:?}, last_request={:?} ago",
77                uptime, last_request_ago
78            );
79        }
80    }
81
82    /// Run in stdio mode for Claude Desktop
83    pub async fn run_stdio(&self) -> Result<()> {
84        info!("MCP server running in stdio mode");
85
86        // Spawn health monitor task
87        let health_monitor = {
88            let server_clone = Self {
89                _config: self._config.clone(),
90                handlers: Arc::clone(&self.handlers),
91                start_time: self.start_time,
92                last_request: Arc::clone(&self.last_request),
93            };
94            tokio::spawn(async move {
95                server_clone.health_monitor().await;
96            })
97        };
98
99        let stdin = tokio::io::stdin();
100        let stdout = tokio::io::stdout();
101        let mut stdin = stdin;
102        let mut stdout = stdout;
103
104        let mut buffer = String::new();
105        let mut temp_buf = [0u8; 8192]; // 8KB buffer for reading chunks
106
107        loop {
108            // Read chunk from stdin
109            match stdin.read(&mut temp_buf).await {
110                Ok(0) => {
111                    info!("Received EOF, shutting down MCP server");
112                    break; // EOF
113                }
114                Ok(n) => {
115                    // Convert bytes to string and append to buffer
116                    let chunk = String::from_utf8_lossy(&temp_buf[..n]);
117                    info!(
118                        "Read {} bytes: {:?}",
119                        n,
120                        &chunk[..std::cmp::min(100, chunk.len())]
121                    );
122                    buffer.push_str(&chunk);
123
124                    // Process all complete JSON objects from buffer
125                    while let Some(json_end) = self.find_complete_json(&buffer) {
126                        let json_str = buffer[..json_end].trim().to_string();
127                        info!(
128                            "Found complete JSON at position {}: {:?}",
129                            json_end,
130                            &json_str[..std::cmp::min(100, json_str.len())]
131                        );
132                        buffer.drain(..json_end);
133                        // Skip any whitespace/newlines after the JSON object
134                        while buffer.starts_with('\n')
135                            || buffer.starts_with('\r')
136                            || buffer.starts_with(' ')
137                        {
138                            buffer.remove(0);
139                        }
140                        if !buffer.is_empty() {
141                            info!(
142                                "Buffer after cleanup: {:?}",
143                                &buffer[..std::cmp::min(50, buffer.len())]
144                            );
145                        }
146
147                        if !json_str.is_empty() {
148                            info!("Processing JSON request ({} chars)", json_str.len());
149                            self.update_last_request();
150                            let response = self.handle_request(&json_str).await;
151                            if !response.is_empty() {
152                                stdout.write_all(response.as_bytes()).await?;
153                                stdout.write_all(b"\n").await?;
154                                stdout.flush().await?;
155                            }
156                        }
157                    }
158                }
159                Err(e) => {
160                    error!("Error reading input: {}", e);
161                    break;
162                }
163            }
164        }
165
166        info!("MCP server shutting down gracefully");
167
168        // Cancel health monitor task
169        health_monitor.abort();
170
171        // The pool will be dropped automatically when Storage is dropped
172        Ok(())
173    }
174
175    /// Find the end of a complete JSON object in the buffer
176    fn find_complete_json(&self, buffer: &str) -> Option<usize> {
177        let mut brace_count = 0;
178        let mut in_string = false;
179        let mut escape_next = false;
180        let mut start_found = false;
181
182        for (i, ch) in buffer.char_indices() {
183            if escape_next {
184                escape_next = false;
185                continue;
186            }
187
188            match ch {
189                '\\' if in_string => escape_next = true,
190                '"' => in_string = !in_string,
191                '{' if !in_string => {
192                    brace_count += 1;
193                    start_found = true;
194                }
195                '}' if !in_string => {
196                    brace_count -= 1;
197                    if start_found && brace_count == 0 {
198                        return Some(i + 1);
199                    }
200                }
201                _ => {}
202            }
203        }
204
205        None
206    }
207
208    async fn handle_request(&self, request: &str) -> String {
209        // Add detailed logging for debugging JSON parsing issues
210        info!("Raw request to parse: {:?}", request);
211
212        let request: serde_json::Value = match serde_json::from_str(request) {
213            Ok(v) => v,
214            Err(e) => {
215                error!("JSON parse error: {} - Request: {:?}", e, request);
216                return format!(
217                    r#"{{"jsonrpc":"2.0","id":0,"error":{{"code":-32700,"message":"Parse error: {}"}}}}"#,
218                    e
219                );
220            }
221        };
222
223        let method = request["method"].as_str().unwrap_or("");
224        let params = request.get("params").cloned().unwrap_or_default();
225        let id = request.get("id").cloned();
226
227        let result = match method {
228            "initialize" => Ok(serde_json::json!({
229                "protocolVersion": "2024-11-05",
230                "capabilities": {
231                    "tools": {}
232                },
233                "serverInfo": {
234                    "name": "codex-memory",
235                    "version": env!("CARGO_PKG_VERSION")
236                }
237            })),
238            "tools/list" => Ok(serde_json::json!({
239                "tools": tools::MCPTools::get_tools_list()
240            })),
241            "tools/call" => {
242                let tool_name = params["name"].as_str().unwrap_or("");
243                let tool_params = params.get("arguments").cloned().unwrap_or_default();
244                self.handlers.handle_tool_call(tool_name, tool_params).await
245            }
246            "prompts/list" => {
247                // Return empty prompts list (we don't support prompts)
248                Ok(serde_json::json!({
249                    "prompts": []
250                }))
251            }
252            "resources/list" => {
253                // Return empty resources list (we don't support resources)
254                Ok(serde_json::json!({
255                    "resources": []
256                }))
257            }
258            "notifications/initialized" => {
259                // Notifications don't require responses, just acknowledge silently
260                return "".to_string(); // Return empty string for notifications
261            }
262            _ => {
263                // For truly unknown methods, return a more helpful error
264                Err(crate::error::Error::Other(format!(
265                    "Unknown method: {}. Supported methods: initialize, tools/list, tools/call, prompts/list, resources/list, notifications/initialized",
266                    method
267                )))
268            }
269        };
270
271        match result {
272            Ok(value) => {
273                if let Some(id) = id {
274                    format!(r#"{{"jsonrpc":"2.0","id":{},"result":{}}}"#, id, value)
275                } else {
276                    format!(r#"{{"jsonrpc":"2.0","result":{}}}"#, value)
277                }
278            }
279            Err(e) => {
280                // Log the error for debugging connection failures
281                error!("MCP request failed - Method: {}, Error: {}", method, e);
282
283                if let Some(id) = id {
284                    format!(
285                        r#"{{"jsonrpc":"2.0","id":{},"error":{{"code":-32000,"message":"{}"}}}}"#,
286                        id, e
287                    )
288                } else {
289                    format!(
290                        r#"{{"jsonrpc":"2.0","id":0,"error":{{"code":-32000,"message":"{}"}}}}"#,
291                        e
292                    )
293                }
294            }
295        }
296    }
297}