codex_memory/mcp_server/
mod.rs1pub mod handlers;
3pub mod tools;
4pub mod transport;
5
6pub use handlers::MCPHandlers;
8
9use crate::config::Config;
10use crate::error::Result;
11use crate::storage::Storage;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15use tracing::{error, info, warn};
16
17pub struct MCPServer {
19 _config: Config,
20 handlers: Arc<MCPHandlers>,
21 start_time: Instant,
22 last_request: Arc<std::sync::Mutex<Instant>>,
23}
24
25impl MCPServer {
26 pub fn new(config: Config, storage: Arc<Storage>) -> Self {
28 let handlers = Arc::new(MCPHandlers::new(storage));
29 let now = Instant::now();
30 Self {
31 _config: config,
32 handlers,
33 start_time: now,
34 last_request: Arc::new(std::sync::Mutex::new(now)),
35 }
36 }
37
38 fn should_terminate(&self) -> bool {
40 let last_request = *self.last_request.lock().unwrap();
41 let inactive_duration = last_request.elapsed();
42
43 if inactive_duration > Duration::from_secs(86400) {
45 warn!(
46 "Server inactive for {:?}, initiating shutdown",
47 inactive_duration
48 );
49 return true;
50 }
51
52 false
53 }
54
55 fn update_last_request(&self) {
57 *self.last_request.lock().unwrap() = Instant::now();
58 }
59
60 async fn health_monitor(&self) {
62 let mut interval = tokio::time::interval(Duration::from_secs(60)); loop {
65 interval.tick().await;
66
67 if self.should_terminate() {
68 error!("Health monitor detected inactivity timeout, terminating process");
69 std::process::exit(1);
70 }
71
72 let uptime = self.start_time.elapsed();
73 let last_request_ago = self.last_request.lock().unwrap().elapsed();
74
75 info!(
76 "Health check: uptime={:?}, last_request={:?} ago",
77 uptime, last_request_ago
78 );
79 }
80 }
81
82 pub async fn run_stdio(&self) -> Result<()> {
84 info!("MCP server running in stdio mode");
85
86 let health_monitor = {
88 let server_clone = Self {
89 _config: self._config.clone(),
90 handlers: Arc::clone(&self.handlers),
91 start_time: self.start_time,
92 last_request: Arc::clone(&self.last_request),
93 };
94 tokio::spawn(async move {
95 server_clone.health_monitor().await;
96 })
97 };
98
99 let stdin = tokio::io::stdin();
100 let stdout = tokio::io::stdout();
101 let mut stdin = stdin;
102 let mut stdout = stdout;
103
104 let mut buffer = String::new();
105 let mut temp_buf = [0u8; 8192]; loop {
108 match stdin.read(&mut temp_buf).await {
110 Ok(0) => {
111 info!("Received EOF, shutting down MCP server");
112 break; }
114 Ok(n) => {
115 let chunk = String::from_utf8_lossy(&temp_buf[..n]);
117 info!(
118 "Read {} bytes: {:?}",
119 n,
120 &chunk[..std::cmp::min(100, chunk.len())]
121 );
122 buffer.push_str(&chunk);
123
124 while let Some(json_end) = self.find_complete_json(&buffer) {
126 let json_str = buffer[..json_end].trim().to_string();
127 info!(
128 "Found complete JSON at position {}: {:?}",
129 json_end,
130 &json_str[..std::cmp::min(100, json_str.len())]
131 );
132 buffer.drain(..json_end);
133 while buffer.starts_with('\n')
135 || buffer.starts_with('\r')
136 || buffer.starts_with(' ')
137 {
138 buffer.remove(0);
139 }
140 if !buffer.is_empty() {
141 info!(
142 "Buffer after cleanup: {:?}",
143 &buffer[..std::cmp::min(50, buffer.len())]
144 );
145 }
146
147 if !json_str.is_empty() {
148 info!("Processing JSON request ({} chars)", json_str.len());
149 self.update_last_request();
150 let response = self.handle_request(&json_str).await;
151 if !response.is_empty() {
152 stdout.write_all(response.as_bytes()).await?;
153 stdout.write_all(b"\n").await?;
154 stdout.flush().await?;
155 }
156 }
157 }
158 }
159 Err(e) => {
160 error!("Error reading input: {}", e);
161 break;
162 }
163 }
164 }
165
166 info!("MCP server shutting down gracefully");
167
168 health_monitor.abort();
170
171 Ok(())
173 }
174
175 fn find_complete_json(&self, buffer: &str) -> Option<usize> {
177 let mut brace_count = 0;
178 let mut in_string = false;
179 let mut escape_next = false;
180 let mut start_found = false;
181
182 for (i, ch) in buffer.char_indices() {
183 if escape_next {
184 escape_next = false;
185 continue;
186 }
187
188 match ch {
189 '\\' if in_string => escape_next = true,
190 '"' => in_string = !in_string,
191 '{' if !in_string => {
192 brace_count += 1;
193 start_found = true;
194 }
195 '}' if !in_string => {
196 brace_count -= 1;
197 if start_found && brace_count == 0 {
198 return Some(i + 1);
199 }
200 }
201 _ => {}
202 }
203 }
204
205 None
206 }
207
208 async fn handle_request(&self, request: &str) -> String {
209 info!("Raw request to parse: {:?}", request);
211
212 let request: serde_json::Value = match serde_json::from_str(request) {
213 Ok(v) => v,
214 Err(e) => {
215 error!("JSON parse error: {} - Request: {:?}", e, request);
216 return format!(
217 r#"{{"jsonrpc":"2.0","id":0,"error":{{"code":-32700,"message":"Parse error: {}"}}}}"#,
218 e
219 );
220 }
221 };
222
223 let method = request["method"].as_str().unwrap_or("");
224 let params = request.get("params").cloned().unwrap_or_default();
225 let id = request.get("id").cloned();
226
227 let result = match method {
228 "initialize" => Ok(serde_json::json!({
229 "protocolVersion": "2024-11-05",
230 "capabilities": {
231 "tools": {}
232 },
233 "serverInfo": {
234 "name": "codex-memory",
235 "version": env!("CARGO_PKG_VERSION")
236 }
237 })),
238 "tools/list" => Ok(serde_json::json!({
239 "tools": tools::MCPTools::get_tools_list()
240 })),
241 "tools/call" => {
242 let tool_name = params["name"].as_str().unwrap_or("");
243 let tool_params = params.get("arguments").cloned().unwrap_or_default();
244 self.handlers.handle_tool_call(tool_name, tool_params).await
245 }
246 "prompts/list" => {
247 Ok(serde_json::json!({
249 "prompts": []
250 }))
251 }
252 "resources/list" => {
253 Ok(serde_json::json!({
255 "resources": []
256 }))
257 }
258 "notifications/initialized" => {
259 return "".to_string(); }
262 _ => {
263 Err(crate::error::Error::Other(format!(
265 "Unknown method: {}. Supported methods: initialize, tools/list, tools/call, prompts/list, resources/list, notifications/initialized",
266 method
267 )))
268 }
269 };
270
271 match result {
272 Ok(value) => {
273 if let Some(id) = id {
274 format!(r#"{{"jsonrpc":"2.0","id":{},"result":{}}}"#, id, value)
275 } else {
276 format!(r#"{{"jsonrpc":"2.0","result":{}}}"#, value)
277 }
278 }
279 Err(e) => {
280 error!("MCP request failed - Method: {}, Error: {}", method, e);
282
283 if let Some(id) = id {
284 format!(
285 r#"{{"jsonrpc":"2.0","id":{},"error":{{"code":-32000,"message":"{}"}}}}"#,
286 id, e
287 )
288 } else {
289 format!(
290 r#"{{"jsonrpc":"2.0","id":0,"error":{{"code":-32000,"message":"{}"}}}}"#,
291 e
292 )
293 }
294 }
295 }
296 }
297}