Skip to main content

synapse_core/
mcp_stdio.rs

1use crate::mcp_types::{
2    CallToolResult, Content, DegreeResult, DisambiguationItem, DisambiguationResult,
3    IngestToolResult, ListToolsResult, McpError, McpRequest, McpResponse, NeighborItem,
4    NeighborsToolResult, ReasoningToolResult, ScenarioItem, ScenarioListResult, SearchResultItem,
5    SearchToolResult, SimpleSuccessResult, StatsToolResult, Tool, TripleItem, TriplesToolResult,
6};
7use crate::server::proto::semantic_engine_server::SemanticEngine;
8use crate::server::proto::{
9    HybridSearchRequest, IngestFileRequest, IngestRequest, Provenance, ReasoningRequest,
10    ReasoningStrategy, SearchMode, SparqlRequest, Triple,
11};
12use crate::server::MySemanticEngine;
13use jsonschema::JSONSchema;
14use std::sync::Arc;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tonic::Request;
17
/// MCP server that exchanges newline-delimited JSON messages over the
/// process's stdin/stdout and forwards the work to a shared semantic engine.
18pub struct McpStdioServer {
    // Shared handle to the engine that executes every tool call.
19    engine: Arc<MySemanticEngine>,
20}
21
22impl McpStdioServer {
23    pub fn new(engine: Arc<MySemanticEngine>) -> Self {
24        Self { engine }
25    }
26
27    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
28        let mut reader = BufReader::new(tokio::io::stdin());
29        let mut writer = tokio::io::stdout();
30
31        loop {
32            let mut line = String::new();
33            if reader.read_line(&mut line).await? == 0 {
34                break;
35            }
36
37            let trimmed = line.trim();
38            if trimmed.is_empty() {
39                continue;
40            }
41
42            if let Ok(request) = serde_json::from_str::<McpRequest>(trimmed) {
43                let is_notification = request.id.is_none();
44                let response = self.handle_request(request).await;
45                
46                // Only send response if it's not a notification
47                if !is_notification {
48                    let response_json = serde_json::to_string(&response)? + "\n";
49                    writer.write_all(response_json.as_bytes()).await?;
50                    writer.flush().await?;
51                }
52            } else {
53                // Log failed parse to stderr but don't crash
54                eprintln!("MCP PROTOCOL ERROR: Failed to parse line: {}", trimmed);
55            }
56        }
57
58        Ok(())
59    }
60
61    fn create_request<T>(msg: T) -> Request<T> {
62        let mut req = Request::new(msg);
63
64        // Try to get token from env
65        let token_opt = std::env::var("SYNAPSE_ADMIN_TOKEN")
66            .or_else(|_| std::env::var("SYNAPSE_MCP_TOKEN"))
67            .ok();
68
69        if let Some(token) = token_opt {
70            if let Ok(val) = format!("Bearer {}", token).parse() {
71                req.metadata_mut().insert("authorization", val);
72            }
73        }
74        req
75    }
76
77    fn get_tools() -> Vec<Tool> {
78        vec![
79            Tool {
80                name: "ingest_triples".to_string(),
81                description: Some(
82                    "Ingest one or more RDF triples into the knowledge graph".to_string(),
83                ),
84                input_schema: serde_json::json!({
85                    "type": "object",
86                    "properties": {
87                        "triples": {
88                            "type": "array",
89                            "items": {
90                                "type": "object",
91                                "properties": {
92                                    "subject": { "type": "string" },
93                                    "predicate": { "type": "string" },
94                                    "object": { "type": "string" }
95                                },
96                                "required": ["subject", "predicate", "object"]
97                            }
98                        },
99                        "namespace": { "type": "string", "default": "default" }
100                    },
101                    "required": ["triples"]
102                }),
103            },
104            Tool {
105                name: "ingest_file".to_string(),
106                description: Some(
107                    "Ingest a CSV or Markdown file into the knowledge graph".to_string(),
108                ),
109                input_schema: serde_json::json!({
110                    "type": "object",
111                    "properties": {
112                        "path": { "type": "string", "description": "Path to the file" },
113                        "namespace": { "type": "string", "default": "default" }
114                    },
115                    "required": ["path"]
116                }),
117            },
118            Tool {
119                name: "sparql_query".to_string(),
120                description: Some("Execute a SPARQL query against the knowledge graph".to_string()),
121                input_schema: serde_json::json!({
122                    "type": "object",
123                    "properties": {
124                        "query": { "type": "string", "description": "SPARQL query string" },
125                        "namespace": { "type": "string", "default": "default" }
126                    },
127                    "required": ["query"]
128                }),
129            },
130            Tool {
131                name: "hybrid_search".to_string(),
132                description: Some("Perform a hybrid vector + graph search".to_string()),
133                input_schema: serde_json::json!({
134                    "type": "object",
135                    "properties": {
136                        "query": { "type": "string", "description": "Natural language query" },
137                        "namespace": { "type": "string", "default": "default" },
138                        "vector_k": { "type": "integer", "default": 10 },
139                        "graph_depth": { "type": "integer", "default": 1 },
140                        "limit": { "type": "integer", "default": 20 }
141                    },
142                    "required": ["query"]
143                }),
144            },
145            Tool {
146                name: "apply_reasoning".to_string(),
147                description: Some(
148                    "Apply RDFS or OWL-RL reasoning to infer new triples".to_string(),
149                ),
150                input_schema: serde_json::json!({
151                    "type": "object",
152                    "properties": {
153                        "namespace": { "type": "string", "default": "default" },
154                        "strategy": { "type": "string", "enum": ["rdfs", "owlrl"], "default": "rdfs" },
155                        "materialize": { "type": "boolean", "default": false }
156                    }
157                }),
158            },
159            Tool {
160                name: "get_neighbors".to_string(),
161                description: Some(
162                    "Get neighboring nodes connected to a given URI in the graph".to_string(),
163                ),
164                input_schema: serde_json::json!({
165                    "type": "object",
166                    "properties": {
167                        "uri": { "type": "string", "description": "URI of the entity to find neighbors for" },
168                        "namespace": { "type": "string", "default": "default" },
169                        "direction": { "type": "string", "enum": ["outgoing", "incoming", "both"], "default": "outgoing" }
170                    },
171                    "required": ["uri"]
172                }),
173            },
174            Tool {
175                name: "list_triples".to_string(),
176                description: Some(
177                    "List all triples in a namespace (useful for debugging/exploration)"
178                        .to_string(),
179                ),
180                input_schema: serde_json::json!({
181                    "type": "object",
182                    "properties": {
183                        "namespace": { "type": "string", "default": "default" },
184                        "limit": { "type": "integer", "default": 100 }
185                    }
186                }),
187            },
188            Tool {
189                name: "delete_namespace".to_string(),
190                description: Some("Delete all data in a namespace".to_string()),
191                input_schema: serde_json::json!({
192                    "type": "object",
193                    "properties": {
194                        "namespace": { "type": "string", "description": "Namespace to delete" }
195                    },
196                    "required": ["namespace"]
197                }),
198            },
199            Tool {
200                name: "ingest_url".to_string(),
201                description: Some(
202                    "Scrape a web page and add its content to the vector store for RAG retrieval"
203                        .to_string(),
204                ),
205                input_schema: serde_json::json!({
206                    "type": "object",
207                    "properties": {
208                        "url": { "type": "string", "description": "URL to scrape and ingest" },
209                        "namespace": { "type": "string", "default": "default" }
210                    },
211                    "required": ["url"]
212                }),
213            },
214            Tool {
215                name: "ingest_text".to_string(),
216                description: Some(
217                    "Add arbitrary text content to the vector store for RAG retrieval".to_string(),
218                ),
219                input_schema: serde_json::json!({
220                    "type": "object",
221                    "properties": {
222                        "uri": { "type": "string", "description": "Custom URI identifier for this text" },
223                        "content": { "type": "string", "description": "Text content to embed and store" },
224                        "namespace": { "type": "string", "default": "default" }
225                    },
226                    "required": ["uri", "content"]
227                }),
228            },
229            Tool {
230                name: "compact_vectors".to_string(),
231                description: Some("Compact the vector index by removing stale entries".to_string()),
232                input_schema: serde_json::json!({
233                    "type": "object",
234                    "properties": {
235                        "namespace": { "type": "string", "default": "default" }
236                    }
237                }),
238            },
239            Tool {
240                name: "vector_stats".to_string(),
241                description: Some("Get vector store statistics (active, stale, total)".to_string()),
242                input_schema: serde_json::json!({
243                    "type": "object",
244                    "properties": {
245                        "namespace": { "type": "string", "default": "default" }
246                    }
247                }),
248            },
249            Tool {
250                name: "disambiguate".to_string(),
251                description: Some("Find similar entities that might be duplicates".to_string()),
252                input_schema: serde_json::json!({
253                    "type": "object",
254                    "properties": {
255                        "namespace": { "type": "string", "default": "default" },
256                        "threshold": { "type": "number", "default": 0.8, "description": "Similarity threshold 0.0-1.0" }
257                    }
258                }),
259            },
260            Tool {
261                name: "get_node_degree".to_string(),
262                description: Some("Get the degree (number of connections) of a node".to_string()),
263                input_schema: serde_json::json!({
264                    "type": "object",
265                    "properties": {
266                        "uri": { "type": "string" },
267                        "namespace": { "type": "string", "default": "default" }
268                    },
269                    "required": ["uri"]
270                }),
271            },
272            Tool {
273                name: "install_ontology".to_string(),
274                description: Some("Download and install an ontology from a URL".to_string()),
275                input_schema: serde_json::json!({
276                    "type": "object",
277                    "properties": {
278                        "url": { "type": "string", "description": "URL of the ontology file (.owl, .ttl)" },
279                        "name": { "type": "string", "description": "Name for the local file (e.g. 'legal.owl')" },
280                        "namespace": { "type": "string", "default": "default" }
281                    },
282                    "required": ["url", "name"]
283                }),
284            },
285            Tool {
286                name: "list_scenarios".to_string(),
287                description: Some("List available scenarios in the marketplace".to_string()),
288                input_schema: serde_json::json!({
289                    "type": "object",
290                    "properties": {}
291                }),
292            },
293            Tool {
294                name: "install_scenario".to_string(),
295                description: Some(
296                    "Install a scenario (ontologies, data, docs) from the marketplace".to_string(),
297                ),
298                input_schema: serde_json::json!({
299                    "type": "object",
300                    "properties": {
301                        "name": { "type": "string", "description": "Name of the scenario to install" },
302                        "namespace": { "type": "string", "default": "default" }
303                    },
304                    "required": ["name"]
305                }),
306            },
307        ]
308    }
309
310    pub async fn handle_request(&self, request: McpRequest) -> McpResponse {
311        match request.method.as_str() {
312            "initialize" => {
313                // MCP protocol initialization
314                McpResponse {
315                    jsonrpc: "2.0".to_string(),
316                    id: request.id,
317                    result: Some(serde_json::json!({
318                        "protocolVersion": "2024-11-05",
319                        "capabilities": {
320                            "tools": {}
321                        },
322                        "serverInfo": {
323                        "name": "synapse",
324                        "version": env!("CARGO_PKG_VERSION")
325                    }
326                    })),
327                    error: None,
328                }
329            }
330            "notifications/initialized" | "initialized" => {
331                // Client confirms initialization - just acknowledge
332                McpResponse {
333                    jsonrpc: "2.0".to_string(),
334                    id: request.id,
335                    result: Some(serde_json::json!({})),
336                    error: None,
337                }
338            }
339            "tools/list" => {
340                let result = ListToolsResult {
341                    tools: Self::get_tools(),
342                };
343                McpResponse {
344                    jsonrpc: "2.0".to_string(),
345                    id: request.id,
346                    result: Some(serde_json::to_value(result).unwrap()),
347                    error: None,
348                }
349            }
350            "tools/call" => self.handle_tool_call(request).await,
351            // Legacy methods for backwards compatibility
352            "ingest" => self.handle_legacy_ingest(request).await,
353            "ingest_file" => self.handle_legacy_ingest_file(request).await,
354            _ => McpResponse {
355                jsonrpc: "2.0".to_string(),
356                id: request.id,
357                result: None,
358                error: Some(McpError {
359                    code: -32601,
360                    message: format!("Method not found: {}", request.method),
361                    data: None,
362                }),
363            },
364        }
365    }
366
367    fn validate_arguments(tool_name: &str, arguments: &serde_json::Value) -> Result<(), String> {
368        let tools = Self::get_tools();
369        if let Some(tool) = tools.iter().find(|t| t.name == tool_name) {
370            if let Ok(schema) = JSONSchema::compile(&tool.input_schema) {
371                if let Err(errors) = schema.validate(arguments) {
372                    let error_msg = errors.map(|e| e.to_string()).collect::<Vec<_>>().join(", ");
373                    return Err(format!("Validation error: {}", error_msg));
374                }
375            } else {
376                return Err("Invalid tool schema definition".to_string());
377            }
378        }
379        Ok(())
380    }
381
382    async fn handle_tool_call(&self, request: McpRequest) -> McpResponse {
383        let params = match request.params {
384            Some(serde_json::Value::Object(map)) => map,
385            Some(_) => return self.error_response(request.id, -32602, "Params must be an object"),
386            None => return self.error_response(request.id, -32602, "Missing params"),
387        };
388
389        let tool_name = match params.get("name").and_then(|v| v.as_str()) {
390            Some(n) => n,
391            None => return self.error_response(request.id, -32602, "Missing tool name"),
392        };
393
394        let arguments = params
395            .get("arguments")
396            .and_then(|v| v.as_object())
397            .cloned()
398            .unwrap_or_default();
399
400        let args_value = serde_json::Value::Object(arguments.clone());
401        if let Err(e) = Self::validate_arguments(tool_name, &args_value) {
402            return self.error_response(request.id, -32602, &e);
403        }
404
405        match tool_name {
406            "ingest_triples" => self.call_ingest_triples(request.id, &arguments).await,
407            "ingest_file" => self.call_ingest_file(request.id, &arguments).await,
408            "sparql_query" => self.call_sparql_query(request.id, &arguments).await,
409            "hybrid_search" => self.call_hybrid_search(request.id, &arguments).await,
410            "apply_reasoning" => self.call_apply_reasoning(request.id, &arguments).await,
411            "get_neighbors" => self.call_get_neighbors(request.id, &arguments).await,
412            "list_triples" => self.call_list_triples(request.id, &arguments).await,
413            "delete_namespace" => self.call_delete_namespace(request.id, &arguments).await,
414            "ingest_url" => self.call_ingest_url(request.id, &arguments).await,
415            "ingest_text" => self.call_ingest_text(request.id, &arguments).await,
416            "compact_vectors" => self.call_compact_vectors(request.id, &arguments).await,
417            "vector_stats" => self.call_vector_stats(request.id, &arguments).await,
418            "disambiguate" => self.call_disambiguate(request.id, &arguments).await,
419            "get_node_degree" => self.call_get_node_degree(request.id, &arguments).await,
420            "install_ontology" => self.call_install_ontology(request.id, &arguments).await,
421            "list_scenarios" => self.call_list_scenarios(request.id).await,
422            "install_scenario" => self.call_install_scenario(request.id, &arguments).await,
423            _ => self.error_response(request.id, -32602, &format!("Unknown tool: {}", tool_name)),
424        }
425    }
426
427    async fn call_list_scenarios(&self, id: Option<serde_json::Value>) -> McpResponse {
428        match self.engine.scenario_manager.list_scenarios().await {
429            Ok(registry) => {
430                let items: Vec<ScenarioItem> = registry
431                    .into_iter()
432                    .map(|e| ScenarioItem {
433                        name: e.name,
434                        description: e.description,
435                        version: e.version,
436                    })
437                    .collect();
438                self.serialize_result(id, ScenarioListResult { scenarios: items })
439            }
440            Err(e) => self.tool_result(id, &e.to_string(), true),
441        }
442    }
443
444    async fn call_install_scenario(
445        &self,
446        id: Option<serde_json::Value>,
447        args: &serde_json::Map<String, serde_json::Value>,
448    ) -> McpResponse {
449        let name = match args.get("name").and_then(|v| v.as_str()) {
450            Some(n) => n,
451            None => return self.error_response(id, -32602, "Missing 'name'"),
452        };
453        let namespace = args
454            .get("namespace")
455            .and_then(|v| v.as_str())
456            .unwrap_or("default");
457
458        match self.engine.install_scenario(name, namespace).await {
459            Ok(msg) => {
460                let result = SimpleSuccessResult {
461                    success: true,
462                    message: msg,
463                };
464                self.serialize_result(id, result)
465            }
466            Err(e) => self.tool_result(id, &e, true),
467        }
468    }
469
470    async fn call_install_ontology(
471        &self,
472        id: Option<serde_json::Value>,
473        args: &serde_json::Map<String, serde_json::Value>,
474    ) -> McpResponse {
475        let url = match args.get("url").and_then(|v| v.as_str()) {
476            Some(u) => u,
477            None => return self.error_response(id, -32602, "Missing 'url'"),
478        };
479        let name = match args.get("name").and_then(|v| v.as_str()) {
480            Some(n) => n,
481            None => return self.error_response(id, -32602, "Missing 'name'"),
482        };
483        let namespace = args
484            .get("namespace")
485            .and_then(|v| v.as_str())
486            .unwrap_or("default");
487
488        // Download
489        let response = match reqwest::get(url).await {
490            Ok(r) => r,
491            Err(e) => {
492                return self.tool_result(id, &format!("Failed to download ontology: {}", e), true)
493            }
494        };
495
496        if !response.status().is_success() {
497            return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
498        }
499
500        let content = match response.text().await {
501            Ok(t) => t,
502            Err(e) => {
503                return self.tool_result(id, &format!("Failed to read response: {}", e), true)
504            }
505        };
506
507        // Ensure ontology directory exists
508        let ontology_dir = std::path::Path::new("ontology");
509        if !ontology_dir.exists() {
510            if let Err(e) = std::fs::create_dir(ontology_dir) {
511                return self.tool_result(
512                    id,
513                    &format!("Failed to create ontology dir: {}", e),
514                    true,
515                );
516            }
517        }
518
519        // Security check: Sanitize filename to prevent directory traversal
520        let file_name = std::path::Path::new(name)
521            .file_name()
522            .and_then(|n| n.to_str())
523            .ok_or_else(|| "Invalid filename".to_string());
524
525        let clean_name = match file_name {
526            Ok(n) => n,
527            Err(e) => return self.tool_result(id, &format!("Security error: {}", e), true),
528        };
529
530        if clean_name != name {
531            return self.tool_result(
532                id,
533                "Security error: Filename contains path components",
534                true,
535            );
536        }
537
538        let path = ontology_dir.join(clean_name);
539        if let Err(e) = std::fs::write(&path, content) {
540            return self.tool_result(id, &format!("Failed to save ontology file: {}", e), true);
541        }
542
543        // Load into store
544        let store = match self.engine.get_store(namespace) {
545            Ok(s) => s,
546            Err(e) => return self.tool_result(id, &e.to_string(), true),
547        };
548
549        match crate::ingest::ontology::OntologyLoader::load_file(&store, &path).await {
550            Ok(count) => {
551                let result = SimpleSuccessResult {
552                    success: true,
553                    message: format!("Installed ontology '{}' and loaded {} triples", name, count),
554                };
555                self.serialize_result(id, result)
556            }
557            Err(e) => self.tool_result(id, &format!("Failed to load ontology: {}", e), true),
558        }
559    }
560
561    async fn call_ingest_triples(
562        &self,
563        id: Option<serde_json::Value>,
564        args: &serde_json::Map<String, serde_json::Value>,
565    ) -> McpResponse {
566        let namespace = args
567            .get("namespace")
568            .and_then(|v| v.as_str())
569            .unwrap_or("default");
570        let triples_array = match args.get("triples").and_then(|v| v.as_array()) {
571            Some(t) => t,
572            None => return self.error_response(id, -32602, "Missing 'triples' array"),
573        };
574
575        let mut triples = Vec::new();
576        for t in triples_array {
577            if let (Some(s), Some(p), Some(o)) = (
578                t.get("subject").and_then(|v| v.as_str()),
579                t.get("predicate").and_then(|v| v.as_str()),
580                t.get("object").and_then(|v| v.as_str()),
581            ) {
582                triples.push(Triple {
583                    subject: s.to_string(),
584                    predicate: p.to_string(),
585                    object: o.to_string(),
586                    provenance: Some(Provenance {
587                        source: "mcp".to_string(),
588                        timestamp: "".to_string(),
589                        method: "tools/call".to_string(),
590                    }),
591                    embedding: vec![],
592                });
593            }
594        }
595
596        let req = Self::create_request(IngestRequest {
597            triples,
598            namespace: namespace.to_string(),
599        });
600
601        match self.engine.ingest_triples(req).await {
602            Ok(resp) => {
603                let inner = resp.into_inner();
604                let result = IngestToolResult {
605                    nodes_added: inner.nodes_added,
606                    edges_added: inner.edges_added,
607                    message: format!("Ingested {} triples", inner.edges_added),
608                };
609                self.serialize_result(id, result)
610            }
611            Err(e) => self.tool_result(id, &e.to_string(), true),
612        }
613    }
614
615    async fn call_ingest_file(
616        &self,
617        id: Option<serde_json::Value>,
618        args: &serde_json::Map<String, serde_json::Value>,
619    ) -> McpResponse {
620        let path = match args.get("path").and_then(|v| v.as_str()) {
621            Some(p) => p,
622            None => return self.error_response(id, -32602, "Missing 'path'"),
623        };
624        let namespace = args
625            .get("namespace")
626            .and_then(|v| v.as_str())
627            .unwrap_or("default");
628
629        let req = Self::create_request(IngestFileRequest {
630            file_path: path.to_string(),
631            namespace: namespace.to_string(),
632        });
633
634        match self.engine.ingest_file(req).await {
635            Ok(resp) => {
636                let inner = resp.into_inner();
637                let result = IngestToolResult {
638                    nodes_added: inner.nodes_added,
639                    edges_added: inner.edges_added,
640                    message: format!("Ingested {} triples from {}", inner.edges_added, path),
641                };
642                self.serialize_result(id, result)
643            }
644            Err(e) => self.tool_result(id, &e.to_string(), true),
645        }
646    }
647
648    async fn call_sparql_query(
649        &self,
650        id: Option<serde_json::Value>,
651        args: &serde_json::Map<String, serde_json::Value>,
652    ) -> McpResponse {
653        let query = match args.get("query").and_then(|v| v.as_str()) {
654            Some(q) => q,
655            None => return self.error_response(id, -32602, "Missing 'query'"),
656        };
657        let namespace = args
658            .get("namespace")
659            .and_then(|v| v.as_str())
660            .unwrap_or("default");
661
662        let req = Self::create_request(SparqlRequest {
663            query: query.to_string(),
664            namespace: namespace.to_string(),
665        });
666
667        match self.engine.query_sparql(req).await {
668            Ok(resp) => self.tool_result(id, &resp.into_inner().results_json, false),
669            Err(e) => self.tool_result(id, &e.to_string(), true),
670        }
671    }
672
673    async fn call_hybrid_search(
674        &self,
675        id: Option<serde_json::Value>,
676        args: &serde_json::Map<String, serde_json::Value>,
677    ) -> McpResponse {
678        let query = match args.get("query").and_then(|v| v.as_str()) {
679            Some(q) => q,
680            None => return self.error_response(id, -32602, "Missing 'query'"),
681        };
682        let namespace = args
683            .get("namespace")
684            .and_then(|v| v.as_str())
685            .unwrap_or("default");
686        let vector_k = args.get("vector_k").and_then(|v| v.as_u64()).unwrap_or(10) as u32;
687        let graph_depth = args
688            .get("graph_depth")
689            .and_then(|v| v.as_u64())
690            .unwrap_or(1) as u32;
691        let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as u32;
692
693        let req = Self::create_request(HybridSearchRequest {
694            query: query.to_string(),
695            namespace: namespace.to_string(),
696            vector_k,
697            graph_depth,
698            mode: SearchMode::Hybrid as i32,
699            limit,
700        });
701
702        match self.engine.hybrid_search(req).await {
703            Ok(resp) => {
704                let results = resp.into_inner().results;
705                let items: Vec<SearchResultItem> = results
706                    .into_iter()
707                    .map(|r| SearchResultItem {
708                        node_id: r.node_id,
709                        score: r.score,
710                        content: r.content,
711                        uri: r.uri,
712                    })
713                    .collect();
714
715                let result = SearchToolResult { results: items };
716                self.serialize_result(id, result)
717            }
718            Err(e) => self.tool_result(id, &e.to_string(), true),
719        }
720    }
721
722    async fn call_apply_reasoning(
723        &self,
724        id: Option<serde_json::Value>,
725        args: &serde_json::Map<String, serde_json::Value>,
726    ) -> McpResponse {
727        let namespace = args
728            .get("namespace")
729            .and_then(|v| v.as_str())
730            .unwrap_or("default");
731        let strategy_str = args
732            .get("strategy")
733            .and_then(|v| v.as_str())
734            .unwrap_or("rdfs");
735        let materialize = args
736            .get("materialize")
737            .and_then(|v| v.as_bool())
738            .unwrap_or(false);
739
740        let strategy = match strategy_str.to_lowercase().as_str() {
741            "owlrl" | "owl-rl" => ReasoningStrategy::Owlrl as i32,
742            _ => ReasoningStrategy::Rdfs as i32,
743        };
744
745        let req = Self::create_request(ReasoningRequest {
746            namespace: namespace.to_string(),
747            strategy,
748            materialize,
749        });
750
751        match self.engine.apply_reasoning(req).await {
752            Ok(resp) => {
753                let inner = resp.into_inner();
754                let result = ReasoningToolResult {
755                    success: inner.success,
756                    triples_inferred: inner.triples_inferred,
757                    message: inner.message,
758                };
759                self.serialize_result(id, result)
760            }
761            Err(e) => self.tool_result(id, &e.to_string(), true),
762        }
763    }
764
765    async fn call_get_neighbors(
766        &self,
767        id: Option<serde_json::Value>,
768        args: &serde_json::Map<String, serde_json::Value>,
769    ) -> McpResponse {
770        let uri = match args.get("uri").and_then(|v| v.as_str()) {
771            Some(u) => u,
772            None => return self.error_response(id, -32602, "Missing 'uri'"),
773        };
774        let namespace = args
775            .get("namespace")
776            .and_then(|v| v.as_str())
777            .unwrap_or("default");
778        let direction = args
779            .get("direction")
780            .and_then(|v| v.as_str())
781            .unwrap_or("outgoing");
782
783        let store = match self.engine.get_store(namespace) {
784            Ok(s) => s,
785            Err(e) => return self.tool_result(id, &e.to_string(), true),
786        };
787
788        let mut neighbors = Vec::new();
789
790        // Query outgoing edges (URI as subject)
791        if direction == "outgoing" || direction == "both" {
792            if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
793                for q in store
794                    .store
795                    .quads_for_pattern(Some(subj.into()), None, None, None)
796                    .flatten()
797                {
798                    neighbors.push(NeighborItem {
799                        direction: "outgoing".to_string(),
800                        predicate: q.predicate.to_string(),
801                        target: q.object.to_string(),
802                        score: 1.0,
803                    });
804                }
805            }
806        }
807
808        // Query incoming edges (URI as object)
809        if direction == "incoming" || direction == "both" {
810            if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
811                for q in store
812                    .store
813                    .quads_for_pattern(None, None, Some(obj.into()), None)
814                    .flatten()
815                {
816                    neighbors.push(NeighborItem {
817                        direction: "incoming".to_string(),
818                        predicate: q.predicate.to_string(),
819                        target: q.subject.to_string(),
820                        score: 1.0,
821                    });
822                }
823            }
824        }
825
826        let result = NeighborsToolResult { neighbors };
827        self.serialize_result(id, result)
828    }
829
830    async fn call_list_triples(
831        &self,
832        id: Option<serde_json::Value>,
833        args: &serde_json::Map<String, serde_json::Value>,
834    ) -> McpResponse {
835        let namespace = args
836            .get("namespace")
837            .and_then(|v| v.as_str())
838            .unwrap_or("default");
839        let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
840
841        let store = match self.engine.get_store(namespace) {
842            Ok(s) => s,
843            Err(e) => return self.tool_result(id, &e.to_string(), true),
844        };
845
846        let mut triples = Vec::new();
847        for q in store.store.iter().take(limit).flatten() {
848            triples.push(TripleItem {
849                subject: q.subject.to_string(),
850                predicate: q.predicate.to_string(),
851                object: q.object.to_string(),
852            });
853        }
854
855        let result = TriplesToolResult { triples };
856        self.serialize_result(id, result)
857    }
858
859    async fn call_delete_namespace(
860        &self,
861        id: Option<serde_json::Value>,
862        args: &serde_json::Map<String, serde_json::Value>,
863    ) -> McpResponse {
864        let namespace = match args.get("namespace").and_then(|v| v.as_str()) {
865            Some(n) => n,
866            None => return self.error_response(id, -32602, "Missing 'namespace'"),
867        };
868
869        let req = Self::create_request(crate::server::proto::EmptyRequest {
870            namespace: namespace.to_string(),
871        });
872
873        match self.engine.delete_namespace_data(req).await {
874            Ok(resp) => {
875                let inner = resp.into_inner();
876                let result = SimpleSuccessResult {
877                    success: inner.success,
878                    message: inner.message,
879                };
880                self.serialize_result(id, result)
881            }
882            Err(e) => self.tool_result(id, &e.to_string(), true),
883        }
884    }
885
886    async fn call_ingest_url(
887        &self,
888        id: Option<serde_json::Value>,
889        args: &serde_json::Map<String, serde_json::Value>,
890    ) -> McpResponse {
891        let url = match args.get("url").and_then(|v| v.as_str()) {
892            Some(u) => u,
893            None => return self.error_response(id, -32602, "Missing 'url'"),
894        };
895        let namespace = args
896            .get("namespace")
897            .and_then(|v| v.as_str())
898            .unwrap_or("default");
899
900        // Fetch URL content
901        let client = reqwest::Client::new();
902        let response = match client.get(url).send().await {
903            Ok(r) => r,
904            Err(e) => return self.tool_result(id, &format!("Failed to fetch URL: {}", e), true),
905        };
906
907        if !response.status().is_success() {
908            return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
909        }
910
911        let html = match response.text().await {
912            Ok(t) => t,
913            Err(e) => {
914                return self.tool_result(id, &format!("Failed to read response: {}", e), true)
915            }
916        };
917
918        // HTML to text conversion with Regex
919        let script_re = regex::Regex::new(r"(?s)<script.*?>.*?</script>").unwrap();
920        let style_re = regex::Regex::new(r"(?s)<style.*?>.*?</style>").unwrap();
921        let tag_re = regex::Regex::new(r"<[^>]*>").unwrap();
922
923        let no_script = script_re.replace_all(&html, " ");
924        let no_style = style_re.replace_all(&no_script, " ");
925        let text_content = tag_re.replace_all(&no_style, " ");
926
927        let text = text_content
928            .split_whitespace()
929            .collect::<Vec<_>>()
930            .join(" ");
931
932        // Chunk text with overlap
933        let processor = crate::processor::TextProcessor::new();
934        let chunks = processor.chunk_text(&text, 1000, 150);
935
936        // Add to vector store
937        let store = match self.engine.get_store(namespace) {
938            Ok(s) => s,
939            Err(e) => return self.tool_result(id, &e.to_string(), true),
940        };
941
942        if let Some(ref vector_store) = store.vector_store {
943            let mut added_chunks = 0;
944            for (i, chunk) in chunks.iter().enumerate() {
945                let chunk_uri = format!("{}#chunk-{}", url, i);
946                // For MCP ingestion, we just use the chunk URI as the key and metadata URI
947                let metadata = serde_json::json!({
948                    "uri": chunk_uri,
949                    "source_url": url,
950                    "type": "web_chunk"
951                });
952                match vector_store.add(&chunk_uri, chunk, metadata).await {
953                    Ok(_) => added_chunks += 1,
954                    Err(e) => {
955                        eprintln!("Failed to add chunk {}: {}", i, e);
956                    }
957                }
958            }
959            let result = IngestToolResult {
960                nodes_added: 0,
961                edges_added: 0, // Ingest URL technically adds to vector store, no graph edges yet unless reasoned
962                message: format!(
963                    "Ingested URL: {} ({} chars, {} chunks)",
964                    url,
965                    text.len(),
966                    added_chunks
967                ),
968            };
969            self.serialize_result(id, result)
970        } else {
971            self.tool_result(id, "Vector store not available", true)
972        }
973    }
974
975    async fn call_ingest_text(
976        &self,
977        id: Option<serde_json::Value>,
978        args: &serde_json::Map<String, serde_json::Value>,
979    ) -> McpResponse {
980        let uri = match args.get("uri").and_then(|v| v.as_str()) {
981            Some(u) => u,
982            None => return self.error_response(id, -32602, "Missing 'uri'"),
983        };
984        let content = match args.get("content").and_then(|v| v.as_str()) {
985            Some(c) => c,
986            None => return self.error_response(id, -32602, "Missing 'content'"),
987        };
988        let namespace = args
989            .get("namespace")
990            .and_then(|v| v.as_str())
991            .unwrap_or("default");
992
993        // Chunk text with overlap
994        let processor = crate::processor::TextProcessor::new();
995        let chunks = processor.chunk_text(content, 1000, 150);
996
997        // Add to vector store
998        let store = match self.engine.get_store(namespace) {
999            Ok(s) => s,
1000            Err(e) => return self.tool_result(id, &e.to_string(), true),
1001        };
1002
1003        if let Some(ref vector_store) = store.vector_store {
1004            let mut added_chunks = 0;
1005            for (i, chunk) in chunks.iter().enumerate() {
1006                let chunk_uri = if chunks.len() > 1 {
1007                    format!("{}#chunk-{}", uri, i)
1008                } else {
1009                    uri.to_string()
1010                };
1011                let metadata = serde_json::json!({
1012                    "uri": uri, // Map back to original URI
1013                    "chunk_uri": chunk_uri,
1014                    "type": "text_chunk"
1015                });
1016                match vector_store.add(&chunk_uri, chunk, metadata).await {
1017                    Ok(_) => added_chunks += 1,
1018                    Err(e) => {
1019                        eprintln!("Failed to add chunk {}: {}", i, e);
1020                    }
1021                }
1022            }
1023            let result = IngestToolResult {
1024                nodes_added: 0,
1025                edges_added: 0,
1026                message: format!(
1027                    "Ingested text: {} ({} chars, {} chunks)",
1028                    uri,
1029                    content.len(),
1030                    added_chunks
1031                ),
1032            };
1033            self.serialize_result(id, result)
1034        } else {
1035            self.tool_result(id, "Vector store not available", true)
1036        }
1037    }
1038
1039    async fn call_compact_vectors(
1040        &self,
1041        id: Option<serde_json::Value>,
1042        args: &serde_json::Map<String, serde_json::Value>,
1043    ) -> McpResponse {
1044        let namespace = args
1045            .get("namespace")
1046            .and_then(|v| v.as_str())
1047            .unwrap_or("default");
1048
1049        let store = match self.engine.get_store(namespace) {
1050            Ok(s) => s,
1051            Err(e) => return self.tool_result(id, &e.to_string(), true),
1052        };
1053
1054        if let Some(ref vector_store) = store.vector_store {
1055            match vector_store.compact() {
1056                Ok(removed) => {
1057                    let result = SimpleSuccessResult {
1058                        success: true,
1059                        message: format!("Compaction complete: {} stale entries removed", removed),
1060                    };
1061                    self.serialize_result(id, result)
1062                }
1063                Err(e) => self.tool_result(id, &format!("Compaction error: {}", e), true),
1064            }
1065        } else {
1066            self.tool_result(id, "Vector store not available", true)
1067        }
1068    }
1069
1070    async fn call_vector_stats(
1071        &self,
1072        id: Option<serde_json::Value>,
1073        args: &serde_json::Map<String, serde_json::Value>,
1074    ) -> McpResponse {
1075        let namespace = args
1076            .get("namespace")
1077            .and_then(|v| v.as_str())
1078            .unwrap_or("default");
1079
1080        let store = match self.engine.get_store(namespace) {
1081            Ok(s) => s,
1082            Err(e) => return self.tool_result(id, &e.to_string(), true),
1083        };
1084
1085        if let Some(ref vector_store) = store.vector_store {
1086            let (active, stale, total) = vector_store.stats();
1087            let result = StatsToolResult {
1088                active_vectors: active,
1089                stale_vectors: stale,
1090                total_embeddings: total,
1091            };
1092            self.serialize_result(id, result)
1093        } else {
1094            self.tool_result(id, "Vector store not available", true)
1095        }
1096    }
1097
1098    async fn call_disambiguate(
1099        &self,
1100        id: Option<serde_json::Value>,
1101        args: &serde_json::Map<String, serde_json::Value>,
1102    ) -> McpResponse {
1103        let namespace = args
1104            .get("namespace")
1105            .and_then(|v| v.as_str())
1106            .unwrap_or("default");
1107        let threshold = args
1108            .get("threshold")
1109            .and_then(|v| v.as_f64())
1110            .unwrap_or(0.8);
1111
1112        let store = match self.engine.get_store(namespace) {
1113            Ok(s) => s,
1114            Err(e) => return self.tool_result(id, &e.to_string(), true),
1115        };
1116
1117        // Collect all URIs from the store
1118        let uri_map = store.uri_to_id.read().unwrap();
1119        let uris: Vec<String> = uri_map.keys().cloned().collect();
1120        drop(uri_map);
1121
1122        let disambiguator = crate::disambiguation::EntityDisambiguator::new(threshold);
1123        let suggestions = disambiguator.suggest_merges(&uris);
1124
1125        let items: Vec<DisambiguationItem> = suggestions
1126            .into_iter()
1127            .map(|(u1, u2, s)| DisambiguationItem {
1128                uri1: u1,
1129                uri2: u2,
1130                similarity: s,
1131            })
1132            .collect();
1133
1134        let message = if items.is_empty() {
1135            "No similar entities found above threshold".to_string()
1136        } else {
1137            format!("Found {} potential duplicates", items.len())
1138        };
1139
1140        let result = DisambiguationResult {
1141            suggestions: items,
1142            message,
1143        };
1144        self.serialize_result(id, result)
1145    }
1146
1147    // Legacy handlers for backward compatibility
1148    async fn handle_legacy_ingest(&self, request: McpRequest) -> McpResponse {
1149        let params = match request.params {
1150            Some(p) => p,
1151            None => return self.error_response(request.id, -32602, "Invalid params"),
1152        };
1153
1154        if let (Some(sub), Some(pred), Some(obj)) = (
1155            params.get("subject").and_then(|v| v.as_str()),
1156            params.get("predicate").and_then(|v| v.as_str()),
1157            params.get("object").and_then(|v| v.as_str()),
1158        ) {
1159            let namespace = params
1160                .get("namespace")
1161                .and_then(|v| v.as_str())
1162                .unwrap_or("default");
1163            let triple = Triple {
1164                subject: sub.to_string(),
1165                predicate: pred.to_string(),
1166                object: obj.to_string(),
1167                provenance: Some(Provenance {
1168                    source: "mcp".to_string(),
1169                    timestamp: "".to_string(),
1170                    method: "stdio".to_string(),
1171                }),
1172                embedding: vec![],
1173            };
1174
1175            let req = Self::create_request(IngestRequest {
1176                triples: vec![triple],
1177                namespace: namespace.to_string(),
1178            });
1179
1180            match self.engine.ingest_triples(req).await {
1181                Ok(_) => McpResponse {
1182                    jsonrpc: "2.0".to_string(),
1183                    id: request.id,
1184                    result: Some(serde_json::to_value("Ingested").unwrap()),
1185                    error: None,
1186                },
1187                Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1188            }
1189        } else {
1190            self.error_response(request.id, -32602, "Invalid params")
1191        }
1192    }
1193
1194    async fn handle_legacy_ingest_file(&self, request: McpRequest) -> McpResponse {
1195        let params = match request.params {
1196            Some(p) => p,
1197            None => {
1198                return self.error_response(request.id, -32602, "Invalid params: 'path' required")
1199            }
1200        };
1201
1202        if let Some(path) = params.get("path").and_then(|v| v.as_str()) {
1203            let namespace = params
1204                .get("namespace")
1205                .and_then(|v| v.as_str())
1206                .unwrap_or("default");
1207
1208            let req = Self::create_request(IngestFileRequest {
1209                file_path: path.to_string(),
1210                namespace: namespace.to_string(),
1211            });
1212
1213            match self.engine.ingest_file(req).await {
1214                Ok(resp) => {
1215                    let inner = resp.into_inner();
1216                    McpResponse {
1217                        jsonrpc: "2.0".to_string(),
1218                        id: request.id,
1219                        result: Some(
1220                            serde_json::to_value(format!(
1221                                "Ingested {} triples from {}",
1222                                inner.edges_added, path
1223                            ))
1224                            .unwrap(),
1225                        ),
1226                        error: None,
1227                    }
1228                }
1229                Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1230            }
1231        } else {
1232            self.error_response(request.id, -32602, "Invalid params: 'path' required")
1233        }
1234    }
1235
1236    fn serialize_result<T: serde::Serialize>(
1237        &self,
1238        id: Option<serde_json::Value>,
1239        result: T,
1240    ) -> McpResponse {
1241        match serde_json::to_string_pretty(&result) {
1242            Ok(json) => self.tool_result(id, &json, false),
1243            Err(e) => self.tool_result(id, &format!("Serialization error: {}", e), true),
1244        }
1245    }
1246
1247    async fn call_get_node_degree(
1248        &self,
1249        id: Option<serde_json::Value>,
1250        args: &serde_json::Map<String, serde_json::Value>,
1251    ) -> McpResponse {
1252        let uri = match args.get("uri").and_then(|v| v.as_str()) {
1253            Some(u) => u,
1254            None => return self.error_response(id, -32602, "Missing 'uri'"),
1255        };
1256        let namespace = args
1257            .get("namespace")
1258            .and_then(|v| v.as_str())
1259            .unwrap_or("default");
1260
1261        let store = match self.engine.get_store(namespace) {
1262            Ok(s) => s,
1263            Err(e) => return self.tool_result(id, &e.to_string(), true),
1264        };
1265
1266        let degree = store.get_degree(uri);
1267
1268        let result = DegreeResult {
1269            uri: uri.to_string(),
1270            degree,
1271        };
1272
1273        self.serialize_result(id, result)
1274    }
1275
1276    fn error_response(
1277        &self,
1278        id: Option<serde_json::Value>,
1279        code: i32,
1280        message: &str,
1281    ) -> McpResponse {
1282        McpResponse {
1283            jsonrpc: "2.0".to_string(),
1284            id,
1285            result: None,
1286            error: Some(McpError {
1287                code,
1288                message: message.to_string(),
1289                data: None,
1290            }),
1291        }
1292    }
1293
1294    fn tool_result(
1295        &self,
1296        id: Option<serde_json::Value>,
1297        text: &str,
1298        is_error: bool,
1299    ) -> McpResponse {
1300        let result = CallToolResult {
1301            content: vec![Content {
1302                content_type: "text".to_string(),
1303                text: text.to_string(),
1304            }],
1305            is_error: if is_error { Some(true) } else { None },
1306        };
1307        McpResponse {
1308            jsonrpc: "2.0".to_string(),
1309            id,
1310            result: Some(serde_json::to_value(result).unwrap()),
1311            error: None,
1312        }
1313    }
1314}