Skip to main content

mcp_memory/
server.rs

1use serde_json::{Value, json};
2use std::num::NonZeroUsize;
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Duration;
6
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::net::TcpListener;
9use tokio::sync::Semaphore;
10use tracing::{error, info};
11
12use crate::actions::memory;
13use crate::config::Config;
14use crate::errors::{MCSError, Result};
15use crate::kg::GraphHandle;
16use crate::protocol::{JsonRpcRequest, JsonRpcResponse};
17use crate::tools;
18use crate::vector_actions;
19use crate::vector_store::{VectorConfig, VectorStore};
20
/// Outcome of processing one request.
///
/// A handler yields either a structured JSON [`Value`] or an already
/// serialized JSON string for the `result` field. The string form lets
/// `dispatch_line` splice large payloads (such as `read_graph`) into the
/// response without a second serialization pass.
enum HandlerResult {
    /// Structured result; serialized when the response is encoded.
    Value(Value),
    /// JSON text of the `result` field. `dispatch_line` copies it into the
    /// response verbatim, so it must be a complete JSON value.
    RawResult(String),
}
28
/// Capacity (64 KiB) of each connection's read buffer and of the reusable
/// response buffer.
const BUFFER_CAPACITY: usize = 64 * 1024;
/// Frame terminator appended to every response on the line-based transports.
const NEWLINE: &[u8] = b"\n";
/// Maximum size of a single inbound JSON-RPC message (shared by all transports).
pub const MAX_REQUEST_BYTES: usize = 16 * 1024 * 1024;
/// Upper bound on TCP connections served at the same time (C4).
const MAX_TCP_CONNECTIONS: usize = 128;
35
/// Result of one `read_line_capped` call.
enum LineRead {
    /// A line was stored in the output buffer (with its trailing `\n` when
    /// the stream supplied one).
    Line,
    /// The stream ended with no pending bytes.
    Eof,
    /// The line would have exceeded the byte limit; nothing was stored.
    TooLong,
}
41
42async fn read_line_capped<R>(
43    reader: &mut R,
44    out: &mut String,
45    max: usize,
46) -> std::io::Result<LineRead>
47where
48    R: AsyncBufReadExt + Unpin,
49{
50    out.clear();
51    let mut buf: Vec<u8> = Vec::new();
52    loop {
53        let available = reader.fill_buf().await?;
54        if available.is_empty() {
55            if buf.is_empty() {
56                return Ok(LineRead::Eof);
57            }
58            // Move `buf` into the String — no copy. `buf` is not used afterward.
59            *out = String::from_utf8(buf).map_err(|_| {
60                std::io::Error::new(std::io::ErrorKind::InvalidData, "Non-UTF-8 input")
61            })?;
62            return Ok(LineRead::Line);
63        }
64        match available.iter().position(|&b| b == b'\n') {
65            Some(i) => {
66                if buf.len() + i + 1 > max {
67                    reader.consume(i + 1);
68                    return Ok(LineRead::TooLong);
69                }
70                buf.extend_from_slice(&available[..=i]);
71                reader.consume(i + 1);
72                *out = String::from_utf8(buf).map_err(|_| {
73                    std::io::Error::new(std::io::ErrorKind::InvalidData, "Non-UTF-8 input")
74                })?;
75                return Ok(LineRead::Line);
76            }
77            None => {
78                let take = available.len();
79                if buf.len() + take > max {
80                    reader.consume(take);
81                    return Ok(LineRead::TooLong);
82                }
83                buf.extend_from_slice(available);
84                reader.consume(take);
85            }
86        }
87    }
88}
89
90fn parse_error(msg: String) -> JsonRpcResponse {
91    let mcp_error = MCSError::ParseError(msg);
92    JsonRpcResponse::error(None, mcp_error.error_code(), mcp_error.to_string())
93}
94
95/// Dispatch one framed line (stdio / tcp). Returns the serialized response, or
96/// `None` for a notification. `vs` is `Some` only when vector support is enabled.
97pub fn dispatch_line(line: &str, kg: &GraphHandle, vs: Option<&VectorStore>) -> Option<String> {
98    let trimmed = line.trim();
99    if trimmed.is_empty() {
100        return Some(serde_json::to_string(&parse_error("Empty request".into())).unwrap());
101    }
102    let raw: Value = match serde_json::from_str(trimmed) {
103        Ok(v) => v,
104        Err(e) => return Some(serde_json::to_string(&parse_error(e.to_string())).unwrap()),
105    };
106    let req: JsonRpcRequest = match serde_json::from_value(raw) {
107        Ok(r) => r,
108        Err(e) => return Some(serde_json::to_string(&parse_error(e.to_string())).unwrap()),
109    };
110    req.id.as_ref()?;
111    match process_request(&req, kg, vs) {
112        Ok(HandlerResult::Value(result)) => {
113            let resp = JsonRpcResponse::success(req.id, result);
114            Some(serde_json::to_string(&resp).unwrap())
115        }
116        Ok(HandlerResult::RawResult(result_json)) => {
117            let id_json = serde_json::to_string(&req.id).unwrap();
118            let mut out = String::with_capacity(64 + id_json.len() + result_json.len());
119            out.push_str(r#"{"jsonrpc":"2.0","id":"#);
120            out.push_str(&id_json);
121            out.push_str(",\"result\":");
122            out.push_str(&result_json);
123            out.push('}');
124            Some(out)
125        }
126        Err(e) => {
127            let resp = JsonRpcResponse::error(req.id, e.error_code(), e.to_string());
128            Some(serde_json::to_string(&resp).unwrap())
129        }
130    }
131}
132
133/// Dispatch a Streamable-HTTP POST body, which may be a single JSON-RPC message
134/// or a batch array. `Ok(None)` means the body held only notifications (HTTP
135/// 202, empty body); `Err` means the body was not valid JSON.
136pub fn dispatch_http_body(
137    body: &str,
138    kg: &GraphHandle,
139    vs: Option<&VectorStore>,
140) -> std::result::Result<Option<Value>, String> {
141    let value: Value = serde_json::from_str(body.trim()).map_err(|e| e.to_string())?;
142    match value {
143        Value::Array(items) => {
144            // Batches are rare and never huge — keep Value path for simplicity.
145            let responses: Vec<Value> = items
146                .into_iter()
147                .filter_map(|v| process_value_http(v, kg, vs))
148                .collect();
149            Ok((!responses.is_empty()).then_some(Value::Array(responses)))
150        }
151        other => Ok(process_value_http(other, kg, vs)),
152    }
153}
154
155/// Process one JSON-RPC message for the HTTP transport, converting any
156/// `RawResult` back into a `Value` (acceptable since HTTP payloads are typically
157/// much smaller in this context). `None` means the message was a notification.
158fn process_value_http(value: Value, kg: &GraphHandle, vs: Option<&VectorStore>) -> Option<Value> {
159    let req: JsonRpcRequest = match serde_json::from_value(value) {
160        Ok(r) => r,
161        Err(e) => return Some(to_value(parse_error(e.to_string()))),
162    };
163    req.id.as_ref()?;
164    match process_request(&req, kg, vs) {
165        Ok(HandlerResult::Value(result)) => {
166            Some(to_value(JsonRpcResponse::success(req.id, result)))
167        }
168        Ok(HandlerResult::RawResult(result_json)) => {
169            // Parse the pre-serialized result back into a Value for HTTP delivery.
170            // This is a small extra cost for the HTTP transport; the stdio/TCP
171            // path (dispatch_line) avoids it entirely.
172            let result_val: Value = serde_json::from_str(&result_json).unwrap_or(Value::Null);
173            Some(to_value(JsonRpcResponse::success(req.id, result_val)))
174        }
175        Err(e) => Some(to_value(JsonRpcResponse::error(
176            req.id,
177            e.error_code(),
178            e.to_string(),
179        ))),
180    }
181}
182
183#[inline]
184fn to_value(resp: JsonRpcResponse) -> Value {
185    serde_json::to_value(resp).expect("JsonRpcResponse always serializes")
186}
187
/// MCP server state shared by the stdio, TCP and HTTP transports.
pub struct MCPServer {
    /// Runtime configuration (auth token, WAL flush interval, TLS paths, ...).
    config: Arc<Config>,
    /// Knowledge-graph handle, shared with every connection and with the
    /// background maintenance / WAL-flush tasks.
    kg: Arc<GraphHandle>,
    /// `Some` when vector support is enabled (`--vectors`); drives the extra
    /// `vector_*` / `hybrid_search` tools. `None` for a pure knowledge-graph server.
    vs: Option<Arc<VectorStore>>,
}
195
196impl MCPServer {
197    /// Build a server. The vector subsystem (usearch index + petgraph mirror) is
198    /// only constructed when `config.vectors_enabled` is set; `vec_config` is
199    /// ignored otherwise.
200    pub fn new(config: Config, vec_config: VectorConfig) -> Result<Self> {
201        let path = Path::new(&config.memory_file_path);
202        let lru_cache = NonZeroUsize::new(config.lru_cache_size).unwrap_or_else(|| {
203            NonZeroUsize::new(10000).expect("10000 > 0")
204        });
205        let kg = GraphHandle::new(
206            path,
207            config.durability,
208            config.sqlite_tuning(),
209            lru_cache,
210            config.read_pool_size,
211        )?;
212
213        let vs = if config.vectors_enabled {
214            Some(Arc::new(VectorStore::with_config(path, &vec_config)?))
215        } else {
216            None
217        };
218
219        Ok(Self {
220            config: Arc::new(config),
221            kg: Arc::new(kg),
222            vs,
223        })
224    }
225
226    /// Convenience constructor for a pure knowledge-graph server (no vectors).
227    pub fn new_kg(config: Config) -> Result<Self> {
228        let mut config = config;
229        config.vectors_enabled = false;
230        Self::new(config, VectorConfig::new(0))
231    }
232
233    /// Expose the shared graph handle (used to drive the HTTP transport).
234    pub fn graph(&self) -> Arc<GraphHandle> {
235        Arc::clone(&self.kg)
236    }
237
238    /// The shared vector store, if vector support is enabled.
239    pub fn vector_store(&self) -> Option<Arc<VectorStore>> {
240        self.vs.clone()
241    }
242
243    /// stdio transport: newline-delimited JSON-RPC over stdin/stdout.
244    pub async fn run_stdio(&self) -> Result<()> {
245        spawn_maintenance(self.kg.clone());
246        spawn_wal_flush(self.kg.clone(), self.config.wal_flush_ms);
247        let stdin = tokio::io::stdin();
248        let mut reader = BufReader::with_capacity(BUFFER_CAPACITY, stdin);
249        let mut stdout = tokio::io::stdout();
250        serve_line_conn(&mut reader, &mut stdout, Arc::clone(&self.kg), self.vs.clone()).await
251    }
252
253    /// TCP transport: each accepted connection speaks newline-delimited
254    /// JSON-RPC, exactly like stdio. Connections are served concurrently (up to
255    /// [`MAX_TCP_CONNECTIONS`]) and share the one graph behind its mutex.
256    pub async fn run_tcp(&self, addr: &str) -> Result<()> {
257        spawn_maintenance(self.kg.clone());
258        spawn_wal_flush(self.kg.clone(), self.config.wal_flush_ms);
259        let listener = TcpListener::bind(addr).await.map_err(MCSError::IoError)?;
260        let semaphore = Arc::new(Semaphore::new(MAX_TCP_CONNECTIONS));
261        let auth_token = self.config.auth_token.clone();
262        info!(
263            "Listening for TCP MCP connections on {addr} (max {MAX_TCP_CONNECTIONS}, auth {}, vectors {})",
264            if auth_token.is_some() { "on" } else { "off" },
265            if self.vs.is_some() { "on" } else { "off" }
266        );
267        loop {
268            let permit = Arc::clone(&semaphore).acquire_owned().await;
269            let (socket, peer) = listener.accept().await.map_err(MCSError::IoError)?;
270            let kg = Arc::clone(&self.kg);
271            let vs = self.vs.clone();
272            let auth_token = auth_token.clone();
273            tokio::spawn(async move {
274                let _permit = permit; // held for the connection lifetime
275                let (read_half, mut write_half) = socket.into_split();
276                let mut reader = BufReader::with_capacity(BUFFER_CAPACITY, read_half);
277                // When a token is configured, the client must send it as the
278                // first line before any JSON-RPC traffic.
279                if let Some(ref expected) = auth_token {
280                    match authenticate_line_conn(&mut reader, expected).await {
281                        Ok(true) => {}
282                        Ok(false) => {
283                            let _ = write_half.write_all(AUTH_REQUIRED_LINE.as_bytes()).await;
284                            let _ = write_half.flush().await;
285                            return;
286                        }
287                        Err(e) => {
288                            error!("TCP auth error for {peer}: {e}");
289                            return;
290                        }
291                    }
292                }
293                if let Err(e) = serve_line_conn(&mut reader, &mut write_half, kg, vs).await {
294                    error!("TCP connection {peer} error: {e}");
295                }
296            });
297        }
298    }
299
300    /// MCP Streamable HTTP transport (POST/GET `/mcp`, JSON or SSE responses).
301    pub async fn run_http(&self, addr: &str) -> Result<()> {
302        spawn_maintenance(self.kg.clone());
303        spawn_wal_flush(self.kg.clone(), self.config.wal_flush_ms);
304        crate::http::run(
305            addr,
306            self.graph(),
307            self.vs.clone(),
308            self.config.auth_token.clone(),
309            self.config.tls_cert.clone(),
310            self.config.tls_key.clone(),
311        )
312        .await
313    }
314}
315
316/// Spawn a background task that fsyncs committed WAL frames every
317/// `interval_ms` milliseconds via a non-blocking passive checkpoint, bounding
318/// the durability window in async mode. A zero interval disables the task.
319fn spawn_wal_flush(kg: Arc<GraphHandle>, interval_ms: u64) {
320    if interval_ms == 0 {
321        return;
322    }
323    tokio::spawn(async move {
324        let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
325        interval.tick().await; // skip immediate first tick
326        loop {
327            interval.tick().await;
328            let kg = kg.clone();
329            tokio::task::spawn_blocking(move || {
330                if let Err(e) = kg.checkpoint_passive() {
331                    tracing::warn!("WAL flush error: {e}");
332                }
333            })
334            .await
335            .ok();
336        }
337    });
338}
339
340/// Spawn a background task that runs periodic database maintenance every
341/// 5 minutes until the runtime shuts down.
342fn spawn_maintenance(kg: Arc<GraphHandle>) {
343    tokio::spawn(async move {
344        let mut interval = tokio::time::interval(Duration::from_secs(300));
345        interval.tick().await; // skip immediate first tick
346        loop {
347            interval.tick().await;
348            let kg = kg.clone();
349            tokio::task::spawn_blocking(move || {
350                if let Err(e) = kg.run_maintenance() {
351                    tracing::warn!("Maintenance error: {e}");
352                }
353            })
354            .await
355            .ok();
356        }
357    });
358}
359
/// Newline-terminated JSON-RPC error (code -32001, `id` null) sent to a TCP
/// client that fails authentication, just before the connection is closed.
const AUTH_REQUIRED_LINE: &str = concat!(
    r#"{"jsonrpc":"2.0","error":{"code":-32001,"#,
    r#""message":"Authentication required: send the bearer token as the first line"},"#,
    r#""id":null}"#,
    "\n",
);
363
364/// Read the first line of a connection and compare it (constant-time) to the
365/// expected bearer token. Returns `Ok(false)` on EOF / oversized first line.
366async fn authenticate_line_conn<R>(reader: &mut R, expected: &str) -> Result<bool>
367where
368    R: AsyncBufReadExt + Unpin,
369{
370    let mut line = String::new();
371    match read_line_capped(reader, &mut line, MAX_REQUEST_BYTES)
372        .await
373        .map_err(MCSError::IoError)?
374    {
375        LineRead::Line => Ok(token_matches(&line, expected)),
376        _ => Ok(false),
377    }
378}
379
380/// Drive one line-framed connection (stdio or a single TCP socket): read
381/// newline-delimited JSON-RPC requests, write newline-delimited responses.
382/// Notifications produce no output. Returns when the peer closes the stream.
383/// The dispatch path (graph lock + optional fsync) is offloaded to
384/// [`tokio::task::spawn_blocking`] to keep the async reactor responsive (C3).
385async fn serve_line_conn<R, W>(
386    reader: &mut R,
387    writer: &mut W,
388    kg: Arc<GraphHandle>,
389    vs: Option<Arc<VectorStore>>,
390) -> Result<()>
391where
392    R: AsyncBufReadExt + Unpin,
393    W: AsyncWriteExt + Unpin,
394{
395    let mut line = String::with_capacity(1024);
396    let mut out = Vec::with_capacity(BUFFER_CAPACITY);
397
398    loop {
399        match read_line_capped(reader, &mut line, MAX_REQUEST_BYTES).await {
400            Ok(LineRead::Eof) => break,
401            Ok(LineRead::Line) => {
402                let line_copy = line.clone();
403                let kg_clone = Arc::clone(&kg);
404                let vs_clone = vs.clone();
405                let resp = tokio::task::spawn_blocking(move || {
406                    dispatch_line(&line_copy, &kg_clone, vs_clone.as_deref())
407                })
408                .await
409                .map_err(|join_err| {
410                    error!("dispatch task panicked: {join_err}");
411                    MCSError::IoError(std::io::Error::other("dispatch task panicked"))
412                })?;
413                if let Some(resp) = resp {
414                    out.clear();
415                    out.extend_from_slice(resp.as_bytes());
416                    out.extend_from_slice(NEWLINE);
417                    writer.write_all(&out).await.map_err(MCSError::IoError)?;
418                    writer.flush().await.map_err(MCSError::IoError)?;
419                }
420            }
421            Ok(LineRead::TooLong) => {
422                let err = MCSError::InvalidParams("Request exceeds maximum size of 16MB".into());
423                let response = JsonRpcResponse::error(None, err.error_code(), err.to_string());
424                out.clear();
425                serde_json::to_writer(&mut out, &response).map_err(MCSError::JsonError)?;
426                out.extend_from_slice(NEWLINE);
427                writer.write_all(&out).await.map_err(MCSError::IoError)?;
428                writer.flush().await.map_err(MCSError::IoError)?;
429                break;
430            }
431            Err(e) => {
432                error!("IO error: {}", e);
433                break;
434            }
435        }
436    }
437    Ok(())
438}
439
440fn process_request(
441    req: &JsonRpcRequest,
442    kg: &GraphHandle,
443    vs: Option<&VectorStore>,
444) -> Result<HandlerResult> {
445    match req.method.as_str() {
446        "initialize" => Ok(HandlerResult::Value(handle_initialize(req, vs.is_some()))),
447        "tools/list" => Ok(HandlerResult::Value(handle_tools_list(vs.is_some()))),
448        "tools/call" => handle_tools_call(req, kg, vs),
449        "ping" => Ok(HandlerResult::Value(Value::Null)),
450        method if method.starts_with("notifications/") => {
451            tracing::trace!("Received notification: {method}");
452            Ok(HandlerResult::Value(Value::Null))
453        }
454        _ => Err(MCSError::MethodNotFound(req.method.clone())),
455    }
456}
457
/// MCP protocol revisions this server can speak, newest first (for `initialize`
/// version negotiation).
const SUPPORTED_PROTOCOL_VERSIONS: &[&str] = &[
    "2025-11-25",
    "2025-06-18",
    "2025-03-26",
    "2024-11-05",
];
/// Newest revision we implement (the head of the list above); offered when the
/// client requests an unknown one.
const LATEST_PROTOCOL_VERSION: &str = SUPPORTED_PROTOCOL_VERSIONS[0];

/// `instructions` surfaced to the client and appended to the model's system prompt.
const SERVER_INSTRUCTIONS: &str = "Knowledge-graph memory MCP server. \
    Entity names are unique and case-sensitive. \
    Use `create_entities`/`create_relations` to build the graph, \
    `add_observations` to attach facts, and \
    `search_nodes`/`open_nodes`/`read_graph` to retrieve. \
    Prefer `upsert_entities` for idempotent writes and \
    `merge_entities` to collapse duplicates. \
    Tool failures are returned with `isError: true` rather than as protocol errors \
    — read the message and retry.";

/// Extra guidance appended to [`SERVER_INSTRUCTIONS`] when vector support is on.
/// Starts with a space so it can be concatenated directly.
const VECTOR_INSTRUCTIONS: &str = " Vector search is enabled: \
    use `vector_upsert_embedding` to attach embeddings to entities, \
    `vector_search_entities` for semantic search, and \
    `hybrid_search` to combine text + vector relevance.";
476
477fn handle_initialize(req: &JsonRpcRequest, vectors_enabled: bool) -> Value {
478    // Version negotiation: echo a supported requested revision, else offer latest.
479    let protocol_version = req
480        .params
481        .as_ref()
482        .and_then(|p| p.get("protocolVersion"))
483        .and_then(Value::as_str)
484        .filter(|v| SUPPORTED_PROTOCOL_VERSIONS.contains(v))
485        .unwrap_or(LATEST_PROTOCOL_VERSION);
486
487    let instructions = if vectors_enabled {
488        format!("{SERVER_INSTRUCTIONS}{VECTOR_INSTRUCTIONS}")
489    } else {
490        SERVER_INSTRUCTIONS.to_string()
491    };
492
493    json!({
494        "protocolVersion": protocol_version,
495        "capabilities": {
496            "tools": { "listChanged": false }
497        },
498        "serverInfo": {
499            "name": "mcp-memory",
500            "version": env!("CARGO_PKG_VERSION")
501        },
502        "instructions": instructions
503    })
504}
505
506/// Wrap a tool execution failure as an MCP `CallToolResult` with `isError: true`
507/// so the model sees the message and can self-correct, instead of receiving an
508/// opaque JSON-RPC protocol error. (Successful results are already content-
509/// wrapped by the action handlers.)
510#[inline]
511fn tool_error(message: &str) -> Value {
512    json!({
513        "content": [{ "type": "text", "text": message }],
514        "isError": true
515    })
516}
517
518/// Constant-time bearer-token check. Accepts the raw token or a `Bearer <token>`
519/// form; surrounding whitespace is trimmed.
520pub fn token_matches(presented: &str, expected: &str) -> bool {
521    use subtle::ConstantTimeEq;
522    let presented = presented.trim();
523    let presented = presented
524        .strip_prefix("Bearer ")
525        .unwrap_or(presented)
526        .trim();
527    presented.as_bytes().ct_eq(expected.as_bytes()).into()
528}
529
530/// The base knowledge-graph tools, parsed from `tools.json` at build time.
531fn base_tools() -> &'static Vec<Value> {
532    static BASE: std::sync::OnceLock<Vec<Value>> = std::sync::OnceLock::new();
533    BASE.get_or_init(|| {
534        serde_json::from_str(include_str!("../tools.json"))
535            .expect("tools.json is valid JSON compiled at build time")
536    })
537}
538
539/// `tools/list` response. The vector tools are appended only when vector support
540/// is enabled, so a pure-KG server never advertises tools it cannot serve.
541fn handle_tools_list(vectors_enabled: bool) -> Value {
542    static KG_ONLY: std::sync::OnceLock<Value> = std::sync::OnceLock::new();
543    static WITH_VECTORS: std::sync::OnceLock<Value> = std::sync::OnceLock::new();
544
545    if vectors_enabled {
546        WITH_VECTORS
547            .get_or_init(|| {
548                let mut all = base_tools().clone();
549                let vec_tools: Vec<Value> =
550                    serde_json::from_str(include_str!("../vector_tools.json"))
551                        .expect("vector_tools.json is valid JSON compiled at build time");
552                all.extend(vec_tools);
553                json!({ "tools": all })
554            })
555            .clone()
556    } else {
557        KG_ONLY
558            .get_or_init(|| json!({ "tools": base_tools().clone() }))
559            .clone()
560    }
561}
562
/// `true` when `name` is exactly one of the vector-specific tools (the listed
/// `vector_*` tools plus `hybrid_search`). The match is case-sensitive and
/// other `vector_`-prefixed names are not recognised.
fn is_vector_tool_name(name: &str) -> bool {
    const VECTOR_TOOL_NAMES: [&str; 12] = [
        "vector_upsert_embedding",
        "vector_search_entities",
        "vector_delete_embedding",
        "hybrid_search",
        "vector_refresh_graph_cache",
        "vector_store_stats",
        "vector_batch_upsert",
        "vector_get_embedding",
        "vector_search_by_entity",
        "vector_recommend",
        "vector_mmr_search",
        "vector_reindex",
    ];
    VECTOR_TOOL_NAMES.contains(&name)
}
581
582fn handle_tools_call(
583    req: &JsonRpcRequest,
584    kg: &GraphHandle,
585    vs: Option<&VectorStore>,
586) -> Result<HandlerResult> {
587    let tool_name = req
588        .params
589        .as_ref()
590        .and_then(|p| p.get("name").and_then(|v| v.as_str()))
591        .ok_or_else(|| MCSError::InvalidParams("Missing 'name' parameter".into()))?;
592
593    let tool_args = req.params.as_ref().and_then(|p| p.get("arguments"));
594
595    if is_vector_tool_name(tool_name) {
596        let Some(vs) = vs else {
597            return Err(MCSError::MethodNotFound(format!(
598                "{tool_name} (vector support disabled; start the server with --vectors)"
599            )));
600        };
601        let result = match tool_name {
602            "vector_upsert_embedding" => {
603                vector_actions::handle_vector_upsert_embedding(vs, kg, tool_args)
604                    .map(HandlerResult::Value)
605            }
606            "vector_search_entities" => {
607                vector_actions::handle_vector_search_entities(vs, kg, tool_args)
608                    .map(HandlerResult::RawResult)
609            }
610            "vector_delete_embedding" => {
611                vector_actions::handle_vector_delete_embedding(vs, kg, tool_args)
612                    .map(HandlerResult::Value)
613            }
614            "hybrid_search" => {
615                vector_actions::handle_hybrid_search(vs, kg, tool_args).map(HandlerResult::RawResult)
616            }
617            "vector_refresh_graph_cache" => {
618                vector_actions::handle_refresh_graph_cache(vs, kg, tool_args)
619                    .map(HandlerResult::Value)
620            }
621            "vector_store_stats" => {
622                vector_actions::handle_vector_store_stats(vs, kg, tool_args)
623                    .map(HandlerResult::Value)
624            }
625            "vector_batch_upsert" => {
626                vector_actions::handle_vector_batch_upsert(vs, kg, tool_args)
627                    .map(HandlerResult::Value)
628            }
629            "vector_get_embedding" => {
630                vector_actions::handle_vector_get_embedding(vs, kg, tool_args)
631                    .map(HandlerResult::Value)
632            }
633            "vector_search_by_entity" => {
634                vector_actions::handle_vector_search_by_entity(vs, kg, tool_args)
635                    .map(HandlerResult::RawResult)
636            }
637            "vector_recommend" => {
638                vector_actions::handle_vector_recommend(vs, kg, tool_args)
639                    .map(HandlerResult::RawResult)
640            }
641            "vector_mmr_search" => {
642                vector_actions::handle_vector_mmr_search(vs, kg, tool_args)
643                    .map(HandlerResult::RawResult)
644            }
645            "vector_reindex" => {
646                vector_actions::handle_vector_reindex(vs, kg, tool_args).map(HandlerResult::Value)
647            }
648            other => Err(MCSError::MethodNotFound(other.to_string())),
649        };
650        return Ok(result.unwrap_or_else(|e| {
651            error!("Tool '{tool_name}' error: {e}");
652            HandlerResult::Value(tool_error(&e.to_string()))
653        }));
654    }
655
656    if !tools::tool_exists(tool_name) {
657        return Err(MCSError::MethodNotFound(tool_name.to_string()));
658    }
659
660    let result = match tool_name {
661        // Raw-result handlers (large payloads, avoid second serialization pass).
662        "read_graph" => memory::handle_read_graph(kg, tool_args).map(HandlerResult::RawResult),
663        "search_nodes" => memory::handle_search_nodes(kg, tool_args).map(HandlerResult::RawResult),
664        // Standard Value handlers.
665        "create_entities" => {
666            memory::handle_create_entities(kg, tool_args).map(HandlerResult::Value)
667        }
668        "create_relations" => {
669            memory::handle_create_relations(kg, tool_args).map(HandlerResult::Value)
670        }
671        "add_observations" => {
672            memory::handle_add_observations(kg, tool_args).map(HandlerResult::Value)
673        }
674        "delete_entities" => {
675            let r = memory::handle_delete_entities(kg, tool_args);
676            if r.is_ok()
677                && let Some(vs) = vs
678                && let Some(args) = tool_args.and_then(|a| a.get("entityNames")).and_then(|v| v.as_array())
679            {
680                let names: Vec<String> = args.iter().filter_map(|v| v.as_str().map(String::from)).collect();
681                vs.invalidate_entity_cache(&names);
682            }
683            r.map(HandlerResult::Value)
684        }
685        "delete_observations" => {
686            memory::handle_delete_observations(kg, tool_args).map(HandlerResult::Value)
687        }
688        "delete_relations" => {
689            memory::handle_delete_relations(kg, tool_args).map(HandlerResult::Value)
690        }
691        "open_nodes" => memory::handle_open_nodes(kg, tool_args).map(HandlerResult::Value),
692        "get_entity" => memory::handle_get_entity(kg, tool_args).map(HandlerResult::Value),
693        "graph_stats" => memory::handle_graph_stats(kg).map(HandlerResult::Value),
694        "search_relations" => {
695            memory::handle_search_relations(kg, tool_args).map(HandlerResult::Value)
696        }
697        "find_path" => memory::handle_find_path(kg, tool_args).map(HandlerResult::Value),
698        "compact" => memory::handle_compact(kg).map(HandlerResult::Value),
699        "get_neighbors" => memory::handle_get_neighbors(kg, tool_args).map(HandlerResult::Value),
700        "describe_entity" => {
701            memory::handle_describe_entity(kg, tool_args).map(HandlerResult::Value)
702        }
703        "list_entity_types" => memory::handle_list_entity_types(kg).map(HandlerResult::Value),
704        "list_relation_types" => memory::handle_list_relation_types(kg).map(HandlerResult::Value),
705        "upsert_entities" => {
706            memory::handle_upsert_entities(kg, tool_args).map(HandlerResult::Value)
707        }
708        "export_graph" => memory::handle_export_graph(kg, tool_args).map(HandlerResult::Value),
709        "merge_entities" => memory::handle_merge_entities(kg, tool_args).map(HandlerResult::Value),
710        "extract_subgraph" => {
711            memory::handle_extract_subgraph(kg, tool_args).map(HandlerResult::Value)
712        }
713        "batch_get_entities" => {
714            memory::handle_batch_get_entities(kg, tool_args).map(HandlerResult::Value)
715        }
716        "find_all_paths" => memory::handle_find_all_paths(kg, tool_args).map(HandlerResult::Value),
717        "entity_exists" => memory::handle_entity_exists(kg, tool_args).map(HandlerResult::Value),
718        "degree" => memory::handle_degree(kg, tool_args).map(HandlerResult::Value),
719        tool => Err(MCSError::MethodNotFound(tool.to_string())),
720    };
721
722    // Tool execution failures become isError CallToolResults so the model can
723    // read the message and self-correct, instead of an opaque protocol error.
724    Ok(result.unwrap_or_else(|e| {
725        error!("Tool '{tool_name}' error: {e}");
726        HandlerResult::Value(tool_error(&e.to_string()))
727    }))
728}
729
730