Skip to main content

mcp_memory/
server.rs

1use serde_json::{Value, json};
2use std::num::NonZeroUsize;
3use std::path::Path;
4#[cfg(feature = "code")]
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::Duration;
8
9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
10use tokio::net::TcpListener;
11use tokio::sync::Semaphore;
12use tracing::{error, info};
13
14#[cfg(feature = "code")]
15use crate::actions::code as code_actions;
16use crate::actions::memory;
17use crate::config::Config;
18use crate::errors::{MCSError, Result};
19use crate::kg::GraphHandle;
20use crate::protocol::{JsonRpcRequest, JsonRpcResponse};
21use crate::tools;
22use crate::vector_actions;
23use crate::vector_store::{VectorConfig, VectorStore};
24
/// Outcome of processing a request. It is either a JSON `Value` to embed as
/// the response's `result` (small payloads), or a pre-serialized JSON *string*
/// of the `result` field. The stdio/TCP path splices the string into the
/// response envelope verbatim, which avoids a second serialization pass for
/// large payloads such as `read_graph`.
enum HandlerResult {
    /// `result` as a value, serialized together with the response envelope.
    Value(Value),
    /// `result` already encoded as JSON text.
    RawResult(String),
}
32
/// Capacity of the `BufReader` wrapped around stdin / TCP read halves, and the
/// initial capacity of the per-connection output buffer.
const BUFFER_CAPACITY: usize = 65536;
/// Frame terminator appended after every response line.
const NEWLINE: &[u8] = b"\n";
/// Maximum size of a single inbound JSON-RPC message (shared by all transports).
pub const MAX_REQUEST_BYTES: usize = 16 * 1024 * 1024;
/// Maximum number of concurrent TCP connections (C4).
const MAX_TCP_CONNECTIONS: usize = 128;
39
/// Outcome of one `read_line_capped` call.
enum LineRead {
    /// A line (possibly the unterminated final one) was stored in the buffer.
    Line,
    /// The stream ended with no pending bytes.
    Eof,
    /// The line exceeded the size cap; its tail may still be unread.
    TooLong,
}
45
46async fn read_line_capped<R>(
47    reader: &mut R,
48    out: &mut String,
49    max: usize,
50) -> std::io::Result<LineRead>
51where
52    R: AsyncBufReadExt + Unpin,
53{
54    out.clear();
55    let mut buf: Vec<u8> = Vec::new();
56    loop {
57        let available = reader.fill_buf().await?;
58        if available.is_empty() {
59            if buf.is_empty() {
60                return Ok(LineRead::Eof);
61            }
62            // Move `buf` into the String — no copy. `buf` is not used afterward.
63            *out = String::from_utf8(buf).map_err(|_| {
64                std::io::Error::new(std::io::ErrorKind::InvalidData, "Non-UTF-8 input")
65            })?;
66            return Ok(LineRead::Line);
67        }
68        match available.iter().position(|&b| b == b'\n') {
69            Some(i) => {
70                if buf.len() + i + 1 > max {
71                    reader.consume(i + 1);
72                    return Ok(LineRead::TooLong);
73                }
74                buf.extend_from_slice(&available[..=i]);
75                reader.consume(i + 1);
76                *out = String::from_utf8(buf).map_err(|_| {
77                    std::io::Error::new(std::io::ErrorKind::InvalidData, "Non-UTF-8 input")
78                })?;
79                return Ok(LineRead::Line);
80            }
81            None => {
82                let take = available.len();
83                if buf.len() + take > max {
84                    reader.consume(take);
85                    return Ok(LineRead::TooLong);
86                }
87                buf.extend_from_slice(available);
88                reader.consume(take);
89            }
90        }
91    }
92}
93
94fn parse_error(msg: String) -> JsonRpcResponse {
95    let mcp_error = MCSError::ParseError(msg);
96    JsonRpcResponse::error(None, mcp_error.error_code(), mcp_error.to_string())
97}
98
99/// Dispatch one framed line (stdio / tcp). Returns the serialized response, or
100/// `None` for a notification. `vs` is `Some` only when vector support is enabled.
101pub fn dispatch_line(line: &str, kg: &GraphHandle, vs: Option<&VectorStore>) -> Option<String> {
102    let trimmed = line.trim();
103    if trimmed.is_empty() {
104        return Some(serde_json::to_string(&parse_error("Empty request".into())).unwrap());
105    }
106    let raw: Value = match serde_json::from_str(trimmed) {
107        Ok(v) => v,
108        Err(e) => return Some(serde_json::to_string(&parse_error(e.to_string())).unwrap()),
109    };
110    let req: JsonRpcRequest = match serde_json::from_value(raw) {
111        Ok(r) => r,
112        Err(e) => return Some(serde_json::to_string(&parse_error(e.to_string())).unwrap()),
113    };
114    req.id.as_ref()?;
115    match process_request(&req, kg, vs) {
116        Ok(HandlerResult::Value(result)) => {
117            let resp = JsonRpcResponse::success(req.id, result);
118            Some(serde_json::to_string(&resp).unwrap())
119        }
120        Ok(HandlerResult::RawResult(result_json)) => {
121            let id_json = serde_json::to_string(&req.id).unwrap();
122            let mut out = String::with_capacity(64 + id_json.len() + result_json.len());
123            out.push_str(r#"{"jsonrpc":"2.0","id":"#);
124            out.push_str(&id_json);
125            out.push_str(",\"result\":");
126            out.push_str(&result_json);
127            out.push('}');
128            Some(out)
129        }
130        Err(e) => {
131            let resp = JsonRpcResponse::error(req.id, e.error_code(), e.to_string());
132            Some(serde_json::to_string(&resp).unwrap())
133        }
134    }
135}
136
137/// Dispatch a Streamable-HTTP POST body, which may be a single JSON-RPC message
138/// or a batch array. `Ok(None)` means the body held only notifications (HTTP
139/// 202, empty body); `Err` means the body was not valid JSON.
140pub fn dispatch_http_body(
141    body: &str,
142    kg: &GraphHandle,
143    vs: Option<&VectorStore>,
144) -> std::result::Result<Option<Value>, String> {
145    let value: Value = serde_json::from_str(body.trim()).map_err(|e| e.to_string())?;
146    match value {
147        Value::Array(items) => {
148            // Batches are rare and never huge — keep Value path for simplicity.
149            let responses: Vec<Value> = items
150                .into_iter()
151                .filter_map(|v| process_value_http(v, kg, vs))
152                .collect();
153            Ok((!responses.is_empty()).then_some(Value::Array(responses)))
154        }
155        other => Ok(process_value_http(other, kg, vs)),
156    }
157}
158
159/// Process one JSON-RPC message for the HTTP transport, converting any
160/// `RawResult` back into a `Value` (acceptable since HTTP payloads are typically
161/// much smaller in this context). `None` means the message was a notification.
162fn process_value_http(value: Value, kg: &GraphHandle, vs: Option<&VectorStore>) -> Option<Value> {
163    let req: JsonRpcRequest = match serde_json::from_value(value) {
164        Ok(r) => r,
165        Err(e) => return Some(to_value(parse_error(e.to_string()))),
166    };
167    req.id.as_ref()?;
168    match process_request(&req, kg, vs) {
169        Ok(HandlerResult::Value(result)) => {
170            Some(to_value(JsonRpcResponse::success(req.id, result)))
171        }
172        Ok(HandlerResult::RawResult(result_json)) => {
173            // Parse the pre-serialized result back into a Value for HTTP delivery.
174            // This is a small extra cost for the HTTP transport; the stdio/TCP
175            // path (dispatch_line) avoids it entirely.
176            let result_val: Value = serde_json::from_str(&result_json).unwrap_or(Value::Null);
177            Some(to_value(JsonRpcResponse::success(req.id, result_val)))
178        }
179        Err(e) => Some(to_value(JsonRpcResponse::error(
180            req.id,
181            e.error_code(),
182            e.to_string(),
183        ))),
184    }
185}
186
187#[inline]
188fn to_value(resp: JsonRpcResponse) -> Value {
189    serde_json::to_value(resp).expect("JsonRpcResponse always serializes")
190}
191
/// MCP server that owns the configuration, the shared knowledge-graph handle
/// and, optionally, the vector store. It serves them over stdio, TCP or HTTP
/// (see `run_stdio`, `run_tcp`, `run_http`).
pub struct MCPServer {
    /// Server configuration.
    config: Arc<Config>,
    /// Shared knowledge-graph handle, cloned into every transport task.
    kg: Arc<GraphHandle>,
    /// `Some` when vector support is enabled (`--vectors`); drives the extra
    /// `vector_*` / `hybrid_search` tools. `None` for a pure knowledge-graph server.
    vs: Option<Arc<VectorStore>>,
}
199
200impl MCPServer {
201    /// Build a server. The vector subsystem (usearch index + petgraph mirror) is
202    /// only constructed when `config.vectors_enabled` is set; `vec_config` is
203    /// ignored otherwise.
204    pub fn new(config: Config, vec_config: VectorConfig) -> Result<Self> {
205        let path = Path::new(&config.memory_file_path);
206        let lru_cache = NonZeroUsize::new(config.lru_cache_size).unwrap_or_else(|| {
207            NonZeroUsize::new(10000).expect("10000 > 0")
208        });
209        let kg = Arc::new(GraphHandle::new(
210            path,
211            config.durability,
212            config.sqlite_tuning(),
213            lru_cache,
214            config.read_pool_size,
215        )?);
216
217        let vs = if config.vectors_enabled {
218            Some(Arc::new(VectorStore::with_config(path, &vec_config)?))
219        } else {
220            None
221        };
222
223        #[cfg(feature = "code")]
224        {
225            CODE_ENABLED.store(config.code_enabled, std::sync::atomic::Ordering::Relaxed);
226            if config.code_enabled {
227                // Per-project code databases live in a sibling directory keyed to
228                // the main memory file, so distinct memory DBs never collide.
229                let base = PathBuf::from(format!("{}.code", config.memory_file_path));
230                crate::code_registry::init(
231                    base,
232                    config.durability,
233                    config.sqlite_tuning(),
234                    lru_cache,
235                    config.read_pool_size,
236                );
237            }
238        }
239
240        Ok(Self {
241            config: Arc::new(config),
242            kg,
243            vs,
244        })
245    }
246
247    /// Convenience constructor for a pure knowledge-graph server (no vectors).
248    pub fn new_kg(config: Config) -> Result<Self> {
249        let mut config = config;
250        config.vectors_enabled = false;
251        Self::new(config, VectorConfig::new(0))
252    }
253
254    /// Expose the shared graph handle (used to drive the HTTP transport).
255    pub fn graph(&self) -> Arc<GraphHandle> {
256        Arc::clone(&self.kg)
257    }
258
259    /// The shared vector store, if vector support is enabled.
260    pub fn vector_store(&self) -> Option<Arc<VectorStore>> {
261        self.vs.clone()
262    }
263
264    /// stdio transport: newline-delimited JSON-RPC over stdin/stdout.
265    pub async fn run_stdio(&self) -> Result<()> {
266        spawn_maintenance(self.kg.clone());
267        spawn_wal_flush(self.kg.clone(), self.config.wal_flush_ms);
268        let stdin = tokio::io::stdin();
269        let mut reader = BufReader::with_capacity(BUFFER_CAPACITY, stdin);
270        let mut stdout = tokio::io::stdout();
271        serve_line_conn(&mut reader, &mut stdout, Arc::clone(&self.kg), self.vs.clone()).await
272    }
273
274    /// TCP transport: each accepted connection speaks newline-delimited
275    /// JSON-RPC, exactly like stdio. Connections are served concurrently (up to
276    /// [`MAX_TCP_CONNECTIONS`]) and share the one graph behind its mutex.
277    pub async fn run_tcp(&self, addr: &str) -> Result<()> {
278        spawn_maintenance(self.kg.clone());
279        spawn_wal_flush(self.kg.clone(), self.config.wal_flush_ms);
280        let listener = TcpListener::bind(addr).await.map_err(MCSError::IoError)?;
281        let semaphore = Arc::new(Semaphore::new(MAX_TCP_CONNECTIONS));
282        let auth_token = self.config.auth_token.clone();
283        info!(
284            "Listening for TCP MCP connections on {addr} (max {MAX_TCP_CONNECTIONS}, auth {}, vectors {})",
285            if auth_token.is_some() { "on" } else { "off" },
286            if self.vs.is_some() { "on" } else { "off" }
287        );
288        loop {
289            let permit = Arc::clone(&semaphore).acquire_owned().await;
290            let (socket, peer) = listener.accept().await.map_err(MCSError::IoError)?;
291            let kg = Arc::clone(&self.kg);
292            let vs = self.vs.clone();
293            let auth_token = auth_token.clone();
294            tokio::spawn(async move {
295                let _permit = permit; // held for the connection lifetime
296                let (read_half, mut write_half) = socket.into_split();
297                let mut reader = BufReader::with_capacity(BUFFER_CAPACITY, read_half);
298                // When a token is configured, the client must send it as the
299                // first line before any JSON-RPC traffic.
300                if let Some(ref expected) = auth_token {
301                    match authenticate_line_conn(&mut reader, expected).await {
302                        Ok(true) => {}
303                        Ok(false) => {
304                            let _ = write_half.write_all(AUTH_REQUIRED_LINE.as_bytes()).await;
305                            let _ = write_half.flush().await;
306                            return;
307                        }
308                        Err(e) => {
309                            error!("TCP auth error for {peer}: {e}");
310                            return;
311                        }
312                    }
313                }
314                if let Err(e) = serve_line_conn(&mut reader, &mut write_half, kg, vs).await {
315                    error!("TCP connection {peer} error: {e}");
316                }
317            });
318        }
319    }
320
321    /// MCP Streamable HTTP transport (POST/GET `/mcp`, JSON or SSE responses).
322    pub async fn run_http(&self, addr: &str) -> Result<()> {
323        spawn_maintenance(self.kg.clone());
324        spawn_wal_flush(self.kg.clone(), self.config.wal_flush_ms);
325        crate::http::run(
326            addr,
327            self.graph(),
328            self.vs.clone(),
329            self.config.auth_token.clone(),
330            self.config.tls_cert.clone(),
331            self.config.tls_key.clone(),
332        )
333        .await
334    }
335}
336
337/// Spawn a background task that fsyncs committed WAL frames every
338/// `interval_ms` milliseconds via a non-blocking passive checkpoint, bounding
339/// the durability window in async mode. A zero interval disables the task.
340fn spawn_wal_flush(kg: Arc<GraphHandle>, interval_ms: u64) {
341    if interval_ms == 0 {
342        return;
343    }
344    tokio::spawn(async move {
345        let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
346        interval.tick().await; // skip immediate first tick
347        loop {
348            interval.tick().await;
349            let kg = kg.clone();
350            tokio::task::spawn_blocking(move || {
351                if let Err(e) = kg.checkpoint_passive() {
352                    tracing::warn!("WAL flush error: {e}");
353                }
354            })
355            .await
356            .ok();
357        }
358    });
359}
360
361/// Spawn a background task that runs periodic database maintenance every
362/// 5 minutes until the runtime shuts down.
363fn spawn_maintenance(kg: Arc<GraphHandle>) {
364    tokio::spawn(async move {
365        let mut interval = tokio::time::interval(Duration::from_secs(300));
366        interval.tick().await; // skip immediate first tick
367        loop {
368            interval.tick().await;
369            let kg = kg.clone();
370            tokio::task::spawn_blocking(move || {
371                if let Err(e) = kg.run_maintenance() {
372                    tracing::warn!("Maintenance error: {e}");
373                }
374            })
375            .await
376            .ok();
377        }
378    });
379}
380
/// Newline-terminated JSON-RPC error (code -32001, null `id`) sent to a TCP
/// client that fails authentication, just before the connection is closed.
const AUTH_REQUIRED_LINE: &str = "{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32001,\
\"message\":\"Authentication required: send the bearer token as the first line\"},\"id\":null}\n";
384
385/// Read the first line of a connection and compare it (constant-time) to the
386/// expected bearer token. Returns `Ok(false)` on EOF / oversized first line.
387async fn authenticate_line_conn<R>(reader: &mut R, expected: &str) -> Result<bool>
388where
389    R: AsyncBufReadExt + Unpin,
390{
391    let mut line = String::new();
392    match read_line_capped(reader, &mut line, MAX_REQUEST_BYTES)
393        .await
394        .map_err(MCSError::IoError)?
395    {
396        LineRead::Line => Ok(token_matches(&line, expected)),
397        _ => Ok(false),
398    }
399}
400
401/// Drive one line-framed connection (stdio or a single TCP socket): read
402/// newline-delimited JSON-RPC requests, write newline-delimited responses.
403/// Notifications produce no output. Returns when the peer closes the stream.
404/// The dispatch path (graph lock + optional fsync) is offloaded to
405/// [`tokio::task::spawn_blocking`] to keep the async reactor responsive (C3).
406async fn serve_line_conn<R, W>(
407    reader: &mut R,
408    writer: &mut W,
409    kg: Arc<GraphHandle>,
410    vs: Option<Arc<VectorStore>>,
411) -> Result<()>
412where
413    R: AsyncBufReadExt + Unpin,
414    W: AsyncWriteExt + Unpin,
415{
416    let mut line = String::with_capacity(1024);
417    let mut out = Vec::with_capacity(BUFFER_CAPACITY);
418
419    loop {
420        match read_line_capped(reader, &mut line, MAX_REQUEST_BYTES).await {
421            Ok(LineRead::Eof) => break,
422            Ok(LineRead::Line) => {
423                let line_copy = line.clone();
424                let kg_clone = Arc::clone(&kg);
425                let vs_clone = vs.clone();
426                let resp = tokio::task::spawn_blocking(move || {
427                    dispatch_line(&line_copy, &kg_clone, vs_clone.as_deref())
428                })
429                .await
430                .map_err(|join_err| {
431                    error!("dispatch task panicked: {join_err}");
432                    MCSError::IoError(std::io::Error::other("dispatch task panicked"))
433                })?;
434                if let Some(resp) = resp {
435                    out.clear();
436                    out.extend_from_slice(resp.as_bytes());
437                    out.extend_from_slice(NEWLINE);
438                    writer.write_all(&out).await.map_err(MCSError::IoError)?;
439                    writer.flush().await.map_err(MCSError::IoError)?;
440                }
441            }
442            Ok(LineRead::TooLong) => {
443                let err = MCSError::InvalidParams("Request exceeds maximum size of 16MB".into());
444                let response = JsonRpcResponse::error(None, err.error_code(), err.to_string());
445                out.clear();
446                serde_json::to_writer(&mut out, &response).map_err(MCSError::JsonError)?;
447                out.extend_from_slice(NEWLINE);
448                writer.write_all(&out).await.map_err(MCSError::IoError)?;
449                writer.flush().await.map_err(MCSError::IoError)?;
450                break;
451            }
452            Err(e) => {
453                error!("IO error: {}", e);
454                break;
455            }
456        }
457    }
458    Ok(())
459}
460
461fn process_request(
462    req: &JsonRpcRequest,
463    kg: &GraphHandle,
464    vs: Option<&VectorStore>,
465) -> Result<HandlerResult> {
466    match req.method.as_str() {
467        "initialize" => Ok(HandlerResult::Value(handle_initialize(req, vs.is_some()))),
468        "tools/list" => Ok(HandlerResult::Value(handle_tools_list(vs.is_some()))),
469        "tools/call" => handle_tools_call(req, kg, vs),
470        "ping" => Ok(HandlerResult::Value(Value::Null)),
471        method if method.starts_with("notifications/") => {
472            tracing::trace!("Received notification: {method}");
473            Ok(HandlerResult::Value(Value::Null))
474        }
475        _ => Err(MCSError::MethodNotFound(req.method.clone())),
476    }
477}
478
/// MCP protocol revisions this server can speak, newest first (for `initialize`
/// version negotiation). Index 0 must be the newest revision, because
/// `LATEST_PROTOCOL_VERSION` is read from it.
const SUPPORTED_PROTOCOL_VERSIONS: &[&str] =
    &["2025-11-25", "2025-06-18", "2025-03-26", "2024-11-05"];
/// Newest revision we implement; offered when the client requests an unknown
/// one. Derived from the supported list so the two cannot drift apart.
const LATEST_PROTOCOL_VERSION: &str = SUPPORTED_PROTOCOL_VERSIONS[0];

/// `instructions` surfaced to the client and appended to the model's system prompt.
const SERVER_INSTRUCTIONS: &str = "Knowledge-graph memory MCP server. Entity names are unique and \
case-sensitive. Use `create_entities`/`create_relations` to build the graph, `add_observations` to \
attach facts, and `search_nodes`/`open_nodes`/`read_graph` to retrieve. Prefer `upsert_entities` for \
idempotent writes and `merge_entities` to collapse duplicates. Tool failures are returned with \
`isError: true` rather than as protocol errors — read the message and retry.";

/// Extra guidance appended to [`SERVER_INSTRUCTIONS`] when vector support is on.
/// The leading space separates it from the base text.
const VECTOR_INSTRUCTIONS: &str = " Vector search is enabled: use `vector_upsert_embedding` to \
attach embeddings to entities, `vector_search_entities` for semantic search, and `hybrid_search` to \
combine text + vector relevance.";
497
498fn handle_initialize(req: &JsonRpcRequest, vectors_enabled: bool) -> Value {
499    // Version negotiation: echo a supported requested revision, else offer latest.
500    let protocol_version = req
501        .params
502        .as_ref()
503        .and_then(|p| p.get("protocolVersion"))
504        .and_then(Value::as_str)
505        .filter(|v| SUPPORTED_PROTOCOL_VERSIONS.contains(v))
506        .unwrap_or(LATEST_PROTOCOL_VERSION);
507
508    let instructions = if vectors_enabled {
509        format!("{SERVER_INSTRUCTIONS}{VECTOR_INSTRUCTIONS}")
510    } else {
511        SERVER_INSTRUCTIONS.to_string()
512    };
513
514    json!({
515        "protocolVersion": protocol_version,
516        "capabilities": {
517            "tools": { "listChanged": false }
518        },
519        "serverInfo": {
520            "name": "mcp-memory",
521            "version": env!("CARGO_PKG_VERSION")
522        },
523        "instructions": instructions
524    })
525}
526
527/// Wrap a tool execution failure as an MCP `CallToolResult` with `isError: true`
528/// so the model sees the message and can self-correct, instead of receiving an
529/// opaque JSON-RPC protocol error. (Successful results are already content-
530/// wrapped by the action handlers.)
531#[inline]
532fn tool_error(message: &str) -> Value {
533    json!({
534        "content": [{ "type": "text", "text": message }],
535        "isError": true
536    })
537}
538
539/// Constant-time bearer-token check. Accepts the raw token or a `Bearer <token>`
540/// form; surrounding whitespace is trimmed.
541pub fn token_matches(presented: &str, expected: &str) -> bool {
542    use subtle::ConstantTimeEq;
543    let presented = presented.trim();
544    let presented = presented
545        .strip_prefix("Bearer ")
546        .unwrap_or(presented)
547        .trim();
548    presented.as_bytes().ct_eq(expected.as_bytes()).into()
549}
550
551/// The base knowledge-graph tools, parsed from `tools.json` at build time.
552fn base_tools() -> &'static Vec<Value> {
553    static BASE: std::sync::OnceLock<Vec<Value>> = std::sync::OnceLock::new();
554    BASE.get_or_init(|| {
555        serde_json::from_str(include_str!("../tools.json"))
556            .expect("tools.json is valid JSON compiled at build time")
557    })
558}
559
560/// The vector tools, parsed from `vector_tools.json` at build time.
561fn vector_tools() -> &'static Vec<Value> {
562    static VEC: std::sync::OnceLock<Vec<Value>> = std::sync::OnceLock::new();
563    VEC.get_or_init(|| {
564        serde_json::from_str(include_str!("../vector_tools.json"))
565            .expect("vector_tools.json is valid JSON compiled at build time")
566    })
567}
568
/// Code-indexing tool definitions. They are decoded on first use from the
/// `code_tools.json` embedded at compile time and cached for the process
/// lifetime.
///
/// # Panics
/// On first use if the embedded file is not a JSON array.
#[cfg(feature = "code")]
fn code_tools() -> &'static Vec<Value> {
    use std::sync::OnceLock;
    static TOOLS: OnceLock<Vec<Value>> = OnceLock::new();
    TOOLS.get_or_init(|| {
        let embedded = include_str!("../code_tools.json");
        serde_json::from_str(embedded)
            .expect("code_tools.json is valid JSON compiled at build time")
    })
}
578
579/// `tools/list` response. Vector and code tools are appended only when their
580/// subsystems are enabled, so the server never advertises tools it cannot serve.
581fn handle_tools_list(vectors_enabled: bool) -> Value {
582    let mut all = base_tools().clone();
583    if vectors_enabled {
584        all.extend(vector_tools().iter().cloned());
585    }
586    #[cfg(feature = "code")]
587    if code_enabled() {
588        all.extend(code_tools().iter().cloned());
589    }
590    json!({ "tools": all })
591}
592
/// `true` if `name` is one of the vector-backed tools. This is an explicit
/// allow-list of `vector_*` names plus `hybrid_search`.
fn is_vector_tool_name(name: &str) -> bool {
    const VECTOR_TOOLS: [&str; 12] = [
        "vector_upsert_embedding",
        "vector_search_entities",
        "vector_delete_embedding",
        "hybrid_search",
        "vector_refresh_graph_cache",
        "vector_store_stats",
        "vector_batch_upsert",
        "vector_get_embedding",
        "vector_search_by_entity",
        "vector_recommend",
        "vector_mmr_search",
        "vector_reindex",
    ];
    VECTOR_TOOLS.contains(&name)
}
611
/// Process-wide switch for the code-indexing subsystem, stored from
/// `config.code_enabled` in `MCPServer::new`. Code tools carry no per-request
/// state (unlike the vector store), so a global flag avoids threading a bool
/// through every dispatch signature.
#[cfg(feature = "code")]
static CODE_ENABLED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);

/// Whether the code-indexing tools are currently enabled.
#[cfg(feature = "code")]
fn code_enabled() -> bool {
    use std::sync::atomic::Ordering;
    CODE_ENABLED.load(Ordering::Relaxed)
}

/// Without the `code` feature the code tools can never be enabled.
#[cfg(not(feature = "code"))]
const fn code_enabled() -> bool {
    false
}
627
/// `true` if `name` is one of the tree-sitter code-indexing tools.
fn is_code_tool_name(name: &str) -> bool {
    const CODE_TOOLS: [&str; 5] = [
        "code_index",
        "code_outline",
        "code_search",
        "code_get_symbol",
        "code_watch",
    ];
    CODE_TOOLS.contains(&name)
}
635
636fn handle_tools_call(
637    req: &JsonRpcRequest,
638    kg: &GraphHandle,
639    vs: Option<&VectorStore>,
640) -> Result<HandlerResult> {
641    let tool_name = req
642        .params
643        .as_ref()
644        .and_then(|p| p.get("name").and_then(|v| v.as_str()))
645        .ok_or_else(|| MCSError::InvalidParams("Missing 'name' parameter".into()))?;
646
647    let tool_args = req.params.as_ref().and_then(|p| p.get("arguments"));
648
649    if is_vector_tool_name(tool_name) {
650        let Some(vs) = vs else {
651            return Err(MCSError::MethodNotFound(format!(
652                "{tool_name} (vector support disabled; start the server with --vectors)"
653            )));
654        };
655        let result = match tool_name {
656            "vector_upsert_embedding" => {
657                vector_actions::handle_vector_upsert_embedding(vs, kg, tool_args)
658                    .map(HandlerResult::Value)
659            }
660            "vector_search_entities" => {
661                vector_actions::handle_vector_search_entities(vs, kg, tool_args)
662                    .map(HandlerResult::RawResult)
663            }
664            "vector_delete_embedding" => {
665                vector_actions::handle_vector_delete_embedding(vs, kg, tool_args)
666                    .map(HandlerResult::Value)
667            }
668            "hybrid_search" => {
669                vector_actions::handle_hybrid_search(vs, kg, tool_args).map(HandlerResult::RawResult)
670            }
671            "vector_refresh_graph_cache" => {
672                vector_actions::handle_refresh_graph_cache(vs, kg, tool_args)
673                    .map(HandlerResult::Value)
674            }
675            "vector_store_stats" => {
676                vector_actions::handle_vector_store_stats(vs, kg, tool_args)
677                    .map(HandlerResult::Value)
678            }
679            "vector_batch_upsert" => {
680                vector_actions::handle_vector_batch_upsert(vs, kg, tool_args)
681                    .map(HandlerResult::Value)
682            }
683            "vector_get_embedding" => {
684                vector_actions::handle_vector_get_embedding(vs, kg, tool_args)
685                    .map(HandlerResult::Value)
686            }
687            "vector_search_by_entity" => {
688                vector_actions::handle_vector_search_by_entity(vs, kg, tool_args)
689                    .map(HandlerResult::RawResult)
690            }
691            "vector_recommend" => {
692                vector_actions::handle_vector_recommend(vs, kg, tool_args)
693                    .map(HandlerResult::RawResult)
694            }
695            "vector_mmr_search" => {
696                vector_actions::handle_vector_mmr_search(vs, kg, tool_args)
697                    .map(HandlerResult::RawResult)
698            }
699            "vector_reindex" => {
700                vector_actions::handle_vector_reindex(vs, kg, tool_args).map(HandlerResult::Value)
701            }
702            other => Err(MCSError::MethodNotFound(other.to_string())),
703        };
704        return Ok(result.unwrap_or_else(|e| {
705            error!("Tool '{tool_name}' error: {e}");
706            HandlerResult::Value(tool_error(&e.to_string()))
707        }));
708    }
709
710    if is_code_tool_name(tool_name) {
711        if !code_enabled() {
712            return Err(MCSError::MethodNotFound(format!(
713                "{tool_name} (code indexing disabled; start the server with --code)"
714            )));
715        }
716        #[cfg(feature = "code")]
717        {
718            let result = match tool_name {
719                "code_index" => {
720                    code_actions::handle_code_index(tool_args).map(HandlerResult::Value)
721                }
722                "code_outline" => {
723                    code_actions::handle_code_outline(tool_args).map(HandlerResult::Value)
724                }
725                "code_search" => {
726                    code_actions::handle_code_search(tool_args).map(HandlerResult::Value)
727                }
728                "code_get_symbol" => {
729                    code_actions::handle_code_get_symbol(tool_args).map(HandlerResult::Value)
730                }
731                "code_watch" => {
732                    code_actions::handle_code_watch(tool_args).map(HandlerResult::Value)
733                }
734                other => Err(MCSError::MethodNotFound(other.to_string())),
735            };
736            return Ok(result.unwrap_or_else(|e| {
737                error!("Tool '{tool_name}' error: {e}");
738                HandlerResult::Value(tool_error(&e.to_string()))
739            }));
740        }
741        #[cfg(not(feature = "code"))]
742        return Err(MCSError::MethodNotFound(format!(
743            "{tool_name} (built without the 'code' feature)"
744        )));
745    }
746
747    if !tools::tool_exists(tool_name) {
748        return Err(MCSError::MethodNotFound(tool_name.to_string()));
749    }
750
751    let result = match tool_name {
752        // Raw-result handlers (large payloads, avoid second serialization pass).
753        "read_graph" => memory::handle_read_graph(kg, tool_args).map(HandlerResult::RawResult),
754        "search_nodes" => memory::handle_search_nodes(kg, tool_args).map(HandlerResult::RawResult),
755        // Standard Value handlers.
756        "create_entities" => {
757            memory::handle_create_entities(kg, tool_args).map(HandlerResult::Value)
758        }
759        "create_relations" => {
760            memory::handle_create_relations(kg, tool_args).map(HandlerResult::Value)
761        }
762        "add_observations" => {
763            memory::handle_add_observations(kg, tool_args).map(HandlerResult::Value)
764        }
765        "delete_entities" => {
766            let r = memory::handle_delete_entities(kg, tool_args);
767            if r.is_ok()
768                && let Some(vs) = vs
769                && let Some(args) = tool_args.and_then(|a| a.get("entityNames")).and_then(|v| v.as_array())
770            {
771                let names: Vec<String> = args.iter().filter_map(|v| v.as_str().map(String::from)).collect();
772                vs.invalidate_entity_cache(&names);
773            }
774            r.map(HandlerResult::Value)
775        }
776        "delete_observations" => {
777            memory::handle_delete_observations(kg, tool_args).map(HandlerResult::Value)
778        }
779        "delete_relations" => {
780            memory::handle_delete_relations(kg, tool_args).map(HandlerResult::Value)
781        }
782        "open_nodes" => memory::handle_open_nodes(kg, tool_args).map(HandlerResult::Value),
783        "get_entity" => memory::handle_get_entity(kg, tool_args).map(HandlerResult::Value),
784        "graph_stats" => memory::handle_graph_stats(kg).map(HandlerResult::Value),
785        "search_relations" => {
786            memory::handle_search_relations(kg, tool_args).map(HandlerResult::Value)
787        }
788        "find_path" => memory::handle_find_path(kg, tool_args).map(HandlerResult::Value),
789        "compact" => memory::handle_compact(kg).map(HandlerResult::Value),
790        "get_neighbors" => memory::handle_get_neighbors(kg, tool_args).map(HandlerResult::Value),
791        "describe_entity" => {
792            memory::handle_describe_entity(kg, tool_args).map(HandlerResult::Value)
793        }
794        "list_entity_types" => memory::handle_list_entity_types(kg).map(HandlerResult::Value),
795        "list_relation_types" => memory::handle_list_relation_types(kg).map(HandlerResult::Value),
796        "upsert_entities" => {
797            memory::handle_upsert_entities(kg, tool_args).map(HandlerResult::Value)
798        }
799        "export_graph" => memory::handle_export_graph(kg, tool_args).map(HandlerResult::Value),
800        "merge_entities" => memory::handle_merge_entities(kg, tool_args).map(HandlerResult::Value),
801        "extract_subgraph" => {
802            memory::handle_extract_subgraph(kg, tool_args).map(HandlerResult::Value)
803        }
804        "batch_get_entities" => {
805            memory::handle_batch_get_entities(kg, tool_args).map(HandlerResult::Value)
806        }
807        "find_all_paths" => memory::handle_find_all_paths(kg, tool_args).map(HandlerResult::Value),
808        "entity_exists" => memory::handle_entity_exists(kg, tool_args).map(HandlerResult::Value),
809        "degree" => memory::handle_degree(kg, tool_args).map(HandlerResult::Value),
810        tool => Err(MCSError::MethodNotFound(tool.to_string())),
811    };
812
813    // Tool execution failures become isError CallToolResults so the model can
814    // read the message and self-correct, instead of an opaque protocol error.
815    Ok(result.unwrap_or_else(|e| {
816        error!("Tool '{tool_name}' error: {e}");
817        HandlerResult::Value(tool_error(&e.to_string()))
818    }))
819}
820
821