Skip to main content

mcp_memory/
server.rs

1use serde_json::{Value, json};
2use std::num::NonZeroUsize;
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Duration;
6
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::net::TcpListener;
9use tokio::sync::Semaphore;
10use tracing::{error, info};
11
12#[cfg(feature = "code")]
13use crate::actions::code as code_actions;
14use crate::actions::memory;
15use crate::config::Config;
16use crate::errors::{MCSError, Result};
17use crate::kg::GraphHandle;
18use crate::protocol::{JsonRpcRequest, JsonRpcResponse};
19use crate::tools;
20use crate::vector_actions;
21use crate::vector_store::{VectorConfig, VectorStore};
22
23/// Outcome of processing a request: either a pre-escaped JSON Value (small
24/// payloads) or a pre-serialized JSON *string* of the `result` field (avoids
25/// a second serialization pass for large payloads such as `read_graph`).
26enum HandlerResult {
27    Value(Value),
28    RawResult(String),
29}
30
/// Capacity of the buffered reader and of the reusable response buffer (64 KiB).
const BUFFER_CAPACITY: usize = 64 * 1024;
/// Line terminator appended to every response written on a line transport.
const NEWLINE: &[u8] = b"\n";
/// Upper bound on one inbound JSON-RPC message, shared by every transport (16 MiB).
pub const MAX_REQUEST_BYTES: usize = 16 << 20;
/// Cap on simultaneously served TCP connections (C4).
const MAX_TCP_CONNECTIONS: usize = 128;
37
/// What a single capped line read produced.
enum LineRead {
    /// A full line was read (or an unterminated final line at end of stream).
    Line,
    /// The stream ended before any byte of a new line arrived.
    Eof,
    /// The line grew past the caller's byte limit and was discarded.
    TooLong,
}
43
44async fn read_line_capped<R>(
45    reader: &mut R,
46    out: &mut String,
47    max: usize,
48) -> std::io::Result<LineRead>
49where
50    R: AsyncBufReadExt + Unpin,
51{
52    out.clear();
53    let mut buf: Vec<u8> = Vec::new();
54    loop {
55        let available = reader.fill_buf().await?;
56        if available.is_empty() {
57            if buf.is_empty() {
58                return Ok(LineRead::Eof);
59            }
60            // Move `buf` into the String — no copy. `buf` is not used afterward.
61            *out = String::from_utf8(buf).map_err(|_| {
62                std::io::Error::new(std::io::ErrorKind::InvalidData, "Non-UTF-8 input")
63            })?;
64            return Ok(LineRead::Line);
65        }
66        match available.iter().position(|&b| b == b'\n') {
67            Some(i) => {
68                if buf.len() + i + 1 > max {
69                    reader.consume(i + 1);
70                    return Ok(LineRead::TooLong);
71                }
72                buf.extend_from_slice(&available[..=i]);
73                reader.consume(i + 1);
74                *out = String::from_utf8(buf).map_err(|_| {
75                    std::io::Error::new(std::io::ErrorKind::InvalidData, "Non-UTF-8 input")
76                })?;
77                return Ok(LineRead::Line);
78            }
79            None => {
80                let take = available.len();
81                if buf.len() + take > max {
82                    reader.consume(take);
83                    return Ok(LineRead::TooLong);
84                }
85                buf.extend_from_slice(available);
86                reader.consume(take);
87            }
88        }
89    }
90}
91
92fn parse_error(msg: String) -> JsonRpcResponse {
93    let mcp_error = MCSError::ParseError(msg);
94    JsonRpcResponse::error(None, mcp_error.error_code(), mcp_error.to_string())
95}
96
97/// Dispatch one framed line (stdio / tcp). Returns the serialized response, or
98/// `None` for a notification. `vs` is `Some` only when vector support is enabled.
99pub fn dispatch_line(line: &str, kg: &GraphHandle, vs: Option<&VectorStore>) -> Option<String> {
100    let trimmed = line.trim();
101    if trimmed.is_empty() {
102        return Some(serde_json::to_string(&parse_error("Empty request".into())).unwrap());
103    }
104    let raw: Value = match serde_json::from_str(trimmed) {
105        Ok(v) => v,
106        Err(e) => return Some(serde_json::to_string(&parse_error(e.to_string())).unwrap()),
107    };
108    let req: JsonRpcRequest = match serde_json::from_value(raw) {
109        Ok(r) => r,
110        Err(e) => return Some(serde_json::to_string(&parse_error(e.to_string())).unwrap()),
111    };
112    req.id.as_ref()?;
113    match process_request(&req, kg, vs) {
114        Ok(HandlerResult::Value(result)) => {
115            let resp = JsonRpcResponse::success(req.id, result);
116            Some(serde_json::to_string(&resp).unwrap())
117        }
118        Ok(HandlerResult::RawResult(result_json)) => {
119            let id_json = serde_json::to_string(&req.id).unwrap();
120            let mut out = String::with_capacity(64 + id_json.len() + result_json.len());
121            out.push_str(r#"{"jsonrpc":"2.0","id":"#);
122            out.push_str(&id_json);
123            out.push_str(",\"result\":");
124            out.push_str(&result_json);
125            out.push('}');
126            Some(out)
127        }
128        Err(e) => {
129            let resp = JsonRpcResponse::error(req.id, e.error_code(), e.to_string());
130            Some(serde_json::to_string(&resp).unwrap())
131        }
132    }
133}
134
135/// Dispatch a Streamable-HTTP POST body, which may be a single JSON-RPC message
136/// or a batch array. `Ok(None)` means the body held only notifications (HTTP
137/// 202, empty body); `Err` means the body was not valid JSON.
138pub fn dispatch_http_body(
139    body: &str,
140    kg: &GraphHandle,
141    vs: Option<&VectorStore>,
142) -> std::result::Result<Option<Value>, String> {
143    let value: Value = serde_json::from_str(body.trim()).map_err(|e| e.to_string())?;
144    match value {
145        Value::Array(items) => {
146            // Batches are rare and never huge — keep Value path for simplicity.
147            let responses: Vec<Value> = items
148                .into_iter()
149                .filter_map(|v| process_value_http(v, kg, vs))
150                .collect();
151            Ok((!responses.is_empty()).then_some(Value::Array(responses)))
152        }
153        other => Ok(process_value_http(other, kg, vs)),
154    }
155}
156
157/// Process one JSON-RPC message for the HTTP transport, converting any
158/// `RawResult` back into a `Value` (acceptable since HTTP payloads are typically
159/// much smaller in this context). `None` means the message was a notification.
160fn process_value_http(value: Value, kg: &GraphHandle, vs: Option<&VectorStore>) -> Option<Value> {
161    let req: JsonRpcRequest = match serde_json::from_value(value) {
162        Ok(r) => r,
163        Err(e) => return Some(to_value(parse_error(e.to_string()))),
164    };
165    req.id.as_ref()?;
166    match process_request(&req, kg, vs) {
167        Ok(HandlerResult::Value(result)) => {
168            Some(to_value(JsonRpcResponse::success(req.id, result)))
169        }
170        Ok(HandlerResult::RawResult(result_json)) => {
171            // Parse the pre-serialized result back into a Value for HTTP delivery.
172            // This is a small extra cost for the HTTP transport; the stdio/TCP
173            // path (dispatch_line) avoids it entirely.
174            let result_val: Value = serde_json::from_str(&result_json).unwrap_or(Value::Null);
175            Some(to_value(JsonRpcResponse::success(req.id, result_val)))
176        }
177        Err(e) => Some(to_value(JsonRpcResponse::error(
178            req.id,
179            e.error_code(),
180            e.to_string(),
181        ))),
182    }
183}
184
185#[inline]
186fn to_value(resp: JsonRpcResponse) -> Value {
187    serde_json::to_value(resp).expect("JsonRpcResponse always serializes")
188}
189
190pub struct MCPServer {
191    config: Arc<Config>,
192    kg: Arc<GraphHandle>,
193    /// `Some` when vector support is enabled (`--vectors`); drives the extra
194    /// `vector_*` / `hybrid_search` tools. `None` for a pure knowledge-graph server.
195    vs: Option<Arc<VectorStore>>,
196}
197
198impl MCPServer {
199    /// Build a server. The vector subsystem (usearch index + petgraph mirror) is
200    /// only constructed when `config.vectors_enabled` is set; `vec_config` is
201    /// ignored otherwise.
202    pub fn new(config: Config, vec_config: VectorConfig) -> Result<Self> {
203        let path = Path::new(&config.memory_file_path);
204        let lru_cache = NonZeroUsize::new(config.lru_cache_size).unwrap_or_else(|| {
205            NonZeroUsize::new(10000).expect("10000 > 0")
206        });
207        let kg = GraphHandle::new(
208            path,
209            config.durability,
210            config.sqlite_tuning(),
211            lru_cache,
212            config.read_pool_size,
213        )?;
214
215        let vs = if config.vectors_enabled {
216            Some(Arc::new(VectorStore::with_config(path, &vec_config)?))
217        } else {
218            None
219        };
220
221        #[cfg(feature = "code")]
222        CODE_ENABLED.store(config.code_enabled, std::sync::atomic::Ordering::Relaxed);
223
224        Ok(Self {
225            config: Arc::new(config),
226            kg: Arc::new(kg),
227            vs,
228        })
229    }
230
231    /// Convenience constructor for a pure knowledge-graph server (no vectors).
232    pub fn new_kg(config: Config) -> Result<Self> {
233        let mut config = config;
234        config.vectors_enabled = false;
235        Self::new(config, VectorConfig::new(0))
236    }
237
238    /// Expose the shared graph handle (used to drive the HTTP transport).
239    pub fn graph(&self) -> Arc<GraphHandle> {
240        Arc::clone(&self.kg)
241    }
242
243    /// The shared vector store, if vector support is enabled.
244    pub fn vector_store(&self) -> Option<Arc<VectorStore>> {
245        self.vs.clone()
246    }
247
248    /// stdio transport: newline-delimited JSON-RPC over stdin/stdout.
249    pub async fn run_stdio(&self) -> Result<()> {
250        spawn_maintenance(self.kg.clone());
251        spawn_wal_flush(self.kg.clone(), self.config.wal_flush_ms);
252        let stdin = tokio::io::stdin();
253        let mut reader = BufReader::with_capacity(BUFFER_CAPACITY, stdin);
254        let mut stdout = tokio::io::stdout();
255        serve_line_conn(&mut reader, &mut stdout, Arc::clone(&self.kg), self.vs.clone()).await
256    }
257
258    /// TCP transport: each accepted connection speaks newline-delimited
259    /// JSON-RPC, exactly like stdio. Connections are served concurrently (up to
260    /// [`MAX_TCP_CONNECTIONS`]) and share the one graph behind its mutex.
261    pub async fn run_tcp(&self, addr: &str) -> Result<()> {
262        spawn_maintenance(self.kg.clone());
263        spawn_wal_flush(self.kg.clone(), self.config.wal_flush_ms);
264        let listener = TcpListener::bind(addr).await.map_err(MCSError::IoError)?;
265        let semaphore = Arc::new(Semaphore::new(MAX_TCP_CONNECTIONS));
266        let auth_token = self.config.auth_token.clone();
267        info!(
268            "Listening for TCP MCP connections on {addr} (max {MAX_TCP_CONNECTIONS}, auth {}, vectors {})",
269            if auth_token.is_some() { "on" } else { "off" },
270            if self.vs.is_some() { "on" } else { "off" }
271        );
272        loop {
273            let permit = Arc::clone(&semaphore).acquire_owned().await;
274            let (socket, peer) = listener.accept().await.map_err(MCSError::IoError)?;
275            let kg = Arc::clone(&self.kg);
276            let vs = self.vs.clone();
277            let auth_token = auth_token.clone();
278            tokio::spawn(async move {
279                let _permit = permit; // held for the connection lifetime
280                let (read_half, mut write_half) = socket.into_split();
281                let mut reader = BufReader::with_capacity(BUFFER_CAPACITY, read_half);
282                // When a token is configured, the client must send it as the
283                // first line before any JSON-RPC traffic.
284                if let Some(ref expected) = auth_token {
285                    match authenticate_line_conn(&mut reader, expected).await {
286                        Ok(true) => {}
287                        Ok(false) => {
288                            let _ = write_half.write_all(AUTH_REQUIRED_LINE.as_bytes()).await;
289                            let _ = write_half.flush().await;
290                            return;
291                        }
292                        Err(e) => {
293                            error!("TCP auth error for {peer}: {e}");
294                            return;
295                        }
296                    }
297                }
298                if let Err(e) = serve_line_conn(&mut reader, &mut write_half, kg, vs).await {
299                    error!("TCP connection {peer} error: {e}");
300                }
301            });
302        }
303    }
304
305    /// MCP Streamable HTTP transport (POST/GET `/mcp`, JSON or SSE responses).
306    pub async fn run_http(&self, addr: &str) -> Result<()> {
307        spawn_maintenance(self.kg.clone());
308        spawn_wal_flush(self.kg.clone(), self.config.wal_flush_ms);
309        crate::http::run(
310            addr,
311            self.graph(),
312            self.vs.clone(),
313            self.config.auth_token.clone(),
314            self.config.tls_cert.clone(),
315            self.config.tls_key.clone(),
316        )
317        .await
318    }
319}
320
321/// Spawn a background task that fsyncs committed WAL frames every
322/// `interval_ms` milliseconds via a non-blocking passive checkpoint, bounding
323/// the durability window in async mode. A zero interval disables the task.
324fn spawn_wal_flush(kg: Arc<GraphHandle>, interval_ms: u64) {
325    if interval_ms == 0 {
326        return;
327    }
328    tokio::spawn(async move {
329        let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
330        interval.tick().await; // skip immediate first tick
331        loop {
332            interval.tick().await;
333            let kg = kg.clone();
334            tokio::task::spawn_blocking(move || {
335                if let Err(e) = kg.checkpoint_passive() {
336                    tracing::warn!("WAL flush error: {e}");
337                }
338            })
339            .await
340            .ok();
341        }
342    });
343}
344
345/// Spawn a background task that runs periodic database maintenance every
346/// 5 minutes until the runtime shuts down.
347fn spawn_maintenance(kg: Arc<GraphHandle>) {
348    tokio::spawn(async move {
349        let mut interval = tokio::time::interval(Duration::from_secs(300));
350        interval.tick().await; // skip immediate first tick
351        loop {
352            interval.tick().await;
353            let kg = kg.clone();
354            tokio::task::spawn_blocking(move || {
355                if let Err(e) = kg.run_maintenance() {
356                    tracing::warn!("Maintenance error: {e}");
357                }
358            })
359            .await
360            .ok();
361        }
362    });
363}
364
/// Newline-terminated JSON-RPC error (code -32001, `id: null`) written to a
/// TCP client that fails authentication, just before the connection is dropped.
const AUTH_REQUIRED_LINE: &str = concat!(
    r#"{"jsonrpc":"2.0","error":{"code":-32001,"#,
    r#""message":"Authentication required: send the bearer token as the first line"},"#,
    r#""id":null}"#,
    "\n"
);
368
369/// Read the first line of a connection and compare it (constant-time) to the
370/// expected bearer token. Returns `Ok(false)` on EOF / oversized first line.
371async fn authenticate_line_conn<R>(reader: &mut R, expected: &str) -> Result<bool>
372where
373    R: AsyncBufReadExt + Unpin,
374{
375    let mut line = String::new();
376    match read_line_capped(reader, &mut line, MAX_REQUEST_BYTES)
377        .await
378        .map_err(MCSError::IoError)?
379    {
380        LineRead::Line => Ok(token_matches(&line, expected)),
381        _ => Ok(false),
382    }
383}
384
385/// Drive one line-framed connection (stdio or a single TCP socket): read
386/// newline-delimited JSON-RPC requests, write newline-delimited responses.
387/// Notifications produce no output. Returns when the peer closes the stream.
388/// The dispatch path (graph lock + optional fsync) is offloaded to
389/// [`tokio::task::spawn_blocking`] to keep the async reactor responsive (C3).
390async fn serve_line_conn<R, W>(
391    reader: &mut R,
392    writer: &mut W,
393    kg: Arc<GraphHandle>,
394    vs: Option<Arc<VectorStore>>,
395) -> Result<()>
396where
397    R: AsyncBufReadExt + Unpin,
398    W: AsyncWriteExt + Unpin,
399{
400    let mut line = String::with_capacity(1024);
401    let mut out = Vec::with_capacity(BUFFER_CAPACITY);
402
403    loop {
404        match read_line_capped(reader, &mut line, MAX_REQUEST_BYTES).await {
405            Ok(LineRead::Eof) => break,
406            Ok(LineRead::Line) => {
407                let line_copy = line.clone();
408                let kg_clone = Arc::clone(&kg);
409                let vs_clone = vs.clone();
410                let resp = tokio::task::spawn_blocking(move || {
411                    dispatch_line(&line_copy, &kg_clone, vs_clone.as_deref())
412                })
413                .await
414                .map_err(|join_err| {
415                    error!("dispatch task panicked: {join_err}");
416                    MCSError::IoError(std::io::Error::other("dispatch task panicked"))
417                })?;
418                if let Some(resp) = resp {
419                    out.clear();
420                    out.extend_from_slice(resp.as_bytes());
421                    out.extend_from_slice(NEWLINE);
422                    writer.write_all(&out).await.map_err(MCSError::IoError)?;
423                    writer.flush().await.map_err(MCSError::IoError)?;
424                }
425            }
426            Ok(LineRead::TooLong) => {
427                let err = MCSError::InvalidParams("Request exceeds maximum size of 16MB".into());
428                let response = JsonRpcResponse::error(None, err.error_code(), err.to_string());
429                out.clear();
430                serde_json::to_writer(&mut out, &response).map_err(MCSError::JsonError)?;
431                out.extend_from_slice(NEWLINE);
432                writer.write_all(&out).await.map_err(MCSError::IoError)?;
433                writer.flush().await.map_err(MCSError::IoError)?;
434                break;
435            }
436            Err(e) => {
437                error!("IO error: {}", e);
438                break;
439            }
440        }
441    }
442    Ok(())
443}
444
445fn process_request(
446    req: &JsonRpcRequest,
447    kg: &GraphHandle,
448    vs: Option<&VectorStore>,
449) -> Result<HandlerResult> {
450    match req.method.as_str() {
451        "initialize" => Ok(HandlerResult::Value(handle_initialize(req, vs.is_some()))),
452        "tools/list" => Ok(HandlerResult::Value(handle_tools_list(vs.is_some()))),
453        "tools/call" => handle_tools_call(req, kg, vs),
454        "ping" => Ok(HandlerResult::Value(Value::Null)),
455        method if method.starts_with("notifications/") => {
456            tracing::trace!("Received notification: {method}");
457            Ok(HandlerResult::Value(Value::Null))
458        }
459        _ => Err(MCSError::MethodNotFound(req.method.clone())),
460    }
461}
462
/// MCP protocol revisions this server can speak, newest first (used for
/// `initialize` version negotiation). Keep the newest revision at index 0.
const SUPPORTED_PROTOCOL_VERSIONS: &[&str] =
    &["2025-11-25", "2025-06-18", "2025-03-26", "2024-11-05"];
/// Newest revision we implement, offered when the client requests one we do
/// not know. Taken from the head of [`SUPPORTED_PROTOCOL_VERSIONS`] so adding a
/// revision cannot leave the two out of sync.
const LATEST_PROTOCOL_VERSION: &str = SUPPORTED_PROTOCOL_VERSIONS[0];
469
/// Usage guidance returned as `instructions` from `initialize`; clients
/// typically append it to the model's system prompt.
const SERVER_INSTRUCTIONS: &str = concat!(
    "Knowledge-graph memory MCP server. ",
    "Entity names are unique and case-sensitive. ",
    "Use `create_entities`/`create_relations` to build the graph, ",
    "`add_observations` to attach facts, ",
    "and `search_nodes`/`open_nodes`/`read_graph` to retrieve. ",
    "Prefer `upsert_entities` for idempotent writes ",
    "and `merge_entities` to collapse duplicates. ",
    "Tool failures are returned with `isError: true` rather than as protocol errors ",
    "— read the message and retry."
);

/// Suffix appended to [`SERVER_INSTRUCTIONS`] when vector support is on
/// (note the leading space separating it from the base text).
const VECTOR_INSTRUCTIONS: &str = concat!(
    " Vector search is enabled: ",
    "use `vector_upsert_embedding` to attach embeddings to entities, ",
    "`vector_search_entities` for semantic search, ",
    "and `hybrid_search` to combine text + vector relevance."
);
481
482fn handle_initialize(req: &JsonRpcRequest, vectors_enabled: bool) -> Value {
483    // Version negotiation: echo a supported requested revision, else offer latest.
484    let protocol_version = req
485        .params
486        .as_ref()
487        .and_then(|p| p.get("protocolVersion"))
488        .and_then(Value::as_str)
489        .filter(|v| SUPPORTED_PROTOCOL_VERSIONS.contains(v))
490        .unwrap_or(LATEST_PROTOCOL_VERSION);
491
492    let instructions = if vectors_enabled {
493        format!("{SERVER_INSTRUCTIONS}{VECTOR_INSTRUCTIONS}")
494    } else {
495        SERVER_INSTRUCTIONS.to_string()
496    };
497
498    json!({
499        "protocolVersion": protocol_version,
500        "capabilities": {
501            "tools": { "listChanged": false }
502        },
503        "serverInfo": {
504            "name": "mcp-memory",
505            "version": env!("CARGO_PKG_VERSION")
506        },
507        "instructions": instructions
508    })
509}
510
511/// Wrap a tool execution failure as an MCP `CallToolResult` with `isError: true`
512/// so the model sees the message and can self-correct, instead of receiving an
513/// opaque JSON-RPC protocol error. (Successful results are already content-
514/// wrapped by the action handlers.)
515#[inline]
516fn tool_error(message: &str) -> Value {
517    json!({
518        "content": [{ "type": "text", "text": message }],
519        "isError": true
520    })
521}
522
523/// Constant-time bearer-token check. Accepts the raw token or a `Bearer <token>`
524/// form; surrounding whitespace is trimmed.
525pub fn token_matches(presented: &str, expected: &str) -> bool {
526    use subtle::ConstantTimeEq;
527    let presented = presented.trim();
528    let presented = presented
529        .strip_prefix("Bearer ")
530        .unwrap_or(presented)
531        .trim();
532    presented.as_bytes().ct_eq(expected.as_bytes()).into()
533}
534
535/// The base knowledge-graph tools, parsed from `tools.json` at build time.
536fn base_tools() -> &'static Vec<Value> {
537    static BASE: std::sync::OnceLock<Vec<Value>> = std::sync::OnceLock::new();
538    BASE.get_or_init(|| {
539        serde_json::from_str(include_str!("../tools.json"))
540            .expect("tools.json is valid JSON compiled at build time")
541    })
542}
543
544/// The vector tools, parsed from `vector_tools.json` at build time.
545fn vector_tools() -> &'static Vec<Value> {
546    static VEC: std::sync::OnceLock<Vec<Value>> = std::sync::OnceLock::new();
547    VEC.get_or_init(|| {
548        serde_json::from_str(include_str!("../vector_tools.json"))
549            .expect("vector_tools.json is valid JSON compiled at build time")
550    })
551}
552
/// Code-indexing tool definitions, parsed once (on first use) from the
/// `code_tools.json` file embedded at build time.
///
/// # Panics
/// If the embedded file is not a JSON array.
#[cfg(feature = "code")]
fn code_tools() -> &'static Vec<Value> {
    use std::sync::OnceLock;
    static CODE: OnceLock<Vec<Value>> = OnceLock::new();
    const SPEC: &str = include_str!("../code_tools.json");
    CODE.get_or_init(|| {
        let parsed: serde_json::Result<Vec<Value>> = serde_json::from_str(SPEC);
        parsed.expect("code_tools.json is valid JSON compiled at build time")
    })
}
562
563/// `tools/list` response. Vector and code tools are appended only when their
564/// subsystems are enabled, so the server never advertises tools it cannot serve.
565fn handle_tools_list(vectors_enabled: bool) -> Value {
566    let mut all = base_tools().clone();
567    if vectors_enabled {
568        all.extend(vector_tools().iter().cloned());
569    }
570    #[cfg(feature = "code")]
571    if code_enabled() {
572        all.extend(code_tools().iter().cloned());
573    }
574    json!({ "tools": all })
575}
576
/// Whether `name` is one of the vector-subsystem tools (the `vector_*` family
/// plus `hybrid_search`), which need a vector store to run.
fn is_vector_tool_name(name: &str) -> bool {
    const VECTOR_TOOL_NAMES: [&str; 12] = [
        "vector_upsert_embedding",
        "vector_search_entities",
        "vector_delete_embedding",
        "hybrid_search",
        "vector_refresh_graph_cache",
        "vector_store_stats",
        "vector_batch_upsert",
        "vector_get_embedding",
        "vector_search_by_entity",
        "vector_recommend",
        "vector_mmr_search",
        "vector_reindex",
    ];
    VECTOR_TOOL_NAMES.contains(&name)
}
595
/// Global on/off switch for the code-indexing tools, written once by
/// `MCPServer::new` from `config.code_enabled`. Code tools keep no
/// per-request state (unlike the vector store), so a process-wide flag spares
/// every dispatch signature an extra `bool`.
#[cfg(feature = "code")]
static CODE_ENABLED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);

/// Whether the code-indexing tools were switched on at startup.
#[cfg(feature = "code")]
fn code_enabled() -> bool {
    use std::sync::atomic::Ordering;
    CODE_ENABLED.load(Ordering::Relaxed)
}

/// Always `false`: this build was compiled without the `code` feature.
#[cfg(not(feature = "code"))]
const fn code_enabled() -> bool {
    false
}
611
/// Whether `name` is one of the tree-sitter code-indexing tools
/// (`code_index`, `code_outline`, `code_search`, `code_get_symbol`).
fn is_code_tool_name(name: &str) -> bool {
    name.strip_prefix("code_")
        .is_some_and(|suffix| matches!(suffix, "index" | "outline" | "search" | "get_symbol"))
}
619
620fn handle_tools_call(
621    req: &JsonRpcRequest,
622    kg: &GraphHandle,
623    vs: Option<&VectorStore>,
624) -> Result<HandlerResult> {
625    let tool_name = req
626        .params
627        .as_ref()
628        .and_then(|p| p.get("name").and_then(|v| v.as_str()))
629        .ok_or_else(|| MCSError::InvalidParams("Missing 'name' parameter".into()))?;
630
631    let tool_args = req.params.as_ref().and_then(|p| p.get("arguments"));
632
633    if is_vector_tool_name(tool_name) {
634        let Some(vs) = vs else {
635            return Err(MCSError::MethodNotFound(format!(
636                "{tool_name} (vector support disabled; start the server with --vectors)"
637            )));
638        };
639        let result = match tool_name {
640            "vector_upsert_embedding" => {
641                vector_actions::handle_vector_upsert_embedding(vs, kg, tool_args)
642                    .map(HandlerResult::Value)
643            }
644            "vector_search_entities" => {
645                vector_actions::handle_vector_search_entities(vs, kg, tool_args)
646                    .map(HandlerResult::RawResult)
647            }
648            "vector_delete_embedding" => {
649                vector_actions::handle_vector_delete_embedding(vs, kg, tool_args)
650                    .map(HandlerResult::Value)
651            }
652            "hybrid_search" => {
653                vector_actions::handle_hybrid_search(vs, kg, tool_args).map(HandlerResult::RawResult)
654            }
655            "vector_refresh_graph_cache" => {
656                vector_actions::handle_refresh_graph_cache(vs, kg, tool_args)
657                    .map(HandlerResult::Value)
658            }
659            "vector_store_stats" => {
660                vector_actions::handle_vector_store_stats(vs, kg, tool_args)
661                    .map(HandlerResult::Value)
662            }
663            "vector_batch_upsert" => {
664                vector_actions::handle_vector_batch_upsert(vs, kg, tool_args)
665                    .map(HandlerResult::Value)
666            }
667            "vector_get_embedding" => {
668                vector_actions::handle_vector_get_embedding(vs, kg, tool_args)
669                    .map(HandlerResult::Value)
670            }
671            "vector_search_by_entity" => {
672                vector_actions::handle_vector_search_by_entity(vs, kg, tool_args)
673                    .map(HandlerResult::RawResult)
674            }
675            "vector_recommend" => {
676                vector_actions::handle_vector_recommend(vs, kg, tool_args)
677                    .map(HandlerResult::RawResult)
678            }
679            "vector_mmr_search" => {
680                vector_actions::handle_vector_mmr_search(vs, kg, tool_args)
681                    .map(HandlerResult::RawResult)
682            }
683            "vector_reindex" => {
684                vector_actions::handle_vector_reindex(vs, kg, tool_args).map(HandlerResult::Value)
685            }
686            other => Err(MCSError::MethodNotFound(other.to_string())),
687        };
688        return Ok(result.unwrap_or_else(|e| {
689            error!("Tool '{tool_name}' error: {e}");
690            HandlerResult::Value(tool_error(&e.to_string()))
691        }));
692    }
693
694    if is_code_tool_name(tool_name) {
695        if !code_enabled() {
696            return Err(MCSError::MethodNotFound(format!(
697                "{tool_name} (code indexing disabled; start the server with --code)"
698            )));
699        }
700        #[cfg(feature = "code")]
701        {
702            let result = match tool_name {
703                "code_index" => {
704                    code_actions::handle_code_index(kg, tool_args).map(HandlerResult::Value)
705                }
706                "code_outline" => {
707                    code_actions::handle_code_outline(kg, tool_args).map(HandlerResult::Value)
708                }
709                "code_search" => {
710                    code_actions::handle_code_search(kg, tool_args).map(HandlerResult::Value)
711                }
712                "code_get_symbol" => {
713                    code_actions::handle_code_get_symbol(kg, tool_args).map(HandlerResult::Value)
714                }
715                other => Err(MCSError::MethodNotFound(other.to_string())),
716            };
717            return Ok(result.unwrap_or_else(|e| {
718                error!("Tool '{tool_name}' error: {e}");
719                HandlerResult::Value(tool_error(&e.to_string()))
720            }));
721        }
722        #[cfg(not(feature = "code"))]
723        return Err(MCSError::MethodNotFound(format!(
724            "{tool_name} (built without the 'code' feature)"
725        )));
726    }
727
728    if !tools::tool_exists(tool_name) {
729        return Err(MCSError::MethodNotFound(tool_name.to_string()));
730    }
731
732    let result = match tool_name {
733        // Raw-result handlers (large payloads, avoid second serialization pass).
734        "read_graph" => memory::handle_read_graph(kg, tool_args).map(HandlerResult::RawResult),
735        "search_nodes" => memory::handle_search_nodes(kg, tool_args).map(HandlerResult::RawResult),
736        // Standard Value handlers.
737        "create_entities" => {
738            memory::handle_create_entities(kg, tool_args).map(HandlerResult::Value)
739        }
740        "create_relations" => {
741            memory::handle_create_relations(kg, tool_args).map(HandlerResult::Value)
742        }
743        "add_observations" => {
744            memory::handle_add_observations(kg, tool_args).map(HandlerResult::Value)
745        }
746        "delete_entities" => {
747            let r = memory::handle_delete_entities(kg, tool_args);
748            if r.is_ok()
749                && let Some(vs) = vs
750                && let Some(args) = tool_args.and_then(|a| a.get("entityNames")).and_then(|v| v.as_array())
751            {
752                let names: Vec<String> = args.iter().filter_map(|v| v.as_str().map(String::from)).collect();
753                vs.invalidate_entity_cache(&names);
754            }
755            r.map(HandlerResult::Value)
756        }
757        "delete_observations" => {
758            memory::handle_delete_observations(kg, tool_args).map(HandlerResult::Value)
759        }
760        "delete_relations" => {
761            memory::handle_delete_relations(kg, tool_args).map(HandlerResult::Value)
762        }
763        "open_nodes" => memory::handle_open_nodes(kg, tool_args).map(HandlerResult::Value),
764        "get_entity" => memory::handle_get_entity(kg, tool_args).map(HandlerResult::Value),
765        "graph_stats" => memory::handle_graph_stats(kg).map(HandlerResult::Value),
766        "search_relations" => {
767            memory::handle_search_relations(kg, tool_args).map(HandlerResult::Value)
768        }
769        "find_path" => memory::handle_find_path(kg, tool_args).map(HandlerResult::Value),
770        "compact" => memory::handle_compact(kg).map(HandlerResult::Value),
771        "get_neighbors" => memory::handle_get_neighbors(kg, tool_args).map(HandlerResult::Value),
772        "describe_entity" => {
773            memory::handle_describe_entity(kg, tool_args).map(HandlerResult::Value)
774        }
775        "list_entity_types" => memory::handle_list_entity_types(kg).map(HandlerResult::Value),
776        "list_relation_types" => memory::handle_list_relation_types(kg).map(HandlerResult::Value),
777        "upsert_entities" => {
778            memory::handle_upsert_entities(kg, tool_args).map(HandlerResult::Value)
779        }
780        "export_graph" => memory::handle_export_graph(kg, tool_args).map(HandlerResult::Value),
781        "merge_entities" => memory::handle_merge_entities(kg, tool_args).map(HandlerResult::Value),
782        "extract_subgraph" => {
783            memory::handle_extract_subgraph(kg, tool_args).map(HandlerResult::Value)
784        }
785        "batch_get_entities" => {
786            memory::handle_batch_get_entities(kg, tool_args).map(HandlerResult::Value)
787        }
788        "find_all_paths" => memory::handle_find_all_paths(kg, tool_args).map(HandlerResult::Value),
789        "entity_exists" => memory::handle_entity_exists(kg, tool_args).map(HandlerResult::Value),
790        "degree" => memory::handle_degree(kg, tool_args).map(HandlerResult::Value),
791        tool => Err(MCSError::MethodNotFound(tool.to_string())),
792    };
793
794    // Tool execution failures become isError CallToolResults so the model can
795    // read the message and self-correct, instead of an opaque protocol error.
796    Ok(result.unwrap_or_else(|e| {
797        error!("Tool '{tool_name}' error: {e}");
798        HandlerResult::Value(tool_error(&e.to_string()))
799    }))
800}
801
802