// synapse_core/mcp_stdio.rs

1use crate::mcp_types::{
2    CallToolResult, Content, DegreeResult, DisambiguationItem, DisambiguationResult,
3    IngestToolResult, ListToolsResult, McpError, McpRequest, McpResponse, NeighborItem,
4    NeighborsToolResult, ReasoningToolResult, ScenarioItem, ScenarioListResult, SearchResultItem,
5    SearchToolResult, SimpleSuccessResult, StatsToolResult, Tool, TripleItem, TriplesToolResult,
6};
7use crate::server::proto::semantic_engine_server::SemanticEngine;
8use crate::server::proto::{
9    HybridSearchRequest, IngestFileRequest, IngestRequest, Provenance, ReasoningRequest,
10    ReasoningStrategy, SearchMode, SparqlRequest, Triple,
11};
12use crate::server::MySemanticEngine;
13use jsonschema::JSONSchema;
14use std::sync::Arc;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tonic::Request;
17
18pub struct McpStdioServer {
19    engine: Arc<MySemanticEngine>,
20}
21
22impl McpStdioServer {
23    pub fn new(engine: Arc<MySemanticEngine>) -> Self {
24        Self { engine }
25    }
26
27    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
28        let mut reader = BufReader::new(tokio::io::stdin());
29        let mut writer = tokio::io::stdout();
30
31        loop {
32            let mut line = String::new();
33            if reader.read_line(&mut line).await? == 0 {
34                break;
35            }
36
37            let trimmed = line.trim();
38            if trimmed.is_empty() {
39                continue;
40            }
41
42            if let Ok(request) = serde_json::from_str::<McpRequest>(trimmed) {
43                let is_notification = request.id.is_none();
44                let response = self.handle_request(request).await;
45
46                // Only send response if it's not a notification
47                if !is_notification {
48                    let response_json = serde_json::to_string(&response)? + "\n";
49                    writer.write_all(response_json.as_bytes()).await?;
50                    writer.flush().await?;
51                }
52            } else {
53                // Log failed parse to stderr but don't crash
54                eprintln!("MCP PROTOCOL ERROR: Failed to parse line: {}", trimmed);
55            }
56        }
57
58        self.engine.shutdown().await;
59        Ok(())
60    }
61
62    fn create_request<T>(msg: T) -> Request<T> {
63        let mut req = Request::new(msg);
64
65        // Try to get token from env
66        let token_opt = std::env::var("SYNAPSE_ADMIN_TOKEN")
67            .or_else(|_| std::env::var("SYNAPSE_MCP_TOKEN"))
68            .ok();
69
70        if let Some(token) = token_opt {
71            if let Ok(val) = format!("Bearer {}", token).parse() {
72                req.metadata_mut().insert("authorization", val);
73            }
74        }
75        req
76    }
77
78    fn get_tools() -> Vec<Tool> {
79        vec![
80            Tool {
81                name: "ingest_triples".to_string(),
82                description: Some(
83                    "Ingest one or more RDF triples into the knowledge graph".to_string(),
84                ),
85                input_schema: serde_json::json!({
86                    "type": "object",
87                    "properties": {
88                        "triples": {
89                            "type": "array",
90                            "items": {
91                                "type": "object",
92                                "properties": {
93                                    "subject": { "type": "string" },
94                                    "predicate": { "type": "string" },
95                                    "object": { "type": "string" }
96                                },
97                                "required": ["subject", "predicate", "object"]
98                            }
99                        },
100                        "namespace": { "type": "string", "default": "default" }
101                    },
102                    "required": ["triples"]
103                }),
104            },
105            Tool {
106                name: "ingest_file".to_string(),
107                description: Some(
108                    "Ingest a CSV or Markdown file into the knowledge graph".to_string(),
109                ),
110                input_schema: serde_json::json!({
111                    "type": "object",
112                    "properties": {
113                        "path": { "type": "string", "description": "Path to the file" },
114                        "namespace": { "type": "string", "default": "default" }
115                    },
116                    "required": ["path"]
117                }),
118            },
119            Tool {
120                name: "sparql_query".to_string(),
121                description: Some("Execute a SPARQL query against the knowledge graph".to_string()),
122                input_schema: serde_json::json!({
123                    "type": "object",
124                    "properties": {
125                        "query": { "type": "string", "description": "SPARQL query string" },
126                        "namespace": { "type": "string", "default": "default" }
127                    },
128                    "required": ["query"]
129                }),
130            },
131            Tool {
132                name: "hybrid_search".to_string(),
133                description: Some("Perform a hybrid vector + graph search".to_string()),
134                input_schema: serde_json::json!({
135                    "type": "object",
136                    "properties": {
137                        "query": { "type": "string", "description": "Natural language query" },
138                        "namespace": { "type": "string", "default": "default" },
139                        "vector_k": { "type": "integer", "default": 10 },
140                        "graph_depth": { "type": "integer", "default": 1 },
141                        "limit": { "type": "integer", "default": 20 }
142                    },
143                    "required": ["query"]
144                }),
145            },
146            Tool {
147                name: "apply_reasoning".to_string(),
148                description: Some(
149                    "Apply RDFS or OWL-RL reasoning to infer new triples".to_string(),
150                ),
151                input_schema: serde_json::json!({
152                    "type": "object",
153                    "properties": {
154                        "namespace": { "type": "string", "default": "default" },
155                        "strategy": { "type": "string", "enum": ["rdfs", "owlrl"], "default": "rdfs" },
156                        "materialize": { "type": "boolean", "default": false }
157                    }
158                }),
159            },
160            Tool {
161                name: "get_neighbors".to_string(),
162                description: Some(
163                    "Get neighboring nodes connected to a given URI in the graph".to_string(),
164                ),
165                input_schema: serde_json::json!({
166                    "type": "object",
167                    "properties": {
168                        "uri": { "type": "string", "description": "URI of the entity to find neighbors for" },
169                        "namespace": { "type": "string", "default": "default" },
170                        "direction": { "type": "string", "enum": ["outgoing", "incoming", "both"], "default": "outgoing" }
171                    },
172                    "required": ["uri"]
173                }),
174            },
175            Tool {
176                name: "list_triples".to_string(),
177                description: Some(
178                    "List all triples in a namespace (useful for debugging/exploration)"
179                        .to_string(),
180                ),
181                input_schema: serde_json::json!({
182                    "type": "object",
183                    "properties": {
184                        "namespace": { "type": "string", "default": "default" },
185                        "limit": { "type": "integer", "default": 100 }
186                    }
187                }),
188            },
189            Tool {
190                name: "delete_namespace".to_string(),
191                description: Some("Delete all data in a namespace".to_string()),
192                input_schema: serde_json::json!({
193                    "type": "object",
194                    "properties": {
195                        "namespace": { "type": "string", "description": "Namespace to delete" }
196                    },
197                    "required": ["namespace"]
198                }),
199            },
200            Tool {
201                name: "ingest_url".to_string(),
202                description: Some(
203                    "Scrape a web page and add its content to the vector store for RAG retrieval"
204                        .to_string(),
205                ),
206                input_schema: serde_json::json!({
207                    "type": "object",
208                    "properties": {
209                        "url": { "type": "string", "description": "URL to scrape and ingest" },
210                        "namespace": { "type": "string", "default": "default" }
211                    },
212                    "required": ["url"]
213                }),
214            },
215            Tool {
216                name: "ingest_text".to_string(),
217                description: Some(
218                    "Add arbitrary text content to the vector store for RAG retrieval".to_string(),
219                ),
220                input_schema: serde_json::json!({
221                    "type": "object",
222                    "properties": {
223                        "uri": { "type": "string", "description": "Custom URI identifier for this text" },
224                        "content": { "type": "string", "description": "Text content to embed and store" },
225                        "namespace": { "type": "string", "default": "default" }
226                    },
227                    "required": ["uri", "content"]
228                }),
229            },
230            Tool {
231                name: "compact_vectors".to_string(),
232                description: Some("Compact the vector index by removing stale entries".to_string()),
233                input_schema: serde_json::json!({
234                    "type": "object",
235                    "properties": {
236                        "namespace": { "type": "string", "default": "default" }
237                    }
238                }),
239            },
240            Tool {
241                name: "vector_stats".to_string(),
242                description: Some("Get vector store statistics (active, stale, total)".to_string()),
243                input_schema: serde_json::json!({
244                    "type": "object",
245                    "properties": {
246                        "namespace": { "type": "string", "default": "default" }
247                    }
248                }),
249            },
250            Tool {
251                name: "disambiguate".to_string(),
252                description: Some("Find similar entities that might be duplicates".to_string()),
253                input_schema: serde_json::json!({
254                    "type": "object",
255                    "properties": {
256                        "namespace": { "type": "string", "default": "default" },
257                        "threshold": { "type": "number", "default": 0.8, "description": "Similarity threshold 0.0-1.0" }
258                    }
259                }),
260            },
261            Tool {
262                name: "get_node_degree".to_string(),
263                description: Some("Get the degree (number of connections) of a node".to_string()),
264                input_schema: serde_json::json!({
265                    "type": "object",
266                    "properties": {
267                        "uri": { "type": "string" },
268                        "namespace": { "type": "string", "default": "default" }
269                    },
270                    "required": ["uri"]
271                }),
272            },
273            Tool {
274                name: "install_ontology".to_string(),
275                description: Some("Download and install an ontology from a URL".to_string()),
276                input_schema: serde_json::json!({
277                    "type": "object",
278                    "properties": {
279                        "url": { "type": "string", "description": "URL of the ontology file (.owl, .ttl)" },
280                        "name": { "type": "string", "description": "Name for the local file (e.g. 'legal.owl')" },
281                        "namespace": { "type": "string", "default": "default" }
282                    },
283                    "required": ["url", "name"]
284                }),
285            },
286            Tool {
287                name: "list_scenarios".to_string(),
288                description: Some("List available scenarios in the marketplace".to_string()),
289                input_schema: serde_json::json!({
290                    "type": "object",
291                    "properties": {}
292                }),
293            },
294            Tool {
295                name: "install_scenario".to_string(),
296                description: Some(
297                    "Install a scenario (ontologies, data, docs) from the marketplace".to_string(),
298                ),
299                input_schema: serde_json::json!({
300                    "type": "object",
301                    "properties": {
302                        "name": { "type": "string", "description": "Name of the scenario to install" },
303                        "namespace": { "type": "string", "default": "default" }
304                    },
305                    "required": ["name"]
306                }),
307            },
308        ]
309    }
310
311    pub async fn handle_request(&self, request: McpRequest) -> McpResponse {
312        match request.method.as_str() {
313            "initialize" => {
314                // MCP protocol initialization
315                McpResponse {
316                    jsonrpc: "2.0".to_string(),
317                    id: request.id,
318                    result: Some(serde_json::json!({
319                        "protocolVersion": "2024-11-05",
320                        "capabilities": {
321                            "tools": {}
322                        },
323                        "serverInfo": {
324                        "name": "synapse",
325                        "version": env!("CARGO_PKG_VERSION")
326                    }
327                    })),
328                    error: None,
329                }
330            }
331            "notifications/initialized" | "initialized" => {
332                // Client confirms initialization - just acknowledge
333                McpResponse {
334                    jsonrpc: "2.0".to_string(),
335                    id: request.id,
336                    result: Some(serde_json::json!({})),
337                    error: None,
338                }
339            }
340            "tools/list" => {
341                let result = ListToolsResult {
342                    tools: Self::get_tools(),
343                };
344                McpResponse {
345                    jsonrpc: "2.0".to_string(),
346                    id: request.id,
347                    result: Some(serde_json::to_value(result).unwrap()),
348                    error: None,
349                }
350            }
351            "tools/call" => self.handle_tool_call(request).await,
352            // Legacy methods for backwards compatibility
353            "ingest" => self.handle_legacy_ingest(request).await,
354            "ingest_file" => self.handle_legacy_ingest_file(request).await,
355            _ => McpResponse {
356                jsonrpc: "2.0".to_string(),
357                id: request.id,
358                result: None,
359                error: Some(McpError {
360                    code: -32601,
361                    message: format!("Method not found: {}", request.method),
362                    data: None,
363                }),
364            },
365        }
366    }
367
368    fn validate_arguments(tool_name: &str, arguments: &serde_json::Value) -> Result<(), String> {
369        let tools = Self::get_tools();
370        if let Some(tool) = tools.iter().find(|t| t.name == tool_name) {
371            if let Ok(schema) = JSONSchema::compile(&tool.input_schema) {
372                if let Err(errors) = schema.validate(arguments) {
373                    let error_msg = errors.map(|e| e.to_string()).collect::<Vec<_>>().join(", ");
374                    return Err(format!("Validation error: {}", error_msg));
375                }
376            } else {
377                return Err("Invalid tool schema definition".to_string());
378            }
379        }
380        Ok(())
381    }
382
383    async fn handle_tool_call(&self, request: McpRequest) -> McpResponse {
384        let params = match request.params {
385            Some(serde_json::Value::Object(map)) => map,
386            Some(_) => return self.error_response(request.id, -32602, "Params must be an object"),
387            None => return self.error_response(request.id, -32602, "Missing params"),
388        };
389
390        let tool_name = match params.get("name").and_then(|v| v.as_str()) {
391            Some(n) => n,
392            None => return self.error_response(request.id, -32602, "Missing tool name"),
393        };
394
395        let arguments = params
396            .get("arguments")
397            .and_then(|v| v.as_object())
398            .cloned()
399            .unwrap_or_default();
400
401        let args_value = serde_json::Value::Object(arguments.clone());
402        if let Err(e) = Self::validate_arguments(tool_name, &args_value) {
403            return self.error_response(request.id, -32602, &e);
404        }
405
406        match tool_name {
407            "ingest_triples" => self.call_ingest_triples(request.id, &arguments).await,
408            "ingest_file" => self.call_ingest_file(request.id, &arguments).await,
409            "sparql_query" => self.call_sparql_query(request.id, &arguments).await,
410            "hybrid_search" => self.call_hybrid_search(request.id, &arguments).await,
411            "apply_reasoning" => self.call_apply_reasoning(request.id, &arguments).await,
412            "get_neighbors" => self.call_get_neighbors(request.id, &arguments).await,
413            "list_triples" => self.call_list_triples(request.id, &arguments).await,
414            "delete_namespace" => self.call_delete_namespace(request.id, &arguments).await,
415            "ingest_url" => self.call_ingest_url(request.id, &arguments).await,
416            "ingest_text" => self.call_ingest_text(request.id, &arguments).await,
417            "compact_vectors" => self.call_compact_vectors(request.id, &arguments).await,
418            "vector_stats" => self.call_vector_stats(request.id, &arguments).await,
419            "disambiguate" => self.call_disambiguate(request.id, &arguments).await,
420            "get_node_degree" => self.call_get_node_degree(request.id, &arguments).await,
421            "install_ontology" => self.call_install_ontology(request.id, &arguments).await,
422            "list_scenarios" => self.call_list_scenarios(request.id).await,
423            "install_scenario" => self.call_install_scenario(request.id, &arguments).await,
424            _ => self.error_response(request.id, -32602, &format!("Unknown tool: {}", tool_name)),
425        }
426    }
427
428    async fn call_list_scenarios(&self, id: Option<serde_json::Value>) -> McpResponse {
429        match self.engine.scenario_manager.list_scenarios().await {
430            Ok(registry) => {
431                let items: Vec<ScenarioItem> = registry
432                    .into_iter()
433                    .map(|e| ScenarioItem {
434                        name: e.name,
435                        description: e.description,
436                        version: e.version,
437                    })
438                    .collect();
439                self.serialize_result(id, ScenarioListResult { scenarios: items })
440            }
441            Err(e) => self.tool_result(id, &e.to_string(), true),
442        }
443    }
444
445    async fn call_install_scenario(
446        &self,
447        id: Option<serde_json::Value>,
448        args: &serde_json::Map<String, serde_json::Value>,
449    ) -> McpResponse {
450        let name = match args.get("name").and_then(|v| v.as_str()) {
451            Some(n) => n,
452            None => return self.error_response(id, -32602, "Missing 'name'"),
453        };
454        let namespace = args
455            .get("namespace")
456            .and_then(|v| v.as_str())
457            .unwrap_or("default");
458
459        match self.engine.install_scenario(name, namespace).await {
460            Ok(msg) => {
461                let result = SimpleSuccessResult {
462                    success: true,
463                    message: msg,
464                };
465                self.serialize_result(id, result)
466            }
467            Err(e) => self.tool_result(id, &e, true),
468        }
469    }
470
471    async fn call_install_ontology(
472        &self,
473        id: Option<serde_json::Value>,
474        args: &serde_json::Map<String, serde_json::Value>,
475    ) -> McpResponse {
476        let url = match args.get("url").and_then(|v| v.as_str()) {
477            Some(u) => u,
478            None => return self.error_response(id, -32602, "Missing 'url'"),
479        };
480        let name = match args.get("name").and_then(|v| v.as_str()) {
481            Some(n) => n,
482            None => return self.error_response(id, -32602, "Missing 'name'"),
483        };
484        let namespace = args
485            .get("namespace")
486            .and_then(|v| v.as_str())
487            .unwrap_or("default");
488
489        // Download
490        let response = match reqwest::get(url).await {
491            Ok(r) => r,
492            Err(e) => {
493                return self.tool_result(id, &format!("Failed to download ontology: {}", e), true)
494            }
495        };
496
497        if !response.status().is_success() {
498            return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
499        }
500
501        let content = match response.text().await {
502            Ok(t) => t,
503            Err(e) => {
504                return self.tool_result(id, &format!("Failed to read response: {}", e), true)
505            }
506        };
507
508        // Ensure ontology directory exists
509        let ontology_dir = std::path::Path::new("ontology");
510        if !ontology_dir.exists() {
511            if let Err(e) = std::fs::create_dir(ontology_dir) {
512                return self.tool_result(
513                    id,
514                    &format!("Failed to create ontology dir: {}", e),
515                    true,
516                );
517            }
518        }
519
520        // Security check: Sanitize filename to prevent directory traversal
521        let file_name = std::path::Path::new(name)
522            .file_name()
523            .and_then(|n| n.to_str())
524            .ok_or_else(|| "Invalid filename".to_string());
525
526        let clean_name = match file_name {
527            Ok(n) => n,
528            Err(e) => return self.tool_result(id, &format!("Security error: {}", e), true),
529        };
530
531        if clean_name != name {
532            return self.tool_result(
533                id,
534                "Security error: Filename contains path components",
535                true,
536            );
537        }
538
539        let path = ontology_dir.join(clean_name);
540        if let Err(e) = std::fs::write(&path, content) {
541            return self.tool_result(id, &format!("Failed to save ontology file: {}", e), true);
542        }
543
544        // Load into store
545        let store = match self.engine.get_store(namespace) {
546            Ok(s) => s,
547            Err(e) => return self.tool_result(id, &e.to_string(), true),
548        };
549
550        match crate::ingest::ontology::OntologyLoader::load_file(&store, &path).await {
551            Ok(count) => {
552                let result = SimpleSuccessResult {
553                    success: true,
554                    message: format!("Installed ontology '{}' and loaded {} triples", name, count),
555                };
556                self.serialize_result(id, result)
557            }
558            Err(e) => self.tool_result(id, &format!("Failed to load ontology: {}", e), true),
559        }
560    }
561
562    async fn call_ingest_triples(
563        &self,
564        id: Option<serde_json::Value>,
565        args: &serde_json::Map<String, serde_json::Value>,
566    ) -> McpResponse {
567        let namespace = args
568            .get("namespace")
569            .and_then(|v| v.as_str())
570            .unwrap_or("default");
571        let triples_array = match args.get("triples").and_then(|v| v.as_array()) {
572            Some(t) => t,
573            None => return self.error_response(id, -32602, "Missing 'triples' array"),
574        };
575
576        let mut triples = Vec::new();
577        for t in triples_array {
578            if let (Some(s), Some(p), Some(o)) = (
579                t.get("subject").and_then(|v| v.as_str()),
580                t.get("predicate").and_then(|v| v.as_str()),
581                t.get("object").and_then(|v| v.as_str()),
582            ) {
583                triples.push(Triple {
584                    subject: s.to_string(),
585                    predicate: p.to_string(),
586                    object: o.to_string(),
587                    provenance: Some(Provenance {
588                        source: "mcp".to_string(),
589                        timestamp: "".to_string(),
590                        method: "tools/call".to_string(),
591                    }),
592                    embedding: vec![],
593                });
594            }
595        }
596
597        let req = Self::create_request(IngestRequest {
598            triples,
599            namespace: namespace.to_string(),
600        });
601
602        match self.engine.ingest_triples(req).await {
603            Ok(resp) => {
604                let inner = resp.into_inner();
605                let result = IngestToolResult {
606                    nodes_added: inner.nodes_added,
607                    edges_added: inner.edges_added,
608                    message: format!("Ingested {} triples", inner.edges_added),
609                };
610                self.serialize_result(id, result)
611            }
612            Err(e) => self.tool_result(id, &e.to_string(), true),
613        }
614    }
615
616    async fn call_ingest_file(
617        &self,
618        id: Option<serde_json::Value>,
619        args: &serde_json::Map<String, serde_json::Value>,
620    ) -> McpResponse {
621        let path = match args.get("path").and_then(|v| v.as_str()) {
622            Some(p) => p,
623            None => return self.error_response(id, -32602, "Missing 'path'"),
624        };
625        let namespace = args
626            .get("namespace")
627            .and_then(|v| v.as_str())
628            .unwrap_or("default");
629
630        let req = Self::create_request(IngestFileRequest {
631            file_path: path.to_string(),
632            namespace: namespace.to_string(),
633        });
634
635        match self.engine.ingest_file(req).await {
636            Ok(resp) => {
637                let inner = resp.into_inner();
638                let result = IngestToolResult {
639                    nodes_added: inner.nodes_added,
640                    edges_added: inner.edges_added,
641                    message: format!("Ingested {} triples from {}", inner.edges_added, path),
642                };
643                self.serialize_result(id, result)
644            }
645            Err(e) => self.tool_result(id, &e.to_string(), true),
646        }
647    }
648
649    async fn call_sparql_query(
650        &self,
651        id: Option<serde_json::Value>,
652        args: &serde_json::Map<String, serde_json::Value>,
653    ) -> McpResponse {
654        let query = match args.get("query").and_then(|v| v.as_str()) {
655            Some(q) => q,
656            None => return self.error_response(id, -32602, "Missing 'query'"),
657        };
658        let namespace = args
659            .get("namespace")
660            .and_then(|v| v.as_str())
661            .unwrap_or("default");
662
663        let req = Self::create_request(SparqlRequest {
664            query: query.to_string(),
665            namespace: namespace.to_string(),
666        });
667
668        match self.engine.query_sparql(req).await {
669            Ok(resp) => self.tool_result(id, &resp.into_inner().results_json, false),
670            Err(e) => self.tool_result(id, &e.to_string(), true),
671        }
672    }
673
674    async fn call_hybrid_search(
675        &self,
676        id: Option<serde_json::Value>,
677        args: &serde_json::Map<String, serde_json::Value>,
678    ) -> McpResponse {
679        let query = match args.get("query").and_then(|v| v.as_str()) {
680            Some(q) => q,
681            None => return self.error_response(id, -32602, "Missing 'query'"),
682        };
683        let namespace = args
684            .get("namespace")
685            .and_then(|v| v.as_str())
686            .unwrap_or("default");
687        let vector_k = args.get("vector_k").and_then(|v| v.as_u64()).unwrap_or(10) as u32;
688        let graph_depth = args
689            .get("graph_depth")
690            .and_then(|v| v.as_u64())
691            .unwrap_or(1) as u32;
692        let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as u32;
693
694        let req = Self::create_request(HybridSearchRequest {
695            query: query.to_string(),
696            namespace: namespace.to_string(),
697            vector_k,
698            graph_depth,
699            mode: SearchMode::Hybrid as i32,
700            limit,
701        });
702
703        match self.engine.hybrid_search(req).await {
704            Ok(resp) => {
705                let results = resp.into_inner().results;
706                let items: Vec<SearchResultItem> = results
707                    .into_iter()
708                    .map(|r| SearchResultItem {
709                        node_id: r.node_id,
710                        score: r.score,
711                        content: r.content,
712                        uri: r.uri,
713                    })
714                    .collect();
715
716                let result = SearchToolResult { results: items };
717                self.serialize_result(id, result)
718            }
719            Err(e) => self.tool_result(id, &e.to_string(), true),
720        }
721    }
722
723    async fn call_apply_reasoning(
724        &self,
725        id: Option<serde_json::Value>,
726        args: &serde_json::Map<String, serde_json::Value>,
727    ) -> McpResponse {
728        let namespace = args
729            .get("namespace")
730            .and_then(|v| v.as_str())
731            .unwrap_or("default");
732        let strategy_str = args
733            .get("strategy")
734            .and_then(|v| v.as_str())
735            .unwrap_or("rdfs");
736        let materialize = args
737            .get("materialize")
738            .and_then(|v| v.as_bool())
739            .unwrap_or(false);
740
741        let strategy = match strategy_str.to_lowercase().as_str() {
742            "owlrl" | "owl-rl" => ReasoningStrategy::Owlrl as i32,
743            _ => ReasoningStrategy::Rdfs as i32,
744        };
745
746        let req = Self::create_request(ReasoningRequest {
747            namespace: namespace.to_string(),
748            strategy,
749            materialize,
750        });
751
752        match self.engine.apply_reasoning(req).await {
753            Ok(resp) => {
754                let inner = resp.into_inner();
755                let result = ReasoningToolResult {
756                    success: inner.success,
757                    triples_inferred: inner.triples_inferred,
758                    message: inner.message,
759                };
760                self.serialize_result(id, result)
761            }
762            Err(e) => self.tool_result(id, &e.to_string(), true),
763        }
764    }
765
766    async fn call_get_neighbors(
767        &self,
768        id: Option<serde_json::Value>,
769        args: &serde_json::Map<String, serde_json::Value>,
770    ) -> McpResponse {
771        let uri = match args.get("uri").and_then(|v| v.as_str()) {
772            Some(u) => u,
773            None => return self.error_response(id, -32602, "Missing 'uri'"),
774        };
775        let namespace = args
776            .get("namespace")
777            .and_then(|v| v.as_str())
778            .unwrap_or("default");
779        let direction = args
780            .get("direction")
781            .and_then(|v| v.as_str())
782            .unwrap_or("outgoing");
783
784        let store = match self.engine.get_store(namespace) {
785            Ok(s) => s,
786            Err(e) => return self.tool_result(id, &e.to_string(), true),
787        };
788
789        let mut neighbors = Vec::new();
790
791        // Query outgoing edges (URI as subject)
792        if direction == "outgoing" || direction == "both" {
793            if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
794                for q in store
795                    .store
796                    .quads_for_pattern(Some(subj.into()), None, None, None)
797                    .flatten()
798                {
799                    neighbors.push(NeighborItem {
800                        direction: "outgoing".to_string(),
801                        predicate: q.predicate.to_string(),
802                        target: q.object.to_string(),
803                        score: 1.0,
804                    });
805                }
806            }
807        }
808
809        // Query incoming edges (URI as object)
810        if direction == "incoming" || direction == "both" {
811            if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
812                for q in store
813                    .store
814                    .quads_for_pattern(None, None, Some(obj.into()), None)
815                    .flatten()
816                {
817                    neighbors.push(NeighborItem {
818                        direction: "incoming".to_string(),
819                        predicate: q.predicate.to_string(),
820                        target: q.subject.to_string(),
821                        score: 1.0,
822                    });
823                }
824            }
825        }
826
827        let result = NeighborsToolResult { neighbors };
828        self.serialize_result(id, result)
829    }
830
831    async fn call_list_triples(
832        &self,
833        id: Option<serde_json::Value>,
834        args: &serde_json::Map<String, serde_json::Value>,
835    ) -> McpResponse {
836        let namespace = args
837            .get("namespace")
838            .and_then(|v| v.as_str())
839            .unwrap_or("default");
840        let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
841
842        let store = match self.engine.get_store(namespace) {
843            Ok(s) => s,
844            Err(e) => return self.tool_result(id, &e.to_string(), true),
845        };
846
847        let mut triples = Vec::new();
848        for q in store.store.iter().take(limit).flatten() {
849            triples.push(TripleItem {
850                subject: q.subject.to_string(),
851                predicate: q.predicate.to_string(),
852                object: q.object.to_string(),
853            });
854        }
855
856        let result = TriplesToolResult { triples };
857        self.serialize_result(id, result)
858    }
859
860    async fn call_delete_namespace(
861        &self,
862        id: Option<serde_json::Value>,
863        args: &serde_json::Map<String, serde_json::Value>,
864    ) -> McpResponse {
865        let namespace = match args.get("namespace").and_then(|v| v.as_str()) {
866            Some(n) => n,
867            None => return self.error_response(id, -32602, "Missing 'namespace'"),
868        };
869
870        let req = Self::create_request(crate::server::proto::EmptyRequest {
871            namespace: namespace.to_string(),
872        });
873
874        match self.engine.delete_namespace_data(req).await {
875            Ok(resp) => {
876                let inner = resp.into_inner();
877                let result = SimpleSuccessResult {
878                    success: inner.success,
879                    message: inner.message,
880                };
881                self.serialize_result(id, result)
882            }
883            Err(e) => self.tool_result(id, &e.to_string(), true),
884        }
885    }
886
887    async fn call_ingest_url(
888        &self,
889        id: Option<serde_json::Value>,
890        args: &serde_json::Map<String, serde_json::Value>,
891    ) -> McpResponse {
892        let url = match args.get("url").and_then(|v| v.as_str()) {
893            Some(u) => u,
894            None => return self.error_response(id, -32602, "Missing 'url'"),
895        };
896        let namespace = args
897            .get("namespace")
898            .and_then(|v| v.as_str())
899            .unwrap_or("default");
900
901        // Fetch URL content
902        let client = reqwest::Client::new();
903        let response = match client.get(url).send().await {
904            Ok(r) => r,
905            Err(e) => return self.tool_result(id, &format!("Failed to fetch URL: {}", e), true),
906        };
907
908        if !response.status().is_success() {
909            return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
910        }
911
912        let html = match response.text().await {
913            Ok(t) => t,
914            Err(e) => {
915                return self.tool_result(id, &format!("Failed to read response: {}", e), true)
916            }
917        };
918
919        // HTML to text conversion with Regex
920        let script_re = regex::Regex::new(r"(?s)<script.*?>.*?</script>").unwrap();
921        let style_re = regex::Regex::new(r"(?s)<style.*?>.*?</style>").unwrap();
922        let tag_re = regex::Regex::new(r"<[^>]*>").unwrap();
923
924        let no_script = script_re.replace_all(&html, " ");
925        let no_style = style_re.replace_all(&no_script, " ");
926        let text_content = tag_re.replace_all(&no_style, " ");
927
928        let text = text_content
929            .split_whitespace()
930            .collect::<Vec<_>>()
931            .join(" ");
932
933        // Chunk text with overlap
934        let processor = crate::processor::TextProcessor::new();
935        let chunks = processor.chunk_text(&text, 1000, 150);
936
937        // Add to vector store
938        let store = match self.engine.get_store(namespace) {
939            Ok(s) => s,
940            Err(e) => return self.tool_result(id, &e.to_string(), true),
941        };
942
943        if let Some(ref vector_store) = store.vector_store {
944            let mut added_chunks = 0;
945            for (i, chunk) in chunks.iter().enumerate() {
946                let chunk_uri = format!("{}#chunk-{}", url, i);
947                // For MCP ingestion, we just use the chunk URI as the key and metadata URI
948                let metadata = serde_json::json!({
949                    "uri": chunk_uri,
950                    "source_url": url,
951                    "type": "web_chunk"
952                });
953                match vector_store.add(&chunk_uri, chunk, metadata).await {
954                    Ok(_) => added_chunks += 1,
955                    Err(e) => {
956                        eprintln!("Failed to add chunk {}: {}", i, e);
957                    }
958                }
959            }
960            let result = IngestToolResult {
961                nodes_added: 0,
962                edges_added: 0, // Ingest URL technically adds to vector store, no graph edges yet unless reasoned
963                message: format!(
964                    "Ingested URL: {} ({} chars, {} chunks)",
965                    url,
966                    text.len(),
967                    added_chunks
968                ),
969            };
970            self.serialize_result(id, result)
971        } else {
972            self.tool_result(id, "Vector store not available", true)
973        }
974    }
975
976    async fn call_ingest_text(
977        &self,
978        id: Option<serde_json::Value>,
979        args: &serde_json::Map<String, serde_json::Value>,
980    ) -> McpResponse {
981        let uri = match args.get("uri").and_then(|v| v.as_str()) {
982            Some(u) => u,
983            None => return self.error_response(id, -32602, "Missing 'uri'"),
984        };
985        let content = match args.get("content").and_then(|v| v.as_str()) {
986            Some(c) => c,
987            None => return self.error_response(id, -32602, "Missing 'content'"),
988        };
989        let namespace = args
990            .get("namespace")
991            .and_then(|v| v.as_str())
992            .unwrap_or("default");
993
994        // Chunk text with overlap
995        let processor = crate::processor::TextProcessor::new();
996        let chunks = processor.chunk_text(content, 1000, 150);
997
998        // Add to vector store
999        let store = match self.engine.get_store(namespace) {
1000            Ok(s) => s,
1001            Err(e) => return self.tool_result(id, &e.to_string(), true),
1002        };
1003
1004        if let Some(ref vector_store) = store.vector_store {
1005            let mut added_chunks = 0;
1006            for (i, chunk) in chunks.iter().enumerate() {
1007                let chunk_uri = if chunks.len() > 1 {
1008                    format!("{}#chunk-{}", uri, i)
1009                } else {
1010                    uri.to_string()
1011                };
1012                let metadata = serde_json::json!({
1013                    "uri": uri, // Map back to original URI
1014                    "chunk_uri": chunk_uri,
1015                    "type": "text_chunk"
1016                });
1017                match vector_store.add(&chunk_uri, chunk, metadata).await {
1018                    Ok(_) => added_chunks += 1,
1019                    Err(e) => {
1020                        eprintln!("Failed to add chunk {}: {}", i, e);
1021                    }
1022                }
1023            }
1024            let result = IngestToolResult {
1025                nodes_added: 0,
1026                edges_added: 0,
1027                message: format!(
1028                    "Ingested text: {} ({} chars, {} chunks)",
1029                    uri,
1030                    content.len(),
1031                    added_chunks
1032                ),
1033            };
1034            self.serialize_result(id, result)
1035        } else {
1036            self.tool_result(id, "Vector store not available", true)
1037        }
1038    }
1039
1040    async fn call_compact_vectors(
1041        &self,
1042        id: Option<serde_json::Value>,
1043        args: &serde_json::Map<String, serde_json::Value>,
1044    ) -> McpResponse {
1045        let namespace = args
1046            .get("namespace")
1047            .and_then(|v| v.as_str())
1048            .unwrap_or("default");
1049
1050        let store = match self.engine.get_store(namespace) {
1051            Ok(s) => s,
1052            Err(e) => return self.tool_result(id, &e.to_string(), true),
1053        };
1054
1055        if let Some(ref vector_store) = store.vector_store {
1056            match vector_store.compact() {
1057                Ok(removed) => {
1058                    let result = SimpleSuccessResult {
1059                        success: true,
1060                        message: format!("Compaction complete: {} stale entries removed", removed),
1061                    };
1062                    self.serialize_result(id, result)
1063                }
1064                Err(e) => self.tool_result(id, &format!("Compaction error: {}", e), true),
1065            }
1066        } else {
1067            self.tool_result(id, "Vector store not available", true)
1068        }
1069    }
1070
1071    async fn call_vector_stats(
1072        &self,
1073        id: Option<serde_json::Value>,
1074        args: &serde_json::Map<String, serde_json::Value>,
1075    ) -> McpResponse {
1076        let namespace = args
1077            .get("namespace")
1078            .and_then(|v| v.as_str())
1079            .unwrap_or("default");
1080
1081        let store = match self.engine.get_store(namespace) {
1082            Ok(s) => s,
1083            Err(e) => return self.tool_result(id, &e.to_string(), true),
1084        };
1085
1086        if let Some(ref vector_store) = store.vector_store {
1087            let (active, stale, total) = vector_store.stats();
1088            let result = StatsToolResult {
1089                active_vectors: active,
1090                stale_vectors: stale,
1091                total_embeddings: total,
1092            };
1093            self.serialize_result(id, result)
1094        } else {
1095            self.tool_result(id, "Vector store not available", true)
1096        }
1097    }
1098
1099    async fn call_disambiguate(
1100        &self,
1101        id: Option<serde_json::Value>,
1102        args: &serde_json::Map<String, serde_json::Value>,
1103    ) -> McpResponse {
1104        let namespace = args
1105            .get("namespace")
1106            .and_then(|v| v.as_str())
1107            .unwrap_or("default");
1108        let threshold = args
1109            .get("threshold")
1110            .and_then(|v| v.as_f64())
1111            .unwrap_or(0.8);
1112
1113        let store = match self.engine.get_store(namespace) {
1114            Ok(s) => s,
1115            Err(e) => return self.tool_result(id, &e.to_string(), true),
1116        };
1117
1118        // Collect all URIs from the store
1119        let uri_map = store.uri_to_id.read().unwrap();
1120        let uris: Vec<String> = uri_map.keys().cloned().collect();
1121        drop(uri_map);
1122
1123        let disambiguator = crate::disambiguation::EntityDisambiguator::new(threshold);
1124        let suggestions = disambiguator.suggest_merges(&uris);
1125
1126        let items: Vec<DisambiguationItem> = suggestions
1127            .into_iter()
1128            .map(|(u1, u2, s)| DisambiguationItem {
1129                uri1: u1,
1130                uri2: u2,
1131                similarity: s,
1132            })
1133            .collect();
1134
1135        let message = if items.is_empty() {
1136            "No similar entities found above threshold".to_string()
1137        } else {
1138            format!("Found {} potential duplicates", items.len())
1139        };
1140
1141        let result = DisambiguationResult {
1142            suggestions: items,
1143            message,
1144        };
1145        self.serialize_result(id, result)
1146    }
1147
1148    // Legacy handlers for backward compatibility
1149    async fn handle_legacy_ingest(&self, request: McpRequest) -> McpResponse {
1150        let params = match request.params {
1151            Some(p) => p,
1152            None => return self.error_response(request.id, -32602, "Invalid params"),
1153        };
1154
1155        if let (Some(sub), Some(pred), Some(obj)) = (
1156            params.get("subject").and_then(|v| v.as_str()),
1157            params.get("predicate").and_then(|v| v.as_str()),
1158            params.get("object").and_then(|v| v.as_str()),
1159        ) {
1160            let namespace = params
1161                .get("namespace")
1162                .and_then(|v| v.as_str())
1163                .unwrap_or("default");
1164            let triple = Triple {
1165                subject: sub.to_string(),
1166                predicate: pred.to_string(),
1167                object: obj.to_string(),
1168                provenance: Some(Provenance {
1169                    source: "mcp".to_string(),
1170                    timestamp: "".to_string(),
1171                    method: "stdio".to_string(),
1172                }),
1173                embedding: vec![],
1174            };
1175
1176            let req = Self::create_request(IngestRequest {
1177                triples: vec![triple],
1178                namespace: namespace.to_string(),
1179            });
1180
1181            match self.engine.ingest_triples(req).await {
1182                Ok(_) => McpResponse {
1183                    jsonrpc: "2.0".to_string(),
1184                    id: request.id,
1185                    result: Some(serde_json::to_value("Ingested").unwrap()),
1186                    error: None,
1187                },
1188                Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1189            }
1190        } else {
1191            self.error_response(request.id, -32602, "Invalid params")
1192        }
1193    }
1194
1195    async fn handle_legacy_ingest_file(&self, request: McpRequest) -> McpResponse {
1196        let params = match request.params {
1197            Some(p) => p,
1198            None => {
1199                return self.error_response(request.id, -32602, "Invalid params: 'path' required")
1200            }
1201        };
1202
1203        if let Some(path) = params.get("path").and_then(|v| v.as_str()) {
1204            let namespace = params
1205                .get("namespace")
1206                .and_then(|v| v.as_str())
1207                .unwrap_or("default");
1208
1209            let req = Self::create_request(IngestFileRequest {
1210                file_path: path.to_string(),
1211                namespace: namespace.to_string(),
1212            });
1213
1214            match self.engine.ingest_file(req).await {
1215                Ok(resp) => {
1216                    let inner = resp.into_inner();
1217                    McpResponse {
1218                        jsonrpc: "2.0".to_string(),
1219                        id: request.id,
1220                        result: Some(
1221                            serde_json::to_value(format!(
1222                                "Ingested {} triples from {}",
1223                                inner.edges_added, path
1224                            ))
1225                            .unwrap(),
1226                        ),
1227                        error: None,
1228                    }
1229                }
1230                Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1231            }
1232        } else {
1233            self.error_response(request.id, -32602, "Invalid params: 'path' required")
1234        }
1235    }
1236
1237    fn serialize_result<T: serde::Serialize>(
1238        &self,
1239        id: Option<serde_json::Value>,
1240        result: T,
1241    ) -> McpResponse {
1242        match serde_json::to_string_pretty(&result) {
1243            Ok(json) => self.tool_result(id, &json, false),
1244            Err(e) => self.tool_result(id, &format!("Serialization error: {}", e), true),
1245        }
1246    }
1247
1248    async fn call_get_node_degree(
1249        &self,
1250        id: Option<serde_json::Value>,
1251        args: &serde_json::Map<String, serde_json::Value>,
1252    ) -> McpResponse {
1253        let uri = match args.get("uri").and_then(|v| v.as_str()) {
1254            Some(u) => u,
1255            None => return self.error_response(id, -32602, "Missing 'uri'"),
1256        };
1257        let namespace = args
1258            .get("namespace")
1259            .and_then(|v| v.as_str())
1260            .unwrap_or("default");
1261
1262        let store = match self.engine.get_store(namespace) {
1263            Ok(s) => s,
1264            Err(e) => return self.tool_result(id, &e.to_string(), true),
1265        };
1266
1267        let degree = store.get_degree(uri);
1268
1269        let result = DegreeResult {
1270            uri: uri.to_string(),
1271            degree,
1272        };
1273
1274        self.serialize_result(id, result)
1275    }
1276
1277    fn error_response(
1278        &self,
1279        id: Option<serde_json::Value>,
1280        code: i32,
1281        message: &str,
1282    ) -> McpResponse {
1283        McpResponse {
1284            jsonrpc: "2.0".to_string(),
1285            id,
1286            result: None,
1287            error: Some(McpError {
1288                code,
1289                message: message.to_string(),
1290                data: None,
1291            }),
1292        }
1293    }
1294
1295    fn tool_result(
1296        &self,
1297        id: Option<serde_json::Value>,
1298        text: &str,
1299        is_error: bool,
1300    ) -> McpResponse {
1301        let result = CallToolResult {
1302            content: vec![Content {
1303                content_type: "text".to_string(),
1304                text: text.to_string(),
1305            }],
1306            is_error: if is_error { Some(true) } else { None },
1307        };
1308        McpResponse {
1309            jsonrpc: "2.0".to_string(),
1310            id,
1311            result: Some(serde_json::to_value(result).unwrap()),
1312            error: None,
1313        }
1314    }
1315}