Skip to main content

synapse_core/
mcp_stdio.rs

1use crate::mcp_types::{
2    CallToolResult, Content, ListToolsResult, McpError, McpRequest, McpResponse, Tool,
3};
4use crate::server::proto::semantic_engine_server::SemanticEngine;
5use crate::server::proto::{
6    HybridSearchRequest, IngestFileRequest, IngestRequest, Provenance, ReasoningRequest,
7    ReasoningStrategy, SearchMode, SparqlRequest, Triple,
8};
9use crate::server::MySemanticEngine;
10use std::sync::Arc;
11use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
12use tonic::Request;
13
14pub struct McpStdioServer {
15    engine: Arc<MySemanticEngine>,
16}
17
18impl McpStdioServer {
19    pub fn new(engine: Arc<MySemanticEngine>) -> Self {
20        Self { engine }
21    }
22
23    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
24        let mut reader = BufReader::new(tokio::io::stdin());
25        let mut writer = tokio::io::stdout();
26
27        loop {
28            let mut line = String::new();
29            if reader.read_line(&mut line).await? == 0 {
30                break;
31            }
32
33            let trimmed = line.trim();
34            if trimmed.is_empty() {
35                continue;
36            }
37
38            if let Ok(request) = serde_json::from_str::<McpRequest>(trimmed) {
39                let response = self.handle_request(request).await;
40                let response_json = serde_json::to_string(&response)? + "\n";
41                writer.write_all(response_json.as_bytes()).await?;
42                writer.flush().await?;
43            }
44        }
45
46        Ok(())
47    }
48
49    fn get_tools() -> Vec<Tool> {
50        vec![
51            Tool {
52                name: "ingest_triples".to_string(),
53                description: Some(
54                    "Ingest one or more RDF triples into the knowledge graph".to_string(),
55                ),
56                input_schema: serde_json::json!({
57                    "type": "object",
58                    "properties": {
59                        "triples": {
60                            "type": "array",
61                            "items": {
62                                "type": "object",
63                                "properties": {
64                                    "subject": { "type": "string" },
65                                    "predicate": { "type": "string" },
66                                    "object": { "type": "string" }
67                                },
68                                "required": ["subject", "predicate", "object"]
69                            }
70                        },
71                        "namespace": { "type": "string", "default": "default" }
72                    },
73                    "required": ["triples"]
74                }),
75            },
76            Tool {
77                name: "ingest_file".to_string(),
78                description: Some(
79                    "Ingest a CSV or Markdown file into the knowledge graph".to_string(),
80                ),
81                input_schema: serde_json::json!({
82                    "type": "object",
83                    "properties": {
84                        "path": { "type": "string", "description": "Path to the file" },
85                        "namespace": { "type": "string", "default": "default" }
86                    },
87                    "required": ["path"]
88                }),
89            },
90            Tool {
91                name: "sparql_query".to_string(),
92                description: Some("Execute a SPARQL query against the knowledge graph".to_string()),
93                input_schema: serde_json::json!({
94                    "type": "object",
95                    "properties": {
96                        "query": { "type": "string", "description": "SPARQL query string" },
97                        "namespace": { "type": "string", "default": "default" }
98                    },
99                    "required": ["query"]
100                }),
101            },
102            Tool {
103                name: "hybrid_search".to_string(),
104                description: Some("Perform a hybrid vector + graph search".to_string()),
105                input_schema: serde_json::json!({
106                    "type": "object",
107                    "properties": {
108                        "query": { "type": "string", "description": "Natural language query" },
109                        "namespace": { "type": "string", "default": "default" },
110                        "vector_k": { "type": "integer", "default": 10 },
111                        "graph_depth": { "type": "integer", "default": 1 },
112                        "limit": { "type": "integer", "default": 20 }
113                    },
114                    "required": ["query"]
115                }),
116            },
117            Tool {
118                name: "apply_reasoning".to_string(),
119                description: Some(
120                    "Apply RDFS or OWL-RL reasoning to infer new triples".to_string(),
121                ),
122                input_schema: serde_json::json!({
123                    "type": "object",
124                    "properties": {
125                        "namespace": { "type": "string", "default": "default" },
126                        "strategy": { "type": "string", "enum": ["rdfs", "owlrl"], "default": "rdfs" },
127                        "materialize": { "type": "boolean", "default": false }
128                    }
129                }),
130            },
131            Tool {
132                name: "get_neighbors".to_string(),
133                description: Some(
134                    "Get neighboring nodes connected to a given URI in the graph".to_string(),
135                ),
136                input_schema: serde_json::json!({
137                    "type": "object",
138                    "properties": {
139                        "uri": { "type": "string", "description": "URI of the entity to find neighbors for" },
140                        "namespace": { "type": "string", "default": "default" },
141                        "direction": { "type": "string", "enum": ["outgoing", "incoming", "both"], "default": "outgoing" }
142                    },
143                    "required": ["uri"]
144                }),
145            },
146            Tool {
147                name: "list_triples".to_string(),
148                description: Some(
149                    "List all triples in a namespace (useful for debugging/exploration)"
150                        .to_string(),
151                ),
152                input_schema: serde_json::json!({
153                    "type": "object",
154                    "properties": {
155                        "namespace": { "type": "string", "default": "default" },
156                        "limit": { "type": "integer", "default": 100 }
157                    }
158                }),
159            },
160            Tool {
161                name: "delete_namespace".to_string(),
162                description: Some("Delete all data in a namespace".to_string()),
163                input_schema: serde_json::json!({
164                    "type": "object",
165                    "properties": {
166                        "namespace": { "type": "string", "description": "Namespace to delete" }
167                    },
168                    "required": ["namespace"]
169                }),
170            },
171            Tool {
172                name: "ingest_url".to_string(),
173                description: Some(
174                    "Scrape a web page and add its content to the vector store for RAG retrieval"
175                        .to_string(),
176                ),
177                input_schema: serde_json::json!({
178                    "type": "object",
179                    "properties": {
180                        "url": { "type": "string", "description": "URL to scrape and ingest" },
181                        "namespace": { "type": "string", "default": "default" }
182                    },
183                    "required": ["url"]
184                }),
185            },
186            Tool {
187                name: "ingest_text".to_string(),
188                description: Some(
189                    "Add arbitrary text content to the vector store for RAG retrieval".to_string(),
190                ),
191                input_schema: serde_json::json!({
192                    "type": "object",
193                    "properties": {
194                        "uri": { "type": "string", "description": "Custom URI identifier for this text" },
195                        "content": { "type": "string", "description": "Text content to embed and store" },
196                        "namespace": { "type": "string", "default": "default" }
197                    },
198                    "required": ["uri", "content"]
199                }),
200            },
201            Tool {
202                name: "compact_vectors".to_string(),
203                description: Some("Compact the vector index by removing stale entries".to_string()),
204                input_schema: serde_json::json!({
205                    "type": "object",
206                    "properties": {
207                        "namespace": { "type": "string", "default": "default" }
208                    }
209                }),
210            },
211            Tool {
212                name: "vector_stats".to_string(),
213                description: Some("Get vector store statistics (active, stale, total)".to_string()),
214                input_schema: serde_json::json!({
215                    "type": "object",
216                    "properties": {
217                        "namespace": { "type": "string", "default": "default" }
218                    }
219                }),
220            },
221            Tool {
222                name: "disambiguate".to_string(),
223                description: Some("Find similar entities that might be duplicates".to_string()),
224                input_schema: serde_json::json!({
225                    "type": "object",
226                    "properties": {
227                        "namespace": { "type": "string", "default": "default" },
228                        "threshold": { "type": "number", "default": 0.8, "description": "Similarity threshold 0.0-1.0" }
229                    }
230                }),
231            },
232        ]
233    }
234
235    async fn handle_request(&self, request: McpRequest) -> McpResponse {
236        match request.method.as_str() {
237            "initialize" => {
238                // MCP protocol initialization
239                McpResponse {
240                    jsonrpc: "2.0".to_string(),
241                    id: request.id,
242                    result: Some(serde_json::json!({
243                        "protocolVersion": "2024-11-05",
244                        "capabilities": {
245                            "tools": {}
246                        },
247                        "serverInfo": {
248                        "name": "synapse",
249                        "version": "0.5.0"
250                    }
251                    })),
252                    error: None,
253                }
254            }
255            "notifications/initialized" | "initialized" => {
256                // Client confirms initialization - just acknowledge
257                McpResponse {
258                    jsonrpc: "2.0".to_string(),
259                    id: request.id,
260                    result: Some(serde_json::json!({})),
261                    error: None,
262                }
263            }
264            "tools/list" => {
265                let result = ListToolsResult {
266                    tools: Self::get_tools(),
267                };
268                McpResponse {
269                    jsonrpc: "2.0".to_string(),
270                    id: request.id,
271                    result: Some(serde_json::to_value(result).unwrap()),
272                    error: None,
273                }
274            }
275            "tools/call" => self.handle_tool_call(request).await,
276            // Legacy methods for backwards compatibility
277            "ingest" => self.handle_legacy_ingest(request).await,
278            "ingest_file" => self.handle_legacy_ingest_file(request).await,
279            _ => McpResponse {
280                jsonrpc: "2.0".to_string(),
281                id: request.id,
282                result: None,
283                error: Some(McpError {
284                    code: -32601,
285                    message: format!("Method not found: {}", request.method),
286                    data: None,
287                }),
288            },
289        }
290    }
291
292    async fn handle_tool_call(&self, request: McpRequest) -> McpResponse {
293        let params = match request.params {
294            Some(p) => p,
295            None => return self.error_response(request.id, -32602, "Missing params"),
296        };
297
298        let tool_name = match params.get("name").and_then(|v| v.as_str()) {
299            Some(n) => n,
300            None => return self.error_response(request.id, -32602, "Missing tool name"),
301        };
302
303        let arguments = params
304            .get("arguments")
305            .and_then(|v| v.as_object())
306            .cloned()
307            .unwrap_or_default();
308
309        match tool_name {
310            "ingest_triples" => self.call_ingest_triples(request.id, &arguments).await,
311            "ingest_file" => self.call_ingest_file(request.id, &arguments).await,
312            "sparql_query" => self.call_sparql_query(request.id, &arguments).await,
313            "hybrid_search" => self.call_hybrid_search(request.id, &arguments).await,
314            "apply_reasoning" => self.call_apply_reasoning(request.id, &arguments).await,
315            "get_neighbors" => self.call_get_neighbors(request.id, &arguments).await,
316            "list_triples" => self.call_list_triples(request.id, &arguments).await,
317            "delete_namespace" => self.call_delete_namespace(request.id, &arguments).await,
318            "ingest_url" => self.call_ingest_url(request.id, &arguments).await,
319            "ingest_text" => self.call_ingest_text(request.id, &arguments).await,
320            "compact_vectors" => self.call_compact_vectors(request.id, &arguments).await,
321            "vector_stats" => self.call_vector_stats(request.id, &arguments).await,
322            "disambiguate" => self.call_disambiguate(request.id, &arguments).await,
323            _ => self.error_response(request.id, -32602, &format!("Unknown tool: {}", tool_name)),
324        }
325    }
326
327    async fn call_ingest_triples(
328        &self,
329        id: Option<serde_json::Value>,
330        args: &serde_json::Map<String, serde_json::Value>,
331    ) -> McpResponse {
332        let namespace = args
333            .get("namespace")
334            .and_then(|v| v.as_str())
335            .unwrap_or("default");
336        let triples_array = match args.get("triples").and_then(|v| v.as_array()) {
337            Some(t) => t,
338            None => return self.error_response(id, -32602, "Missing 'triples' array"),
339        };
340
341        let mut triples = Vec::new();
342        for t in triples_array {
343            if let (Some(s), Some(p), Some(o)) = (
344                t.get("subject").and_then(|v| v.as_str()),
345                t.get("predicate").and_then(|v| v.as_str()),
346                t.get("object").and_then(|v| v.as_str()),
347            ) {
348                triples.push(Triple {
349                    subject: s.to_string(),
350                    predicate: p.to_string(),
351                    object: o.to_string(),
352                    provenance: Some(Provenance {
353                        source: "mcp".to_string(),
354                        timestamp: "".to_string(),
355                        method: "tools/call".to_string(),
356                    }),
357                    embedding: vec![],
358                });
359            }
360        }
361
362        let req = Request::new(IngestRequest {
363            triples,
364            namespace: namespace.to_string(),
365        });
366
367        match self.engine.ingest_triples(req).await {
368            Ok(resp) => {
369                let inner = resp.into_inner();
370                self.tool_result(
371                    id,
372                    &format!("Ingested {} triples", inner.edges_added),
373                    false,
374                )
375            }
376            Err(e) => self.tool_result(id, &e.to_string(), true),
377        }
378    }
379
380    async fn call_ingest_file(
381        &self,
382        id: Option<serde_json::Value>,
383        args: &serde_json::Map<String, serde_json::Value>,
384    ) -> McpResponse {
385        let path = match args.get("path").and_then(|v| v.as_str()) {
386            Some(p) => p,
387            None => return self.error_response(id, -32602, "Missing 'path'"),
388        };
389        let namespace = args
390            .get("namespace")
391            .and_then(|v| v.as_str())
392            .unwrap_or("default");
393
394        let req = Request::new(IngestFileRequest {
395            file_path: path.to_string(),
396            namespace: namespace.to_string(),
397        });
398
399        match self.engine.ingest_file(req).await {
400            Ok(resp) => {
401                let inner = resp.into_inner();
402                self.tool_result(
403                    id,
404                    &format!("Ingested {} triples from {}", inner.edges_added, path),
405                    false,
406                )
407            }
408            Err(e) => self.tool_result(id, &e.to_string(), true),
409        }
410    }
411
412    async fn call_sparql_query(
413        &self,
414        id: Option<serde_json::Value>,
415        args: &serde_json::Map<String, serde_json::Value>,
416    ) -> McpResponse {
417        let query = match args.get("query").and_then(|v| v.as_str()) {
418            Some(q) => q,
419            None => return self.error_response(id, -32602, "Missing 'query'"),
420        };
421        let namespace = args
422            .get("namespace")
423            .and_then(|v| v.as_str())
424            .unwrap_or("default");
425
426        let req = Request::new(SparqlRequest {
427            query: query.to_string(),
428            namespace: namespace.to_string(),
429        });
430
431        match self.engine.query_sparql(req).await {
432            Ok(resp) => self.tool_result(id, &resp.into_inner().results_json, false),
433            Err(e) => self.tool_result(id, &e.to_string(), true),
434        }
435    }
436
437    async fn call_hybrid_search(
438        &self,
439        id: Option<serde_json::Value>,
440        args: &serde_json::Map<String, serde_json::Value>,
441    ) -> McpResponse {
442        let query = match args.get("query").and_then(|v| v.as_str()) {
443            Some(q) => q,
444            None => return self.error_response(id, -32602, "Missing 'query'"),
445        };
446        let namespace = args
447            .get("namespace")
448            .and_then(|v| v.as_str())
449            .unwrap_or("default");
450        let vector_k = args.get("vector_k").and_then(|v| v.as_u64()).unwrap_or(10) as u32;
451        let graph_depth = args
452            .get("graph_depth")
453            .and_then(|v| v.as_u64())
454            .unwrap_or(1) as u32;
455        let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as u32;
456
457        let req = Request::new(HybridSearchRequest {
458            query: query.to_string(),
459            namespace: namespace.to_string(),
460            vector_k,
461            graph_depth,
462            mode: SearchMode::Hybrid as i32,
463            limit,
464        });
465
466        match self.engine.hybrid_search(req).await {
467            Ok(resp) => {
468                let results = resp.into_inner().results;
469                // Manually serialize since proto SearchResult doesn't derive Serialize
470                let json_results: Vec<serde_json::Value> = results
471                    .iter()
472                    .map(|r| {
473                        serde_json::json!({
474                            "node_id": r.node_id,
475                            "score": r.score,
476                            "content": r.content,
477                            "uri": r.uri
478                        })
479                    })
480                    .collect();
481                let json = serde_json::to_string_pretty(&json_results).unwrap_or_default();
482                self.tool_result(id, &json, false)
483            }
484            Err(e) => self.tool_result(id, &e.to_string(), true),
485        }
486    }
487
488    async fn call_apply_reasoning(
489        &self,
490        id: Option<serde_json::Value>,
491        args: &serde_json::Map<String, serde_json::Value>,
492    ) -> McpResponse {
493        let namespace = args
494            .get("namespace")
495            .and_then(|v| v.as_str())
496            .unwrap_or("default");
497        let strategy_str = args
498            .get("strategy")
499            .and_then(|v| v.as_str())
500            .unwrap_or("rdfs");
501        let materialize = args
502            .get("materialize")
503            .and_then(|v| v.as_bool())
504            .unwrap_or(false);
505
506        let strategy = match strategy_str.to_lowercase().as_str() {
507            "owlrl" | "owl-rl" => ReasoningStrategy::Owlrl as i32,
508            _ => ReasoningStrategy::Rdfs as i32,
509        };
510
511        let req = Request::new(ReasoningRequest {
512            namespace: namespace.to_string(),
513            strategy,
514            materialize,
515        });
516
517        match self.engine.apply_reasoning(req).await {
518            Ok(resp) => {
519                let inner = resp.into_inner();
520                self.tool_result(id, &inner.message, !inner.success)
521            }
522            Err(e) => self.tool_result(id, &e.to_string(), true),
523        }
524    }
525
526    async fn call_get_neighbors(
527        &self,
528        id: Option<serde_json::Value>,
529        args: &serde_json::Map<String, serde_json::Value>,
530    ) -> McpResponse {
531        let uri = match args.get("uri").and_then(|v| v.as_str()) {
532            Some(u) => u,
533            None => return self.error_response(id, -32602, "Missing 'uri'"),
534        };
535        let namespace = args
536            .get("namespace")
537            .and_then(|v| v.as_str())
538            .unwrap_or("default");
539        let direction = args
540            .get("direction")
541            .and_then(|v| v.as_str())
542            .unwrap_or("outgoing");
543
544        let store = match self.engine.get_store(namespace) {
545            Ok(s) => s,
546            Err(e) => return self.tool_result(id, &e.to_string(), true),
547        };
548
549        let mut neighbors = Vec::new();
550
551        // Query outgoing edges (URI as subject)
552        if direction == "outgoing" || direction == "both" {
553            if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
554                for q in store
555                    .store
556                    .quads_for_pattern(Some(subj.into()), None, None, None)
557                    .flatten()
558                {
559                    neighbors.push(serde_json::json!({
560                        "direction": "outgoing",
561                        "predicate": q.predicate.to_string(),
562                        "target": q.object.to_string()
563                    }));
564                }
565            }
566        }
567
568        // Query incoming edges (URI as object)
569        if direction == "incoming" || direction == "both" {
570            if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
571                for q in store
572                    .store
573                    .quads_for_pattern(None, None, Some(obj.into()), None)
574                    .flatten()
575                {
576                    neighbors.push(serde_json::json!({
577                        "direction": "incoming",
578                        "predicate": q.predicate.to_string(),
579                        "source": q.subject.to_string()
580                    }));
581                }
582            }
583        }
584
585        let json = serde_json::to_string_pretty(&neighbors).unwrap_or_default();
586        self.tool_result(id, &json, false)
587    }
588
589    async fn call_list_triples(
590        &self,
591        id: Option<serde_json::Value>,
592        args: &serde_json::Map<String, serde_json::Value>,
593    ) -> McpResponse {
594        let namespace = args
595            .get("namespace")
596            .and_then(|v| v.as_str())
597            .unwrap_or("default");
598        let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
599
600        let store = match self.engine.get_store(namespace) {
601            Ok(s) => s,
602            Err(e) => return self.tool_result(id, &e.to_string(), true),
603        };
604
605        let mut triples = Vec::new();
606        for q in store.store.iter().take(limit).flatten() {
607            triples.push(serde_json::json!({
608                "subject": q.subject.to_string(),
609                "predicate": q.predicate.to_string(),
610                "object": q.object.to_string()
611            }));
612        }
613
614        let json = serde_json::to_string_pretty(&triples).unwrap_or_default();
615        self.tool_result(id, &json, false)
616    }
617
618    async fn call_delete_namespace(
619        &self,
620        id: Option<serde_json::Value>,
621        args: &serde_json::Map<String, serde_json::Value>,
622    ) -> McpResponse {
623        let namespace = match args.get("namespace").and_then(|v| v.as_str()) {
624            Some(n) => n,
625            None => return self.error_response(id, -32602, "Missing 'namespace'"),
626        };
627
628        let req = Request::new(crate::server::proto::EmptyRequest {
629            namespace: namespace.to_string(),
630        });
631
632        match self.engine.delete_namespace_data(req).await {
633            Ok(resp) => {
634                let inner = resp.into_inner();
635                self.tool_result(id, &inner.message, !inner.success)
636            }
637            Err(e) => self.tool_result(id, &e.to_string(), true),
638        }
639    }
640
641    async fn call_ingest_url(
642        &self,
643        id: Option<serde_json::Value>,
644        args: &serde_json::Map<String, serde_json::Value>,
645    ) -> McpResponse {
646        let url = match args.get("url").and_then(|v| v.as_str()) {
647            Some(u) => u,
648            None => return self.error_response(id, -32602, "Missing 'url'"),
649        };
650        let namespace = args
651            .get("namespace")
652            .and_then(|v| v.as_str())
653            .unwrap_or("default");
654
655        // Fetch URL content
656        let client = reqwest::Client::new();
657        let response = match client.get(url).send().await {
658            Ok(r) => r,
659            Err(e) => return self.tool_result(id, &format!("Failed to fetch URL: {}", e), true),
660        };
661
662        if !response.status().is_success() {
663            return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
664        }
665
666        let html = match response.text().await {
667            Ok(t) => t,
668            Err(e) => {
669                return self.tool_result(id, &format!("Failed to read response: {}", e), true)
670            }
671        };
672
673        // HTML to text conversion with Regex
674        let script_re = regex::Regex::new(r"(?s)<script.*?>.*?</script>").unwrap();
675        let style_re = regex::Regex::new(r"(?s)<style.*?>.*?</style>").unwrap();
676        let tag_re = regex::Regex::new(r"<[^>]*>").unwrap();
677
678        let no_script = script_re.replace_all(&html, " ");
679        let no_style = style_re.replace_all(&no_script, " ");
680        let text_content = tag_re.replace_all(&no_style, " ");
681
682        let text = text_content
683            .split_whitespace()
684            .collect::<Vec<_>>()
685            .join(" ");
686
687        // Chunk text with overlap
688        let processor = crate::processor::TextProcessor::new();
689        let chunks = processor.chunk_text(&text, 1000, 150);
690
691        // Add to vector store
692        let store = match self.engine.get_store(namespace) {
693            Ok(s) => s,
694            Err(e) => return self.tool_result(id, &e.to_string(), true),
695        };
696
697        if let Some(ref vector_store) = store.vector_store {
698            let mut added_chunks = 0;
699            for (i, chunk) in chunks.iter().enumerate() {
700                let chunk_uri = format!("{}#chunk-{}", url, i);
701                match vector_store.add(&chunk_uri, chunk).await {
702                    Ok(_) => added_chunks += 1,
703                    Err(e) => {
704                        eprintln!("Failed to add chunk {}: {}", i, e);
705                    }
706                }
707            }
708            self.tool_result(
709                id,
710                &format!(
711                    "Ingested URL: {} ({} chars, {} chunks)",
712                    url,
713                    text.len(),
714                    added_chunks
715                ),
716                false,
717            )
718        } else {
719            self.tool_result(id, "Vector store not available", true)
720        }
721    }
722
723    async fn call_ingest_text(
724        &self,
725        id: Option<serde_json::Value>,
726        args: &serde_json::Map<String, serde_json::Value>,
727    ) -> McpResponse {
728        let uri = match args.get("uri").and_then(|v| v.as_str()) {
729            Some(u) => u,
730            None => return self.error_response(id, -32602, "Missing 'uri'"),
731        };
732        let content = match args.get("content").and_then(|v| v.as_str()) {
733            Some(c) => c,
734            None => return self.error_response(id, -32602, "Missing 'content'"),
735        };
736        let namespace = args
737            .get("namespace")
738            .and_then(|v| v.as_str())
739            .unwrap_or("default");
740
741        // Chunk text with overlap
742        let processor = crate::processor::TextProcessor::new();
743        let chunks = processor.chunk_text(&content, 1000, 150);
744
745        // Add to vector store
746        let store = match self.engine.get_store(namespace) {
747            Ok(s) => s,
748            Err(e) => return self.tool_result(id, &e.to_string(), true),
749        };
750
751        if let Some(ref vector_store) = store.vector_store {
752            let mut added_chunks = 0;
753            for (i, chunk) in chunks.iter().enumerate() {
754                let chunk_uri = if chunks.len() > 1 {
755                    format!("{}#chunk-{}", uri, i)
756                } else {
757                    uri.to_string()
758                };
759                match vector_store.add(&chunk_uri, chunk).await {
760                    Ok(_) => added_chunks += 1,
761                    Err(e) => {
762                        eprintln!("Failed to add chunk {}: {}", i, e);
763                    }
764                }
765            }
766            self.tool_result(
767                id,
768                &format!(
769                    "Ingested text: {} ({} chars, {} chunks)",
770                    uri,
771                    content.len(),
772                    added_chunks
773                ),
774                false,
775            )
776        } else {
777            self.tool_result(id, "Vector store not available", true)
778        }
779    }
780
781    async fn call_compact_vectors(
782        &self,
783        id: Option<serde_json::Value>,
784        args: &serde_json::Map<String, serde_json::Value>,
785    ) -> McpResponse {
786        let namespace = args
787            .get("namespace")
788            .and_then(|v| v.as_str())
789            .unwrap_or("default");
790
791        let store = match self.engine.get_store(namespace) {
792            Ok(s) => s,
793            Err(e) => return self.tool_result(id, &e.to_string(), true),
794        };
795
796        if let Some(ref vector_store) = store.vector_store {
797            match vector_store.compact() {
798                Ok(removed) => self.tool_result(
799                    id,
800                    &format!("Compaction complete: {} stale entries removed", removed),
801                    false,
802                ),
803                Err(e) => self.tool_result(id, &format!("Compaction error: {}", e), true),
804            }
805        } else {
806            self.tool_result(id, "Vector store not available", true)
807        }
808    }
809
810    async fn call_vector_stats(
811        &self,
812        id: Option<serde_json::Value>,
813        args: &serde_json::Map<String, serde_json::Value>,
814    ) -> McpResponse {
815        let namespace = args
816            .get("namespace")
817            .and_then(|v| v.as_str())
818            .unwrap_or("default");
819
820        let store = match self.engine.get_store(namespace) {
821            Ok(s) => s,
822            Err(e) => return self.tool_result(id, &e.to_string(), true),
823        };
824
825        if let Some(ref vector_store) = store.vector_store {
826            let (active, stale, total) = vector_store.stats();
827            let msg = format!(
828                "Vector store stats:\n  Active: {}\n  Stale: {}\n  Total embeddings: {}",
829                active, stale, total
830            );
831            self.tool_result(id, &msg, false)
832        } else {
833            self.tool_result(id, "Vector store not available", true)
834        }
835    }
836
837    async fn call_disambiguate(
838        &self,
839        id: Option<serde_json::Value>,
840        args: &serde_json::Map<String, serde_json::Value>,
841    ) -> McpResponse {
842        let namespace = args
843            .get("namespace")
844            .and_then(|v| v.as_str())
845            .unwrap_or("default");
846        let threshold = args
847            .get("threshold")
848            .and_then(|v| v.as_f64())
849            .unwrap_or(0.8);
850
851        let store = match self.engine.get_store(namespace) {
852            Ok(s) => s,
853            Err(e) => return self.tool_result(id, &e.to_string(), true),
854        };
855
856        // Collect all URIs from the store
857        let uri_map = store.uri_to_id.read().unwrap();
858        let uris: Vec<String> = uri_map.keys().cloned().collect();
859        drop(uri_map);
860
861        let disambiguator = crate::disambiguation::EntityDisambiguator::new(threshold);
862        let suggestions = disambiguator.suggest_merges(&uris);
863
864        if suggestions.is_empty() {
865            self.tool_result(id, "No similar entities found above threshold", false)
866        } else {
867            let mut msg = format!("Found {} potential duplicates:\n", suggestions.len());
868            for (uri1, uri2, sim) in suggestions.iter().take(20) {
869                msg.push_str(&format!("  {:.2}%: {} <-> {}\n", sim * 100.0, uri1, uri2));
870            }
871            if suggestions.len() > 20 {
872                msg.push_str(&format!("  ... and {} more\n", suggestions.len() - 20));
873            }
874            self.tool_result(id, &msg, false)
875        }
876    }
877
878    // Legacy handlers for backward compatibility
879    async fn handle_legacy_ingest(&self, request: McpRequest) -> McpResponse {
880        let params = match request.params {
881            Some(p) => p,
882            None => return self.error_response(request.id, -32602, "Invalid params"),
883        };
884
885        if let (Some(sub), Some(pred), Some(obj)) = (
886            params.get("subject").and_then(|v| v.as_str()),
887            params.get("predicate").and_then(|v| v.as_str()),
888            params.get("object").and_then(|v| v.as_str()),
889        ) {
890            let namespace = params
891                .get("namespace")
892                .and_then(|v| v.as_str())
893                .unwrap_or("default");
894            let triple = Triple {
895                subject: sub.to_string(),
896                predicate: pred.to_string(),
897                object: obj.to_string(),
898                provenance: Some(Provenance {
899                    source: "mcp".to_string(),
900                    timestamp: "".to_string(),
901                    method: "stdio".to_string(),
902                }),
903                embedding: vec![],
904            };
905
906            let req = Request::new(IngestRequest {
907                triples: vec![triple],
908                namespace: namespace.to_string(),
909            });
910
911            match self.engine.ingest_triples(req).await {
912                Ok(_) => McpResponse {
913                    jsonrpc: "2.0".to_string(),
914                    id: request.id,
915                    result: Some(serde_json::to_value("Ingested").unwrap()),
916                    error: None,
917                },
918                Err(e) => self.error_response(request.id, -32000, &e.to_string()),
919            }
920        } else {
921            self.error_response(request.id, -32602, "Invalid params")
922        }
923    }
924
925    async fn handle_legacy_ingest_file(&self, request: McpRequest) -> McpResponse {
926        let params = match request.params {
927            Some(p) => p,
928            None => {
929                return self.error_response(request.id, -32602, "Invalid params: 'path' required")
930            }
931        };
932
933        if let Some(path) = params.get("path").and_then(|v| v.as_str()) {
934            let namespace = params
935                .get("namespace")
936                .and_then(|v| v.as_str())
937                .unwrap_or("default");
938
939            let req = Request::new(IngestFileRequest {
940                file_path: path.to_string(),
941                namespace: namespace.to_string(),
942            });
943
944            match self.engine.ingest_file(req).await {
945                Ok(resp) => {
946                    let inner = resp.into_inner();
947                    McpResponse {
948                        jsonrpc: "2.0".to_string(),
949                        id: request.id,
950                        result: Some(
951                            serde_json::to_value(format!(
952                                "Ingested {} triples from {}",
953                                inner.edges_added, path
954                            ))
955                            .unwrap(),
956                        ),
957                        error: None,
958                    }
959                }
960                Err(e) => self.error_response(request.id, -32000, &e.to_string()),
961            }
962        } else {
963            self.error_response(request.id, -32602, "Invalid params: 'path' required")
964        }
965    }
966
967    fn error_response(
968        &self,
969        id: Option<serde_json::Value>,
970        code: i32,
971        message: &str,
972    ) -> McpResponse {
973        McpResponse {
974            jsonrpc: "2.0".to_string(),
975            id,
976            result: None,
977            error: Some(McpError {
978                code,
979                message: message.to_string(),
980                data: None,
981            }),
982        }
983    }
984
985    fn tool_result(
986        &self,
987        id: Option<serde_json::Value>,
988        text: &str,
989        is_error: bool,
990    ) -> McpResponse {
991        let result = CallToolResult {
992            content: vec![Content {
993                content_type: "text".to_string(),
994                text: text.to_string(),
995            }],
996            is_error: if is_error { Some(true) } else { None },
997        };
998        McpResponse {
999            jsonrpc: "2.0".to_string(),
1000            id,
1001            result: Some(serde_json::to_value(result).unwrap()),
1002            error: None,
1003        }
1004    }
1005}