// synapse_core/mcp_stdio.rs

1use crate::mcp_types::{
2    CallToolResult, Content, DegreeResult, DisambiguationItem, DisambiguationResult,
3    IngestToolResult, ListToolsResult, McpError, McpRequest, McpResponse, NeighborItem,
4    NeighborsToolResult, ReasoningToolResult, SearchResultItem, SearchToolResult,
5    SimpleSuccessResult, StatsToolResult, Tool, TripleItem, TriplesToolResult,
6};
7use crate::server::proto::semantic_engine_server::SemanticEngine;
8use crate::server::proto::{
9    HybridSearchRequest, IngestFileRequest, IngestRequest, Provenance, ReasoningRequest,
10    ReasoningStrategy, SearchMode, SparqlRequest, Triple,
11};
12use crate::server::MySemanticEngine;
13use jsonschema::JSONSchema;
14use std::sync::Arc;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tonic::Request;
17
/// MCP server that exchanges newline-delimited JSON-RPC 2.0 messages over
/// stdin/stdout and delegates the actual work to a shared semantic engine.
pub struct McpStdioServer {
    /// Engine that serves ingestion, SPARQL, hybrid search, reasoning and store lookups.
    engine: Arc<MySemanticEngine>,
}
21
22impl McpStdioServer {
23    pub fn new(engine: Arc<MySemanticEngine>) -> Self {
24        Self { engine }
25    }
26
27    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
28        let mut reader = BufReader::new(tokio::io::stdin());
29        let mut writer = tokio::io::stdout();
30
31        loop {
32            let mut line = String::new();
33            if reader.read_line(&mut line).await? == 0 {
34                break;
35            }
36
37            let trimmed = line.trim();
38            if trimmed.is_empty() {
39                continue;
40            }
41
42            if let Ok(request) = serde_json::from_str::<McpRequest>(trimmed) {
43                let response = self.handle_request(request).await;
44                let response_json = serde_json::to_string(&response)? + "\n";
45                writer.write_all(response_json.as_bytes()).await?;
46                writer.flush().await?;
47            }
48        }
49
50        Ok(())
51    }
52
53    fn get_tools() -> Vec<Tool> {
54        vec![
55            Tool {
56                name: "ingest_triples".to_string(),
57                description: Some(
58                    "Ingest one or more RDF triples into the knowledge graph".to_string(),
59                ),
60                input_schema: serde_json::json!({
61                    "type": "object",
62                    "properties": {
63                        "triples": {
64                            "type": "array",
65                            "items": {
66                                "type": "object",
67                                "properties": {
68                                    "subject": { "type": "string" },
69                                    "predicate": { "type": "string" },
70                                    "object": { "type": "string" }
71                                },
72                                "required": ["subject", "predicate", "object"]
73                            }
74                        },
75                        "namespace": { "type": "string", "default": "default" }
76                    },
77                    "required": ["triples"]
78                }),
79            },
80            Tool {
81                name: "ingest_file".to_string(),
82                description: Some(
83                    "Ingest a CSV or Markdown file into the knowledge graph".to_string(),
84                ),
85                input_schema: serde_json::json!({
86                    "type": "object",
87                    "properties": {
88                        "path": { "type": "string", "description": "Path to the file" },
89                        "namespace": { "type": "string", "default": "default" }
90                    },
91                    "required": ["path"]
92                }),
93            },
94            Tool {
95                name: "sparql_query".to_string(),
96                description: Some("Execute a SPARQL query against the knowledge graph".to_string()),
97                input_schema: serde_json::json!({
98                    "type": "object",
99                    "properties": {
100                        "query": { "type": "string", "description": "SPARQL query string" },
101                        "namespace": { "type": "string", "default": "default" }
102                    },
103                    "required": ["query"]
104                }),
105            },
106            Tool {
107                name: "hybrid_search".to_string(),
108                description: Some("Perform a hybrid vector + graph search".to_string()),
109                input_schema: serde_json::json!({
110                    "type": "object",
111                    "properties": {
112                        "query": { "type": "string", "description": "Natural language query" },
113                        "namespace": { "type": "string", "default": "default" },
114                        "vector_k": { "type": "integer", "default": 10 },
115                        "graph_depth": { "type": "integer", "default": 1 },
116                        "limit": { "type": "integer", "default": 20 }
117                    },
118                    "required": ["query"]
119                }),
120            },
121            Tool {
122                name: "apply_reasoning".to_string(),
123                description: Some(
124                    "Apply RDFS or OWL-RL reasoning to infer new triples".to_string(),
125                ),
126                input_schema: serde_json::json!({
127                    "type": "object",
128                    "properties": {
129                        "namespace": { "type": "string", "default": "default" },
130                        "strategy": { "type": "string", "enum": ["rdfs", "owlrl"], "default": "rdfs" },
131                        "materialize": { "type": "boolean", "default": false }
132                    }
133                }),
134            },
135            Tool {
136                name: "get_neighbors".to_string(),
137                description: Some(
138                    "Get neighboring nodes connected to a given URI in the graph".to_string(),
139                ),
140                input_schema: serde_json::json!({
141                    "type": "object",
142                    "properties": {
143                        "uri": { "type": "string", "description": "URI of the entity to find neighbors for" },
144                        "namespace": { "type": "string", "default": "default" },
145                        "direction": { "type": "string", "enum": ["outgoing", "incoming", "both"], "default": "outgoing" }
146                    },
147                    "required": ["uri"]
148                }),
149            },
150            Tool {
151                name: "list_triples".to_string(),
152                description: Some(
153                    "List all triples in a namespace (useful for debugging/exploration)"
154                        .to_string(),
155                ),
156                input_schema: serde_json::json!({
157                    "type": "object",
158                    "properties": {
159                        "namespace": { "type": "string", "default": "default" },
160                        "limit": { "type": "integer", "default": 100 }
161                    }
162                }),
163            },
164            Tool {
165                name: "delete_namespace".to_string(),
166                description: Some("Delete all data in a namespace".to_string()),
167                input_schema: serde_json::json!({
168                    "type": "object",
169                    "properties": {
170                        "namespace": { "type": "string", "description": "Namespace to delete" }
171                    },
172                    "required": ["namespace"]
173                }),
174            },
175            Tool {
176                name: "ingest_url".to_string(),
177                description: Some(
178                    "Scrape a web page and add its content to the vector store for RAG retrieval"
179                        .to_string(),
180                ),
181                input_schema: serde_json::json!({
182                    "type": "object",
183                    "properties": {
184                        "url": { "type": "string", "description": "URL to scrape and ingest" },
185                        "namespace": { "type": "string", "default": "default" }
186                    },
187                    "required": ["url"]
188                }),
189            },
190            Tool {
191                name: "ingest_text".to_string(),
192                description: Some(
193                    "Add arbitrary text content to the vector store for RAG retrieval".to_string(),
194                ),
195                input_schema: serde_json::json!({
196                    "type": "object",
197                    "properties": {
198                        "uri": { "type": "string", "description": "Custom URI identifier for this text" },
199                        "content": { "type": "string", "description": "Text content to embed and store" },
200                        "namespace": { "type": "string", "default": "default" }
201                    },
202                    "required": ["uri", "content"]
203                }),
204            },
205            Tool {
206                name: "compact_vectors".to_string(),
207                description: Some("Compact the vector index by removing stale entries".to_string()),
208                input_schema: serde_json::json!({
209                    "type": "object",
210                    "properties": {
211                        "namespace": { "type": "string", "default": "default" }
212                    }
213                }),
214            },
215            Tool {
216                name: "vector_stats".to_string(),
217                description: Some("Get vector store statistics (active, stale, total)".to_string()),
218                input_schema: serde_json::json!({
219                    "type": "object",
220                    "properties": {
221                        "namespace": { "type": "string", "default": "default" }
222                    }
223                }),
224            },
225            Tool {
226                name: "disambiguate".to_string(),
227                description: Some("Find similar entities that might be duplicates".to_string()),
228                input_schema: serde_json::json!({
229                    "type": "object",
230                    "properties": {
231                        "namespace": { "type": "string", "default": "default" },
232                        "threshold": { "type": "number", "default": 0.8, "description": "Similarity threshold 0.0-1.0" }
233                    }
234                }),
235            },
236            Tool {
237                name: "get_node_degree".to_string(),
238                description: Some("Get the degree (number of connections) of a node".to_string()),
239                input_schema: serde_json::json!({
240                    "type": "object",
241                    "properties": {
242                        "uri": { "type": "string" },
243                        "namespace": { "type": "string", "default": "default" }
244                    },
245                    "required": ["uri"]
246                }),
247            },
248        ]
249    }
250
251    pub async fn handle_request(&self, request: McpRequest) -> McpResponse {
252        match request.method.as_str() {
253            "initialize" => {
254                // MCP protocol initialization
255                McpResponse {
256                    jsonrpc: "2.0".to_string(),
257                    id: request.id,
258                    result: Some(serde_json::json!({
259                        "protocolVersion": "2024-11-05",
260                        "capabilities": {
261                            "tools": {}
262                        },
263                        "serverInfo": {
264                        "name": "synapse",
265                        "version": env!("CARGO_PKG_VERSION")
266                    }
267                    })),
268                    error: None,
269                }
270            }
271            "notifications/initialized" | "initialized" => {
272                // Client confirms initialization - just acknowledge
273                McpResponse {
274                    jsonrpc: "2.0".to_string(),
275                    id: request.id,
276                    result: Some(serde_json::json!({})),
277                    error: None,
278                }
279            }
280            "tools/list" => {
281                let result = ListToolsResult {
282                    tools: Self::get_tools(),
283                };
284                McpResponse {
285                    jsonrpc: "2.0".to_string(),
286                    id: request.id,
287                    result: Some(serde_json::to_value(result).unwrap()),
288                    error: None,
289                }
290            }
291            "tools/call" => self.handle_tool_call(request).await,
292            // Legacy methods for backwards compatibility
293            "ingest" => self.handle_legacy_ingest(request).await,
294            "ingest_file" => self.handle_legacy_ingest_file(request).await,
295            _ => McpResponse {
296                jsonrpc: "2.0".to_string(),
297                id: request.id,
298                result: None,
299                error: Some(McpError {
300                    code: -32601,
301                    message: format!("Method not found: {}", request.method),
302                    data: None,
303                }),
304            },
305        }
306    }
307
308    fn validate_arguments(tool_name: &str, arguments: &serde_json::Value) -> Result<(), String> {
309        let tools = Self::get_tools();
310        if let Some(tool) = tools.iter().find(|t| t.name == tool_name) {
311            if let Ok(schema) = JSONSchema::compile(&tool.input_schema) {
312                if let Err(errors) = schema.validate(arguments) {
313                    let error_msg = errors
314                        .map(|e| e.to_string())
315                        .collect::<Vec<_>>()
316                        .join(", ");
317                    return Err(format!("Validation error: {}", error_msg));
318                }
319            } else {
320                return Err("Invalid tool schema definition".to_string());
321            }
322        }
323        Ok(())
324    }
325
326    async fn handle_tool_call(&self, request: McpRequest) -> McpResponse {
327        let params = match request.params {
328            Some(p) => p,
329            None => return self.error_response(request.id, -32602, "Missing params"),
330        };
331
332        let tool_name = match params.get("name").and_then(|v| v.as_str()) {
333            Some(n) => n,
334            None => return self.error_response(request.id, -32602, "Missing tool name"),
335        };
336
337        let arguments = params
338            .get("arguments")
339            .and_then(|v| v.as_object())
340            .cloned()
341            .unwrap_or_default();
342
343        let args_value = serde_json::Value::Object(arguments.clone());
344        if let Err(e) = Self::validate_arguments(tool_name, &args_value) {
345            return self.error_response(request.id, -32602, &e);
346        }
347
348        match tool_name {
349            "ingest_triples" => self.call_ingest_triples(request.id, &arguments).await,
350            "ingest_file" => self.call_ingest_file(request.id, &arguments).await,
351            "sparql_query" => self.call_sparql_query(request.id, &arguments).await,
352            "hybrid_search" => self.call_hybrid_search(request.id, &arguments).await,
353            "apply_reasoning" => self.call_apply_reasoning(request.id, &arguments).await,
354            "get_neighbors" => self.call_get_neighbors(request.id, &arguments).await,
355            "list_triples" => self.call_list_triples(request.id, &arguments).await,
356            "delete_namespace" => self.call_delete_namespace(request.id, &arguments).await,
357            "ingest_url" => self.call_ingest_url(request.id, &arguments).await,
358            "ingest_text" => self.call_ingest_text(request.id, &arguments).await,
359            "compact_vectors" => self.call_compact_vectors(request.id, &arguments).await,
360            "vector_stats" => self.call_vector_stats(request.id, &arguments).await,
361            "disambiguate" => self.call_disambiguate(request.id, &arguments).await,
362            "get_node_degree" => self.call_get_node_degree(request.id, &arguments).await,
363            _ => self.error_response(request.id, -32602, &format!("Unknown tool: {}", tool_name)),
364        }
365    }
366
367    async fn call_ingest_triples(
368        &self,
369        id: Option<serde_json::Value>,
370        args: &serde_json::Map<String, serde_json::Value>,
371    ) -> McpResponse {
372        let namespace = args
373            .get("namespace")
374            .and_then(|v| v.as_str())
375            .unwrap_or("default");
376        let triples_array = match args.get("triples").and_then(|v| v.as_array()) {
377            Some(t) => t,
378            None => return self.error_response(id, -32602, "Missing 'triples' array"),
379        };
380
381        let mut triples = Vec::new();
382        for t in triples_array {
383            if let (Some(s), Some(p), Some(o)) = (
384                t.get("subject").and_then(|v| v.as_str()),
385                t.get("predicate").and_then(|v| v.as_str()),
386                t.get("object").and_then(|v| v.as_str()),
387            ) {
388                triples.push(Triple {
389                    subject: s.to_string(),
390                    predicate: p.to_string(),
391                    object: o.to_string(),
392                    provenance: Some(Provenance {
393                        source: "mcp".to_string(),
394                        timestamp: "".to_string(),
395                        method: "tools/call".to_string(),
396                    }),
397                    embedding: vec![],
398                });
399            }
400        }
401
402        let req = Request::new(IngestRequest {
403            triples,
404            namespace: namespace.to_string(),
405        });
406
407        match self.engine.ingest_triples(req).await {
408            Ok(resp) => {
409                let inner = resp.into_inner();
410                let result = IngestToolResult {
411                    nodes_added: inner.nodes_added,
412                    edges_added: inner.edges_added,
413                    message: format!("Ingested {} triples", inner.edges_added),
414                };
415                self.serialize_result(id, result)
416            }
417            Err(e) => self.tool_result(id, &e.to_string(), true),
418        }
419    }
420
421    async fn call_ingest_file(
422        &self,
423        id: Option<serde_json::Value>,
424        args: &serde_json::Map<String, serde_json::Value>,
425    ) -> McpResponse {
426        let path = match args.get("path").and_then(|v| v.as_str()) {
427            Some(p) => p,
428            None => return self.error_response(id, -32602, "Missing 'path'"),
429        };
430        let namespace = args
431            .get("namespace")
432            .and_then(|v| v.as_str())
433            .unwrap_or("default");
434
435        let req = Request::new(IngestFileRequest {
436            file_path: path.to_string(),
437            namespace: namespace.to_string(),
438        });
439
440        match self.engine.ingest_file(req).await {
441            Ok(resp) => {
442                let inner = resp.into_inner();
443                let result = IngestToolResult {
444                    nodes_added: inner.nodes_added,
445                    edges_added: inner.edges_added,
446                    message: format!("Ingested {} triples from {}", inner.edges_added, path),
447                };
448                self.serialize_result(id, result)
449            }
450            Err(e) => self.tool_result(id, &e.to_string(), true),
451        }
452    }
453
454    async fn call_sparql_query(
455        &self,
456        id: Option<serde_json::Value>,
457        args: &serde_json::Map<String, serde_json::Value>,
458    ) -> McpResponse {
459        let query = match args.get("query").and_then(|v| v.as_str()) {
460            Some(q) => q,
461            None => return self.error_response(id, -32602, "Missing 'query'"),
462        };
463        let namespace = args
464            .get("namespace")
465            .and_then(|v| v.as_str())
466            .unwrap_or("default");
467
468        let req = Request::new(SparqlRequest {
469            query: query.to_string(),
470            namespace: namespace.to_string(),
471        });
472
473        match self.engine.query_sparql(req).await {
474            Ok(resp) => self.tool_result(id, &resp.into_inner().results_json, false),
475            Err(e) => self.tool_result(id, &e.to_string(), true),
476        }
477    }
478
479    async fn call_hybrid_search(
480        &self,
481        id: Option<serde_json::Value>,
482        args: &serde_json::Map<String, serde_json::Value>,
483    ) -> McpResponse {
484        let query = match args.get("query").and_then(|v| v.as_str()) {
485            Some(q) => q,
486            None => return self.error_response(id, -32602, "Missing 'query'"),
487        };
488        let namespace = args
489            .get("namespace")
490            .and_then(|v| v.as_str())
491            .unwrap_or("default");
492        let vector_k = args.get("vector_k").and_then(|v| v.as_u64()).unwrap_or(10) as u32;
493        let graph_depth = args
494            .get("graph_depth")
495            .and_then(|v| v.as_u64())
496            .unwrap_or(1) as u32;
497        let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as u32;
498
499        let req = Request::new(HybridSearchRequest {
500            query: query.to_string(),
501            namespace: namespace.to_string(),
502            vector_k,
503            graph_depth,
504            mode: SearchMode::Hybrid as i32,
505            limit,
506        });
507
508        match self.engine.hybrid_search(req).await {
509            Ok(resp) => {
510                let results = resp.into_inner().results;
511                let items: Vec<SearchResultItem> = results
512                    .into_iter()
513                    .map(|r| SearchResultItem {
514                        node_id: r.node_id,
515                        score: r.score,
516                        content: r.content,
517                        uri: r.uri,
518                    })
519                    .collect();
520
521                let result = SearchToolResult { results: items };
522                self.serialize_result(id, result)
523            }
524            Err(e) => self.tool_result(id, &e.to_string(), true),
525        }
526    }
527
528    async fn call_apply_reasoning(
529        &self,
530        id: Option<serde_json::Value>,
531        args: &serde_json::Map<String, serde_json::Value>,
532    ) -> McpResponse {
533        let namespace = args
534            .get("namespace")
535            .and_then(|v| v.as_str())
536            .unwrap_or("default");
537        let strategy_str = args
538            .get("strategy")
539            .and_then(|v| v.as_str())
540            .unwrap_or("rdfs");
541        let materialize = args
542            .get("materialize")
543            .and_then(|v| v.as_bool())
544            .unwrap_or(false);
545
546        let strategy = match strategy_str.to_lowercase().as_str() {
547            "owlrl" | "owl-rl" => ReasoningStrategy::Owlrl as i32,
548            _ => ReasoningStrategy::Rdfs as i32,
549        };
550
551        let req = Request::new(ReasoningRequest {
552            namespace: namespace.to_string(),
553            strategy,
554            materialize,
555        });
556
557        match self.engine.apply_reasoning(req).await {
558            Ok(resp) => {
559                let inner = resp.into_inner();
560                let result = ReasoningToolResult {
561                    success: inner.success,
562                    triples_inferred: inner.triples_inferred,
563                    message: inner.message,
564                };
565                self.serialize_result(id, result)
566            }
567            Err(e) => self.tool_result(id, &e.to_string(), true),
568        }
569    }
570
571    async fn call_get_neighbors(
572        &self,
573        id: Option<serde_json::Value>,
574        args: &serde_json::Map<String, serde_json::Value>,
575    ) -> McpResponse {
576        let uri = match args.get("uri").and_then(|v| v.as_str()) {
577            Some(u) => u,
578            None => return self.error_response(id, -32602, "Missing 'uri'"),
579        };
580        let namespace = args
581            .get("namespace")
582            .and_then(|v| v.as_str())
583            .unwrap_or("default");
584        let direction = args
585            .get("direction")
586            .and_then(|v| v.as_str())
587            .unwrap_or("outgoing");
588
589        let store = match self.engine.get_store(namespace) {
590            Ok(s) => s,
591            Err(e) => return self.tool_result(id, &e.to_string(), true),
592        };
593
594        let mut neighbors = Vec::new();
595
596        // Query outgoing edges (URI as subject)
597        if direction == "outgoing" || direction == "both" {
598            if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
599                for q in store
600                    .store
601                    .quads_for_pattern(Some(subj.into()), None, None, None)
602                    .flatten()
603                {
604                    neighbors.push(NeighborItem {
605                        direction: "outgoing".to_string(),
606                        predicate: q.predicate.to_string(),
607                        target: q.object.to_string(),
608                        score: 1.0,
609                    });
610                }
611            }
612        }
613
614        // Query incoming edges (URI as object)
615        if direction == "incoming" || direction == "both" {
616            if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
617                for q in store
618                    .store
619                    .quads_for_pattern(None, None, Some(obj.into()), None)
620                    .flatten()
621                {
622                    neighbors.push(NeighborItem {
623                        direction: "incoming".to_string(),
624                        predicate: q.predicate.to_string(),
625                        target: q.subject.to_string(),
626                        score: 1.0,
627                    });
628                }
629            }
630        }
631
632        let result = NeighborsToolResult { neighbors };
633        self.serialize_result(id, result)
634    }
635
636    async fn call_list_triples(
637        &self,
638        id: Option<serde_json::Value>,
639        args: &serde_json::Map<String, serde_json::Value>,
640    ) -> McpResponse {
641        let namespace = args
642            .get("namespace")
643            .and_then(|v| v.as_str())
644            .unwrap_or("default");
645        let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
646
647        let store = match self.engine.get_store(namespace) {
648            Ok(s) => s,
649            Err(e) => return self.tool_result(id, &e.to_string(), true),
650        };
651
652        let mut triples = Vec::new();
653        for q in store.store.iter().take(limit).flatten() {
654            triples.push(TripleItem {
655                subject: q.subject.to_string(),
656                predicate: q.predicate.to_string(),
657                object: q.object.to_string(),
658            });
659        }
660
661        let result = TriplesToolResult { triples };
662        self.serialize_result(id, result)
663    }
664
665    async fn call_delete_namespace(
666        &self,
667        id: Option<serde_json::Value>,
668        args: &serde_json::Map<String, serde_json::Value>,
669    ) -> McpResponse {
670        let namespace = match args.get("namespace").and_then(|v| v.as_str()) {
671            Some(n) => n,
672            None => return self.error_response(id, -32602, "Missing 'namespace'"),
673        };
674
675        let req = Request::new(crate::server::proto::EmptyRequest {
676            namespace: namespace.to_string(),
677        });
678
679        match self.engine.delete_namespace_data(req).await {
680            Ok(resp) => {
681                let inner = resp.into_inner();
682                let result = SimpleSuccessResult {
683                    success: inner.success,
684                    message: inner.message,
685                };
686                self.serialize_result(id, result)
687            }
688            Err(e) => self.tool_result(id, &e.to_string(), true),
689        }
690    }
691
692    async fn call_ingest_url(
693        &self,
694        id: Option<serde_json::Value>,
695        args: &serde_json::Map<String, serde_json::Value>,
696    ) -> McpResponse {
697        let url = match args.get("url").and_then(|v| v.as_str()) {
698            Some(u) => u,
699            None => return self.error_response(id, -32602, "Missing 'url'"),
700        };
701        let namespace = args
702            .get("namespace")
703            .and_then(|v| v.as_str())
704            .unwrap_or("default");
705
706        // Fetch URL content
707        let client = reqwest::Client::new();
708        let response = match client.get(url).send().await {
709            Ok(r) => r,
710            Err(e) => return self.tool_result(id, &format!("Failed to fetch URL: {}", e), true),
711        };
712
713        if !response.status().is_success() {
714            return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
715        }
716
717        let html = match response.text().await {
718            Ok(t) => t,
719            Err(e) => {
720                return self.tool_result(id, &format!("Failed to read response: {}", e), true)
721            }
722        };
723
724        // HTML to text conversion with Regex
725        let script_re = regex::Regex::new(r"(?s)<script.*?>.*?</script>").unwrap();
726        let style_re = regex::Regex::new(r"(?s)<style.*?>.*?</style>").unwrap();
727        let tag_re = regex::Regex::new(r"<[^>]*>").unwrap();
728
729        let no_script = script_re.replace_all(&html, " ");
730        let no_style = style_re.replace_all(&no_script, " ");
731        let text_content = tag_re.replace_all(&no_style, " ");
732
733        let text = text_content
734            .split_whitespace()
735            .collect::<Vec<_>>()
736            .join(" ");
737
738        // Chunk text with overlap
739        let processor = crate::processor::TextProcessor::new();
740        let chunks = processor.chunk_text(&text, 1000, 150);
741
742        // Add to vector store
743        let store = match self.engine.get_store(namespace) {
744            Ok(s) => s,
745            Err(e) => return self.tool_result(id, &e.to_string(), true),
746        };
747
748        if let Some(ref vector_store) = store.vector_store {
749            let mut added_chunks = 0;
750            for (i, chunk) in chunks.iter().enumerate() {
751                let chunk_uri = format!("{}#chunk-{}", url, i);
752                // For MCP ingestion, we just use the chunk URI as the key and metadata URI
753                let metadata = serde_json::json!({
754                    "uri": chunk_uri,
755                    "source_url": url,
756                    "type": "web_chunk"
757                });
758                match vector_store.add(&chunk_uri, chunk, metadata).await {
759                    Ok(_) => added_chunks += 1,
760                    Err(e) => {
761                        eprintln!("Failed to add chunk {}: {}", i, e);
762                    }
763                }
764            }
765            let result = IngestToolResult {
766                nodes_added: 0,
767                edges_added: 0, // Ingest URL technically adds to vector store, no graph edges yet unless reasoned
768                message: format!(
769                    "Ingested URL: {} ({} chars, {} chunks)",
770                    url,
771                    text.len(),
772                    added_chunks
773                ),
774            };
775            self.serialize_result(id, result)
776        } else {
777            self.tool_result(id, "Vector store not available", true)
778        }
779    }
780
781    async fn call_ingest_text(
782        &self,
783        id: Option<serde_json::Value>,
784        args: &serde_json::Map<String, serde_json::Value>,
785    ) -> McpResponse {
786        let uri = match args.get("uri").and_then(|v| v.as_str()) {
787            Some(u) => u,
788            None => return self.error_response(id, -32602, "Missing 'uri'"),
789        };
790        let content = match args.get("content").and_then(|v| v.as_str()) {
791            Some(c) => c,
792            None => return self.error_response(id, -32602, "Missing 'content'"),
793        };
794        let namespace = args
795            .get("namespace")
796            .and_then(|v| v.as_str())
797            .unwrap_or("default");
798
799        // Chunk text with overlap
800        let processor = crate::processor::TextProcessor::new();
801        let chunks = processor.chunk_text(&content, 1000, 150);
802
803        // Add to vector store
804        let store = match self.engine.get_store(namespace) {
805            Ok(s) => s,
806            Err(e) => return self.tool_result(id, &e.to_string(), true),
807        };
808
809        if let Some(ref vector_store) = store.vector_store {
810            let mut added_chunks = 0;
811            for (i, chunk) in chunks.iter().enumerate() {
812                let chunk_uri = if chunks.len() > 1 {
813                    format!("{}#chunk-{}", uri, i)
814                } else {
815                    uri.to_string()
816                };
817                let metadata = serde_json::json!({
818                    "uri": uri, // Map back to original URI
819                    "chunk_uri": chunk_uri,
820                    "type": "text_chunk"
821                });
822                match vector_store.add(&chunk_uri, chunk, metadata).await {
823                    Ok(_) => added_chunks += 1,
824                    Err(e) => {
825                        eprintln!("Failed to add chunk {}: {}", i, e);
826                    }
827                }
828            }
829            let result = IngestToolResult {
830                nodes_added: 0,
831                edges_added: 0,
832                message: format!(
833                    "Ingested text: {} ({} chars, {} chunks)",
834                    uri,
835                    content.len(),
836                    added_chunks
837                ),
838            };
839            self.serialize_result(id, result)
840        } else {
841            self.tool_result(id, "Vector store not available", true)
842        }
843    }
844
845    async fn call_compact_vectors(
846        &self,
847        id: Option<serde_json::Value>,
848        args: &serde_json::Map<String, serde_json::Value>,
849    ) -> McpResponse {
850        let namespace = args
851            .get("namespace")
852            .and_then(|v| v.as_str())
853            .unwrap_or("default");
854
855        let store = match self.engine.get_store(namespace) {
856            Ok(s) => s,
857            Err(e) => return self.tool_result(id, &e.to_string(), true),
858        };
859
860        if let Some(ref vector_store) = store.vector_store {
861            match vector_store.compact() {
862                Ok(removed) => {
863                    let result = SimpleSuccessResult {
864                        success: true,
865                        message: format!("Compaction complete: {} stale entries removed", removed),
866                    };
867                    self.serialize_result(id, result)
868                }
869                Err(e) => self.tool_result(id, &format!("Compaction error: {}", e), true),
870            }
871        } else {
872            self.tool_result(id, "Vector store not available", true)
873        }
874    }
875
876    async fn call_vector_stats(
877        &self,
878        id: Option<serde_json::Value>,
879        args: &serde_json::Map<String, serde_json::Value>,
880    ) -> McpResponse {
881        let namespace = args
882            .get("namespace")
883            .and_then(|v| v.as_str())
884            .unwrap_or("default");
885
886        let store = match self.engine.get_store(namespace) {
887            Ok(s) => s,
888            Err(e) => return self.tool_result(id, &e.to_string(), true),
889        };
890
891        if let Some(ref vector_store) = store.vector_store {
892            let (active, stale, total) = vector_store.stats();
893            let result = StatsToolResult {
894                active_vectors: active,
895                stale_vectors: stale,
896                total_embeddings: total,
897            };
898            self.serialize_result(id, result)
899        } else {
900            self.tool_result(id, "Vector store not available", true)
901        }
902    }
903
904    async fn call_disambiguate(
905        &self,
906        id: Option<serde_json::Value>,
907        args: &serde_json::Map<String, serde_json::Value>,
908    ) -> McpResponse {
909        let namespace = args
910            .get("namespace")
911            .and_then(|v| v.as_str())
912            .unwrap_or("default");
913        let threshold = args
914            .get("threshold")
915            .and_then(|v| v.as_f64())
916            .unwrap_or(0.8);
917
918        let store = match self.engine.get_store(namespace) {
919            Ok(s) => s,
920            Err(e) => return self.tool_result(id, &e.to_string(), true),
921        };
922
923        // Collect all URIs from the store
924        let uri_map = store.uri_to_id.read().unwrap();
925        let uris: Vec<String> = uri_map.keys().cloned().collect();
926        drop(uri_map);
927
928        let disambiguator = crate::disambiguation::EntityDisambiguator::new(threshold);
929        let suggestions = disambiguator.suggest_merges(&uris);
930
931        let items: Vec<DisambiguationItem> = suggestions
932            .into_iter()
933            .map(|(u1, u2, s)| DisambiguationItem {
934                uri1: u1,
935                uri2: u2,
936                similarity: s,
937            })
938            .collect();
939
940        let message = if items.is_empty() {
941            "No similar entities found above threshold".to_string()
942        } else {
943            format!("Found {} potential duplicates", items.len())
944        };
945
946        let result = DisambiguationResult {
947            suggestions: items,
948            message,
949        };
950        self.serialize_result(id, result)
951    }
952
953    // Legacy handlers for backward compatibility
954    async fn handle_legacy_ingest(&self, request: McpRequest) -> McpResponse {
955        let params = match request.params {
956            Some(p) => p,
957            None => return self.error_response(request.id, -32602, "Invalid params"),
958        };
959
960        if let (Some(sub), Some(pred), Some(obj)) = (
961            params.get("subject").and_then(|v| v.as_str()),
962            params.get("predicate").and_then(|v| v.as_str()),
963            params.get("object").and_then(|v| v.as_str()),
964        ) {
965            let namespace = params
966                .get("namespace")
967                .and_then(|v| v.as_str())
968                .unwrap_or("default");
969            let triple = Triple {
970                subject: sub.to_string(),
971                predicate: pred.to_string(),
972                object: obj.to_string(),
973                provenance: Some(Provenance {
974                    source: "mcp".to_string(),
975                    timestamp: "".to_string(),
976                    method: "stdio".to_string(),
977                }),
978                embedding: vec![],
979            };
980
981            let req = Request::new(IngestRequest {
982                triples: vec![triple],
983                namespace: namespace.to_string(),
984            });
985
986            match self.engine.ingest_triples(req).await {
987                Ok(_) => McpResponse {
988                    jsonrpc: "2.0".to_string(),
989                    id: request.id,
990                    result: Some(serde_json::to_value("Ingested").unwrap()),
991                    error: None,
992                },
993                Err(e) => self.error_response(request.id, -32000, &e.to_string()),
994            }
995        } else {
996            self.error_response(request.id, -32602, "Invalid params")
997        }
998    }
999
1000    async fn handle_legacy_ingest_file(&self, request: McpRequest) -> McpResponse {
1001        let params = match request.params {
1002            Some(p) => p,
1003            None => {
1004                return self.error_response(request.id, -32602, "Invalid params: 'path' required")
1005            }
1006        };
1007
1008        if let Some(path) = params.get("path").and_then(|v| v.as_str()) {
1009            let namespace = params
1010                .get("namespace")
1011                .and_then(|v| v.as_str())
1012                .unwrap_or("default");
1013
1014            let req = Request::new(IngestFileRequest {
1015                file_path: path.to_string(),
1016                namespace: namespace.to_string(),
1017            });
1018
1019            match self.engine.ingest_file(req).await {
1020                Ok(resp) => {
1021                    let inner = resp.into_inner();
1022                    McpResponse {
1023                        jsonrpc: "2.0".to_string(),
1024                        id: request.id,
1025                        result: Some(
1026                            serde_json::to_value(format!(
1027                                "Ingested {} triples from {}",
1028                                inner.edges_added, path
1029                            ))
1030                            .unwrap(),
1031                        ),
1032                        error: None,
1033                    }
1034                }
1035                Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1036            }
1037        } else {
1038            self.error_response(request.id, -32602, "Invalid params: 'path' required")
1039        }
1040    }
1041
1042    fn serialize_result<T: serde::Serialize>(
1043        &self,
1044        id: Option<serde_json::Value>,
1045        result: T,
1046    ) -> McpResponse {
1047        match serde_json::to_string_pretty(&result) {
1048            Ok(json) => self.tool_result(id, &json, false),
1049            Err(e) => self.tool_result(id, &format!("Serialization error: {}", e), true),
1050        }
1051    }
1052
1053    async fn call_get_node_degree(
1054        &self,
1055        id: Option<serde_json::Value>,
1056        args: &serde_json::Map<String, serde_json::Value>,
1057    ) -> McpResponse {
1058        let uri = match args.get("uri").and_then(|v| v.as_str()) {
1059            Some(u) => u,
1060            None => return self.error_response(id, -32602, "Missing 'uri'"),
1061        };
1062        let namespace = args
1063            .get("namespace")
1064            .and_then(|v| v.as_str())
1065            .unwrap_or("default");
1066
1067        let store = match self.engine.get_store(namespace) {
1068            Ok(s) => s,
1069            Err(e) => return self.tool_result(id, &e.to_string(), true),
1070        };
1071
1072        let degree = store.get_degree(uri);
1073
1074        let result = DegreeResult {
1075            uri: uri.to_string(),
1076            degree,
1077        };
1078
1079        self.serialize_result(id, result)
1080    }
1081
1082    fn error_response(
1083        &self,
1084        id: Option<serde_json::Value>,
1085        code: i32,
1086        message: &str,
1087    ) -> McpResponse {
1088        McpResponse {
1089            jsonrpc: "2.0".to_string(),
1090            id,
1091            result: None,
1092            error: Some(McpError {
1093                code,
1094                message: message.to_string(),
1095                data: None,
1096            }),
1097        }
1098    }
1099
1100    fn tool_result(
1101        &self,
1102        id: Option<serde_json::Value>,
1103        text: &str,
1104        is_error: bool,
1105    ) -> McpResponse {
1106        let result = CallToolResult {
1107            content: vec![Content {
1108                content_type: "text".to_string(),
1109                text: text.to_string(),
1110            }],
1111            is_error: if is_error { Some(true) } else { None },
1112        };
1113        McpResponse {
1114            jsonrpc: "2.0".to_string(),
1115            id,
1116            result: Some(serde_json::to_value(result).unwrap()),
1117            error: None,
1118        }
1119    }
1120}