Skip to main content

synapse_core/
mcp_stdio.rs

1use crate::server::MySemanticEngine;
2use crate::server::proto::semantic_engine_server::SemanticEngine;
3use crate::server::proto::{
4    IngestRequest, IngestFileRequest, Triple, Provenance, 
5    SparqlRequest, HybridSearchRequest, ReasoningRequest,
6    SearchMode, ReasoningStrategy,
7};
8use crate::mcp_types::{McpRequest, McpResponse, McpError, Tool, ListToolsResult, CallToolResult, Content};
9use std::sync::Arc;
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
11use tonic::Request;
12
13pub struct McpStdioServer {
14    engine: Arc<MySemanticEngine>,
15}
16
17impl McpStdioServer {
18    pub fn new(engine: Arc<MySemanticEngine>) -> Self {
19        Self { engine }
20    }
21
22    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
23        let mut reader = BufReader::new(tokio::io::stdin());
24        let mut writer = tokio::io::stdout();
25
26        loop {
27            let mut line = String::new();
28            if reader.read_line(&mut line).await? == 0 {
29                break;
30            }
31
32            let trimmed = line.trim();
33            if trimmed.is_empty() {
34                continue;
35            }
36
37            if let Ok(request) = serde_json::from_str::<McpRequest>(trimmed) {
38                let response = self.handle_request(request).await;
39                let response_json = serde_json::to_string(&response)? + "\n";
40                writer.write_all(response_json.as_bytes()).await?;
41                writer.flush().await?;
42            }
43        }
44
45        Ok(())
46    }
47
48    fn get_tools() -> Vec<Tool> {
49        vec![
50            Tool {
51                name: "ingest_triples".to_string(),
52                description: Some("Ingest one or more RDF triples into the knowledge graph".to_string()),
53                input_schema: serde_json::json!({
54                    "type": "object",
55                    "properties": {
56                        "triples": {
57                            "type": "array",
58                            "items": {
59                                "type": "object",
60                                "properties": {
61                                    "subject": { "type": "string" },
62                                    "predicate": { "type": "string" },
63                                    "object": { "type": "string" }
64                                },
65                                "required": ["subject", "predicate", "object"]
66                            }
67                        },
68                        "namespace": { "type": "string", "default": "default" }
69                    },
70                    "required": ["triples"]
71                }),
72            },
73            Tool {
74                name: "ingest_file".to_string(),
75                description: Some("Ingest a CSV or Markdown file into the knowledge graph".to_string()),
76                input_schema: serde_json::json!({
77                    "type": "object",
78                    "properties": {
79                        "path": { "type": "string", "description": "Path to the file" },
80                        "namespace": { "type": "string", "default": "default" }
81                    },
82                    "required": ["path"]
83                }),
84            },
85            Tool {
86                name: "sparql_query".to_string(),
87                description: Some("Execute a SPARQL query against the knowledge graph".to_string()),
88                input_schema: serde_json::json!({
89                    "type": "object",
90                    "properties": {
91                        "query": { "type": "string", "description": "SPARQL query string" },
92                        "namespace": { "type": "string", "default": "default" }
93                    },
94                    "required": ["query"]
95                }),
96            },
97            Tool {
98                name: "hybrid_search".to_string(),
99                description: Some("Perform a hybrid vector + graph search".to_string()),
100                input_schema: serde_json::json!({
101                    "type": "object",
102                    "properties": {
103                        "query": { "type": "string", "description": "Natural language query" },
104                        "namespace": { "type": "string", "default": "default" },
105                        "vector_k": { "type": "integer", "default": 10 },
106                        "graph_depth": { "type": "integer", "default": 1 },
107                        "limit": { "type": "integer", "default": 20 }
108                    },
109                    "required": ["query"]
110                }),
111            },
112            Tool {
113                name: "apply_reasoning".to_string(),
114                description: Some("Apply RDFS or OWL-RL reasoning to infer new triples".to_string()),
115                input_schema: serde_json::json!({
116                    "type": "object",
117                    "properties": {
118                        "namespace": { "type": "string", "default": "default" },
119                        "strategy": { "type": "string", "enum": ["rdfs", "owlrl"], "default": "rdfs" },
120                        "materialize": { "type": "boolean", "default": false }
121                    }
122                }),
123            },
124            Tool {
125                name: "get_neighbors".to_string(),
126                description: Some("Get neighboring nodes connected to a given URI in the graph".to_string()),
127                input_schema: serde_json::json!({
128                    "type": "object",
129                    "properties": {
130                        "uri": { "type": "string", "description": "URI of the entity to find neighbors for" },
131                        "namespace": { "type": "string", "default": "default" },
132                        "direction": { "type": "string", "enum": ["outgoing", "incoming", "both"], "default": "outgoing" }
133                    },
134                    "required": ["uri"]
135                }),
136            },
137            Tool {
138                name: "list_triples".to_string(),
139                description: Some("List all triples in a namespace (useful for debugging/exploration)".to_string()),
140                input_schema: serde_json::json!({
141                    "type": "object",
142                    "properties": {
143                        "namespace": { "type": "string", "default": "default" },
144                        "limit": { "type": "integer", "default": 100 }
145                    }
146                }),
147            },
148            Tool {
149                name: "delete_namespace".to_string(),
150                description: Some("Delete all data in a namespace".to_string()),
151                input_schema: serde_json::json!({
152                    "type": "object",
153                    "properties": {
154                        "namespace": { "type": "string", "description": "Namespace to delete" }
155                    },
156                    "required": ["namespace"]
157                }),
158            },
159            Tool {
160                name: "ingest_url".to_string(),
161                description: Some("Scrape a web page and add its content to the vector store for RAG retrieval".to_string()),
162                input_schema: serde_json::json!({
163                    "type": "object",
164                    "properties": {
165                        "url": { "type": "string", "description": "URL to scrape and ingest" },
166                        "namespace": { "type": "string", "default": "default" }
167                    },
168                    "required": ["url"]
169                }),
170            },
171            Tool {
172                name: "ingest_text".to_string(),
173                description: Some("Add arbitrary text content to the vector store for RAG retrieval".to_string()),
174                input_schema: serde_json::json!({
175                    "type": "object",
176                    "properties": {
177                        "uri": { "type": "string", "description": "Custom URI identifier for this text" },
178                        "content": { "type": "string", "description": "Text content to embed and store" },
179                        "namespace": { "type": "string", "default": "default" }
180                    },
181                    "required": ["uri", "content"]
182                }),
183            },
184            Tool {
185                name: "compact_vectors".to_string(),
186                description: Some("Compact the vector index by removing stale entries".to_string()),
187                input_schema: serde_json::json!({
188                    "type": "object",
189                    "properties": {
190                        "namespace": { "type": "string", "default": "default" }
191                    }
192                }),
193            },
194            Tool {
195                name: "vector_stats".to_string(),
196                description: Some("Get vector store statistics (active, stale, total)".to_string()),
197                input_schema: serde_json::json!({
198                    "type": "object",
199                    "properties": {
200                        "namespace": { "type": "string", "default": "default" }
201                    }
202                }),
203            },
204            Tool {
205                name: "disambiguate".to_string(),
206                description: Some("Find similar entities that might be duplicates".to_string()),
207                input_schema: serde_json::json!({
208                    "type": "object",
209                    "properties": {
210                        "namespace": { "type": "string", "default": "default" },
211                        "threshold": { "type": "number", "default": 0.8, "description": "Similarity threshold 0.0-1.0" }
212                    }
213                }),
214            },
215        ]
216    }
217
218    async fn handle_request(&self, request: McpRequest) -> McpResponse {
219        match request.method.as_str() {
220            "initialize" => {
221                // MCP protocol initialization
222                McpResponse {
223                    jsonrpc: "2.0".to_string(),
224                    id: request.id,
225                    result: Some(serde_json::json!({
226                        "protocolVersion": "2024-11-05",
227                        "capabilities": {
228                            "tools": {}
229                        },
230                        "serverInfo": {
231                            "name": "synapse",
232                            "version": "0.2.0"
233                        }
234                    })),
235                    error: None,
236                }
237            }
238            "notifications/initialized" | "initialized" => {
239                // Client confirms initialization - just acknowledge
240                McpResponse {
241                    jsonrpc: "2.0".to_string(),
242                    id: request.id,
243                    result: Some(serde_json::json!({})),
244                    error: None,
245                }
246            }
247            "tools/list" => {
248                let result = ListToolsResult { tools: Self::get_tools() };
249                McpResponse {
250                    jsonrpc: "2.0".to_string(),
251                    id: request.id,
252                    result: Some(serde_json::to_value(result).unwrap()),
253                    error: None,
254                }
255            }
256            "tools/call" => {
257                self.handle_tool_call(request).await
258            }
259            // Legacy methods for backwards compatibility
260            "ingest" => self.handle_legacy_ingest(request).await,
261            "ingest_file" => self.handle_legacy_ingest_file(request).await,
262            _ => McpResponse {
263                jsonrpc: "2.0".to_string(),
264                id: request.id,
265                result: None,
266                error: Some(McpError {
267                    code: -32601,
268                    message: format!("Method not found: {}", request.method),
269                    data: None,
270                }),
271            },
272        }
273    }
274
275    async fn handle_tool_call(&self, request: McpRequest) -> McpResponse {
276        let params = match request.params {
277            Some(p) => p,
278            None => return self.error_response(request.id, -32602, "Missing params"),
279        };
280
281        let tool_name = match params.get("name").and_then(|v| v.as_str()) {
282            Some(n) => n,
283            None => return self.error_response(request.id, -32602, "Missing tool name"),
284        };
285
286        let arguments = params.get("arguments")
287            .and_then(|v| v.as_object())
288            .cloned()
289            .unwrap_or_default();
290
291        match tool_name {
292            "ingest_triples" => self.call_ingest_triples(request.id, &arguments).await,
293            "ingest_file" => self.call_ingest_file(request.id, &arguments).await,
294            "sparql_query" => self.call_sparql_query(request.id, &arguments).await,
295            "hybrid_search" => self.call_hybrid_search(request.id, &arguments).await,
296            "apply_reasoning" => self.call_apply_reasoning(request.id, &arguments).await,
297            "get_neighbors" => self.call_get_neighbors(request.id, &arguments).await,
298            "list_triples" => self.call_list_triples(request.id, &arguments).await,
299            "delete_namespace" => self.call_delete_namespace(request.id, &arguments).await,
300            "ingest_url" => self.call_ingest_url(request.id, &arguments).await,
301            "ingest_text" => self.call_ingest_text(request.id, &arguments).await,
302            "compact_vectors" => self.call_compact_vectors(request.id, &arguments).await,
303            "vector_stats" => self.call_vector_stats(request.id, &arguments).await,
304            "disambiguate" => self.call_disambiguate(request.id, &arguments).await,
305            _ => self.error_response(request.id, -32602, &format!("Unknown tool: {}", tool_name)),
306        }
307    }
308
309    async fn call_ingest_triples(
310        &self,
311        id: Option<serde_json::Value>,
312        args: &serde_json::Map<String, serde_json::Value>,
313    ) -> McpResponse {
314        let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
315        let triples_array = match args.get("triples").and_then(|v| v.as_array()) {
316            Some(t) => t,
317            None => return self.error_response(id, -32602, "Missing 'triples' array"),
318        };
319
320        let mut triples = Vec::new();
321        for t in triples_array {
322            if let (Some(s), Some(p), Some(o)) = (
323                t.get("subject").and_then(|v| v.as_str()),
324                t.get("predicate").and_then(|v| v.as_str()),
325                t.get("object").and_then(|v| v.as_str()),
326            ) {
327                triples.push(Triple {
328                    subject: s.to_string(),
329                    predicate: p.to_string(),
330                    object: o.to_string(),
331                    provenance: Some(Provenance {
332                        source: "mcp".to_string(),
333                        timestamp: "".to_string(),
334                        method: "tools/call".to_string(),
335                    }),
336                    embedding: vec![],
337                });
338            }
339        }
340
341        let req = Request::new(IngestRequest {
342            triples,
343            namespace: namespace.to_string(),
344        });
345
346        match self.engine.ingest_triples(req).await {
347            Ok(resp) => {
348                let inner = resp.into_inner();
349                self.tool_result(id, &format!("Ingested {} triples", inner.edges_added), false)
350            }
351            Err(e) => self.tool_result(id, &e.to_string(), true),
352        }
353    }
354
355    async fn call_ingest_file(
356        &self,
357        id: Option<serde_json::Value>,
358        args: &serde_json::Map<String, serde_json::Value>,
359    ) -> McpResponse {
360        let path = match args.get("path").and_then(|v| v.as_str()) {
361            Some(p) => p,
362            None => return self.error_response(id, -32602, "Missing 'path'"),
363        };
364        let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
365
366        let req = Request::new(IngestFileRequest {
367            file_path: path.to_string(),
368            namespace: namespace.to_string(),
369        });
370
371        match self.engine.ingest_file(req).await {
372            Ok(resp) => {
373                let inner = resp.into_inner();
374                self.tool_result(id, &format!("Ingested {} triples from {}", inner.edges_added, path), false)
375            }
376            Err(e) => self.tool_result(id, &e.to_string(), true),
377        }
378    }
379
380    async fn call_sparql_query(
381        &self,
382        id: Option<serde_json::Value>,
383        args: &serde_json::Map<String, serde_json::Value>,
384    ) -> McpResponse {
385        let query = match args.get("query").and_then(|v| v.as_str()) {
386            Some(q) => q,
387            None => return self.error_response(id, -32602, "Missing 'query'"),
388        };
389        let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
390
391        let req = Request::new(SparqlRequest {
392            query: query.to_string(),
393            namespace: namespace.to_string(),
394        });
395
396        match self.engine.query_sparql(req).await {
397            Ok(resp) => {
398                self.tool_result(id, &resp.into_inner().results_json, false)
399            }
400            Err(e) => self.tool_result(id, &e.to_string(), true),
401        }
402    }
403
404    async fn call_hybrid_search(
405        &self,
406        id: Option<serde_json::Value>,
407        args: &serde_json::Map<String, serde_json::Value>,
408    ) -> McpResponse {
409        let query = match args.get("query").and_then(|v| v.as_str()) {
410            Some(q) => q,
411            None => return self.error_response(id, -32602, "Missing 'query'"),
412        };
413        let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
414        let vector_k = args.get("vector_k").and_then(|v| v.as_u64()).unwrap_or(10) as u32;
415        let graph_depth = args.get("graph_depth").and_then(|v| v.as_u64()).unwrap_or(1) as u32;
416        let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as u32;
417
418        let req = Request::new(HybridSearchRequest {
419            query: query.to_string(),
420            namespace: namespace.to_string(),
421            vector_k,
422            graph_depth,
423            mode: SearchMode::Hybrid as i32,
424            limit,
425        });
426
427        match self.engine.hybrid_search(req).await {
428            Ok(resp) => {
429                let results = resp.into_inner().results;
430                // Manually serialize since proto SearchResult doesn't derive Serialize
431                let json_results: Vec<serde_json::Value> = results.iter().map(|r| {
432                    serde_json::json!({
433                        "node_id": r.node_id,
434                        "score": r.score,
435                        "content": r.content,
436                        "uri": r.uri
437                    })
438                }).collect();
439                let json = serde_json::to_string_pretty(&json_results).unwrap_or_default();
440                self.tool_result(id, &json, false)
441            }
442            Err(e) => self.tool_result(id, &e.to_string(), true),
443        }
444    }
445
446    async fn call_apply_reasoning(
447        &self,
448        id: Option<serde_json::Value>,
449        args: &serde_json::Map<String, serde_json::Value>,
450    ) -> McpResponse {
451        let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
452        let strategy_str = args.get("strategy").and_then(|v| v.as_str()).unwrap_or("rdfs");
453        let materialize = args.get("materialize").and_then(|v| v.as_bool()).unwrap_or(false);
454
455        let strategy = match strategy_str.to_lowercase().as_str() {
456            "owlrl" | "owl-rl" => ReasoningStrategy::Owlrl as i32,
457            _ => ReasoningStrategy::Rdfs as i32,
458        };
459
460        let req = Request::new(ReasoningRequest {
461            namespace: namespace.to_string(),
462            strategy,
463            materialize,
464        });
465
466        match self.engine.apply_reasoning(req).await {
467            Ok(resp) => {
468                let inner = resp.into_inner();
469                self.tool_result(id, &inner.message, !inner.success)
470            }
471            Err(e) => self.tool_result(id, &e.to_string(), true),
472        }
473    }
474
475    async fn call_get_neighbors(
476        &self,
477        id: Option<serde_json::Value>,
478        args: &serde_json::Map<String, serde_json::Value>,
479    ) -> McpResponse {
480        let uri = match args.get("uri").and_then(|v| v.as_str()) {
481            Some(u) => u,
482            None => return self.error_response(id, -32602, "Missing 'uri'"),
483        };
484        let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
485        let direction = args.get("direction").and_then(|v| v.as_str()).unwrap_or("outgoing");
486
487        let store = match self.engine.get_store(namespace) {
488            Ok(s) => s,
489            Err(e) => return self.tool_result(id, &e.to_string(), true),
490        };
491
492        let mut neighbors = Vec::new();
493
494        // Query outgoing edges (URI as subject)
495        if direction == "outgoing" || direction == "both" {
496            if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
497                for quad in store.store.quads_for_pattern(Some(subj.into()), None, None, None) {
498                    if let Ok(q) = quad {
499                        neighbors.push(serde_json::json!({
500                            "direction": "outgoing",
501                            "predicate": q.predicate.to_string(),
502                            "target": q.object.to_string()
503                        }));
504                    }
505                }
506            }
507        }
508
509        // Query incoming edges (URI as object)
510        if direction == "incoming" || direction == "both" {
511            if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
512                for quad in store.store.quads_for_pattern(None, None, Some(obj.into()), None) {
513                    if let Ok(q) = quad {
514                        neighbors.push(serde_json::json!({
515                            "direction": "incoming",
516                            "predicate": q.predicate.to_string(),
517                            "source": q.subject.to_string()
518                        }));
519                    }
520                }
521            }
522        }
523
524        let json = serde_json::to_string_pretty(&neighbors).unwrap_or_default();
525        self.tool_result(id, &json, false)
526    }
527
528    async fn call_list_triples(
529        &self,
530        id: Option<serde_json::Value>,
531        args: &serde_json::Map<String, serde_json::Value>,
532    ) -> McpResponse {
533        let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
534        let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
535
536        let store = match self.engine.get_store(namespace) {
537            Ok(s) => s,
538            Err(e) => return self.tool_result(id, &e.to_string(), true),
539        };
540
541        let mut triples = Vec::new();
542        for quad in store.store.iter().take(limit) {
543            if let Ok(q) = quad {
544                triples.push(serde_json::json!({
545                    "subject": q.subject.to_string(),
546                    "predicate": q.predicate.to_string(),
547                    "object": q.object.to_string()
548                }));
549            }
550        }
551
552        let json = serde_json::to_string_pretty(&triples).unwrap_or_default();
553        self.tool_result(id, &json, false)
554    }
555
556    async fn call_delete_namespace(
557        &self,
558        id: Option<serde_json::Value>,
559        args: &serde_json::Map<String, serde_json::Value>,
560    ) -> McpResponse {
561        let namespace = match args.get("namespace").and_then(|v| v.as_str()) {
562            Some(n) => n,
563            None => return self.error_response(id, -32602, "Missing 'namespace'"),
564        };
565
566        let req = Request::new(crate::server::proto::EmptyRequest {
567            namespace: namespace.to_string(),
568        });
569
570        match self.engine.delete_namespace_data(req).await {
571            Ok(resp) => {
572                let inner = resp.into_inner();
573                self.tool_result(id, &inner.message, !inner.success)
574            }
575            Err(e) => self.tool_result(id, &e.to_string(), true),
576        }
577    }
578
579    async fn call_ingest_url(
580        &self,
581        id: Option<serde_json::Value>,
582        args: &serde_json::Map<String, serde_json::Value>,
583    ) -> McpResponse {
584        let url = match args.get("url").and_then(|v| v.as_str()) {
585            Some(u) => u,
586            None => return self.error_response(id, -32602, "Missing 'url'"),
587        };
588        let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
589
590        // Fetch URL content
591        let client = reqwest::Client::new();
592        let response = match client.get(url).send().await {
593            Ok(r) => r,
594            Err(e) => return self.tool_result(id, &format!("Failed to fetch URL: {}", e), true),
595        };
596
597        if !response.status().is_success() {
598            return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
599        }
600
601        let html = match response.text().await {
602            Ok(t) => t,
603            Err(e) => return self.tool_result(id, &format!("Failed to read response: {}", e), true),
604        };
605
606        // Simple HTML to text conversion (strip tags)
607        let text = html
608            .split('<')
609            .filter_map(|s| s.split('>').nth(1))
610            .collect::<Vec<_>>()
611            .join(" ")
612            .split_whitespace()
613            .collect::<Vec<_>>()
614            .join(" ");
615
616        // Chunk text
617        let processor = crate::processor::TextProcessor::new();
618        let chunks = processor.chunk_text(&text, 1000);
619
620        // Add to vector store
621        let store = match self.engine.get_store(namespace) {
622            Ok(s) => s,
623            Err(e) => return self.tool_result(id, &e.to_string(), true),
624        };
625
626        if let Some(ref vector_store) = store.vector_store {
627            let mut added_chunks = 0;
628            for (i, chunk) in chunks.iter().enumerate() {
629                let chunk_uri = format!("{}#chunk-{}", url, i);
630                match vector_store.add(&chunk_uri, chunk).await {
631                    Ok(_) => added_chunks += 1,
632                    Err(e) => {
633                        eprintln!("Failed to add chunk {}: {}", i, e);
634                    }
635                }
636            }
637            self.tool_result(id, &format!("Ingested URL: {} ({} chars, {} chunks)", url, text.len(), added_chunks), false)
638        } else {
639            self.tool_result(id, "Vector store not available", true)
640        }
641    }
642
643
644    async fn call_ingest_text(
645        &self,
646        id: Option<serde_json::Value>,
647        args: &serde_json::Map<String, serde_json::Value>,
648    ) -> McpResponse {
649        let uri = match args.get("uri").and_then(|v| v.as_str()) {
650            Some(u) => u,
651            None => return self.error_response(id, -32602, "Missing 'uri'"),
652        };
653        let content = match args.get("content").and_then(|v| v.as_str()) {
654            Some(c) => c,
655            None => return self.error_response(id, -32602, "Missing 'content'"),
656        };
657        let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
658
659        // Chunk text
660        let processor = crate::processor::TextProcessor::new();
661        let chunks = processor.chunk_text(&content, 1000);
662
663        // Add to vector store
664        let store = match self.engine.get_store(namespace) {
665            Ok(s) => s,
666            Err(e) => return self.tool_result(id, &e.to_string(), true),
667        };
668
669        if let Some(ref vector_store) = store.vector_store {
670            let mut added_chunks = 0;
671            for (i, chunk) in chunks.iter().enumerate() {
672                let chunk_uri = if chunks.len() > 1 { format!("{}#chunk-{}", uri, i) } else { uri.to_string() };
673                match vector_store.add(&chunk_uri, chunk).await {
674                    Ok(_) => added_chunks += 1,
675                    Err(e) => {
676                        eprintln!("Failed to add chunk {}: {}", i, e);
677                    }
678                }
679            }
680            self.tool_result(id, &format!("Ingested text: {} ({} chars, {} chunks)", uri, content.len(), added_chunks), false)
681        } else {
682            self.tool_result(id, "Vector store not available", true)
683        }
684    }
685
686    async fn call_compact_vectors(
687        &self,
688        id: Option<serde_json::Value>,
689        args: &serde_json::Map<String, serde_json::Value>,
690    ) -> McpResponse {
691        let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
692
693        let store = match self.engine.get_store(namespace) {
694            Ok(s) => s,
695            Err(e) => return self.tool_result(id, &e.to_string(), true),
696        };
697
698        if let Some(ref vector_store) = store.vector_store {
699            match vector_store.compact() {
700                Ok(removed) => self.tool_result(id, &format!("Compaction complete: {} stale entries removed", removed), false),
701                Err(e) => self.tool_result(id, &format!("Compaction error: {}", e), true),
702            }
703        } else {
704            self.tool_result(id, "Vector store not available", true)
705        }
706    }
707
708    async fn call_vector_stats(
709        &self,
710        id: Option<serde_json::Value>,
711        args: &serde_json::Map<String, serde_json::Value>,
712    ) -> McpResponse {
713        let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
714
715        let store = match self.engine.get_store(namespace) {
716            Ok(s) => s,
717            Err(e) => return self.tool_result(id, &e.to_string(), true),
718        };
719
720        if let Some(ref vector_store) = store.vector_store {
721            let (active, stale, total) = vector_store.stats();
722            let msg = format!(
723                "Vector store stats:\n  Active: {}\n  Stale: {}\n  Total embeddings: {}",
724                active, stale, total
725            );
726            self.tool_result(id, &msg, false)
727        } else {
728            self.tool_result(id, "Vector store not available", true)
729        }
730    }
731
732    async fn call_disambiguate(
733        &self,
734        id: Option<serde_json::Value>,
735        args: &serde_json::Map<String, serde_json::Value>,
736    ) -> McpResponse {
737        let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
738        let threshold = args.get("threshold").and_then(|v| v.as_f64()).unwrap_or(0.8);
739
740        let store = match self.engine.get_store(namespace) {
741            Ok(s) => s,
742            Err(e) => return self.tool_result(id, &e.to_string(), true),
743        };
744
745        // Collect all URIs from the store
746        let uri_map = store.uri_to_id.read().unwrap();
747        let uris: Vec<String> = uri_map.keys().cloned().collect();
748        drop(uri_map);
749
750        let disambiguator = crate::disambiguation::EntityDisambiguator::new(threshold);
751        let suggestions = disambiguator.suggest_merges(&uris);
752
753        if suggestions.is_empty() {
754            self.tool_result(id, "No similar entities found above threshold", false)
755        } else {
756            let mut msg = format!("Found {} potential duplicates:\n", suggestions.len());
757            for (uri1, uri2, sim) in suggestions.iter().take(20) {
758                msg.push_str(&format!("  {:.2}%: {} <-> {}\n", sim * 100.0, uri1, uri2));
759            }
760            if suggestions.len() > 20 {
761                msg.push_str(&format!("  ... and {} more\n", suggestions.len() - 20));
762            }
763            self.tool_result(id, &msg, false)
764        }
765    }
766
767    // Legacy handlers for backward compatibility
768    async fn handle_legacy_ingest(&self, request: McpRequest) -> McpResponse {
769        let params = match request.params {
770            Some(p) => p,
771            None => return self.error_response(request.id, -32602, "Invalid params"),
772        };
773
774        if let (Some(sub), Some(pred), Some(obj)) = (
775            params.get("subject").and_then(|v| v.as_str()),
776            params.get("predicate").and_then(|v| v.as_str()),
777            params.get("object").and_then(|v| v.as_str()),
778        ) {
779            let namespace = params.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
780            let triple = Triple {
781                subject: sub.to_string(),
782                predicate: pred.to_string(),
783                object: obj.to_string(),
784                provenance: Some(Provenance {
785                    source: "mcp".to_string(),
786                    timestamp: "".to_string(),
787                    method: "stdio".to_string(),
788                }),
789                embedding: vec![],
790            };
791
792            let req = Request::new(IngestRequest {
793                triples: vec![triple],
794                namespace: namespace.to_string(),
795            });
796
797            match self.engine.ingest_triples(req).await {
798                Ok(_) => McpResponse {
799                    jsonrpc: "2.0".to_string(),
800                    id: request.id,
801                    result: Some(serde_json::to_value("Ingested").unwrap()),
802                    error: None,
803                },
804                Err(e) => self.error_response(request.id, -32000, &e.to_string()),
805            }
806        } else {
807            self.error_response(request.id, -32602, "Invalid params")
808        }
809    }
810
811    async fn handle_legacy_ingest_file(&self, request: McpRequest) -> McpResponse {
812        let params = match request.params {
813            Some(p) => p,
814            None => return self.error_response(request.id, -32602, "Invalid params: 'path' required"),
815        };
816
817        if let Some(path) = params.get("path").and_then(|v| v.as_str()) {
818            let namespace = params.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
819
820            let req = Request::new(IngestFileRequest {
821                file_path: path.to_string(),
822                namespace: namespace.to_string(),
823            });
824
825            match self.engine.ingest_file(req).await {
826                Ok(resp) => {
827                    let inner = resp.into_inner();
828                    McpResponse {
829                        jsonrpc: "2.0".to_string(),
830                        id: request.id,
831                        result: Some(serde_json::to_value(format!(
832                            "Ingested {} triples from {}",
833                            inner.edges_added, path
834                        )).unwrap()),
835                        error: None,
836                    }
837                }
838                Err(e) => self.error_response(request.id, -32000, &e.to_string()),
839            }
840        } else {
841            self.error_response(request.id, -32602, "Invalid params: 'path' required")
842        }
843    }
844
845    fn error_response(&self, id: Option<serde_json::Value>, code: i32, message: &str) -> McpResponse {
846        McpResponse {
847            jsonrpc: "2.0".to_string(),
848            id,
849            result: None,
850            error: Some(McpError {
851                code,
852                message: message.to_string(),
853                data: None,
854            }),
855        }
856    }
857
858    fn tool_result(&self, id: Option<serde_json::Value>, text: &str, is_error: bool) -> McpResponse {
859        let result = CallToolResult {
860            content: vec![Content {
861                content_type: "text".to_string(),
862                text: text.to_string(),
863            }],
864            is_error: if is_error { Some(true) } else { None },
865        };
866        McpResponse {
867            jsonrpc: "2.0".to_string(),
868            id,
869            result: Some(serde_json::to_value(result).unwrap()),
870            error: None,
871        }
872    }
873}