// synapse_core/mcp_stdio.rs

1use crate::mcp_types::{
2    CallToolResult, Content, DegreeResult, DisambiguationItem, DisambiguationResult,
3    IngestToolResult, ListToolsResult, McpError, McpRequest, McpResponse, NeighborItem,
4    NeighborsToolResult, ReasoningToolResult, ScenarioItem, ScenarioListResult, SearchResultItem,
5    SearchToolResult, SimpleSuccessResult, StatsToolResult, Tool, TripleItem, TriplesToolResult,
6};
7use crate::server::proto::semantic_engine_server::SemanticEngine;
8use crate::server::proto::{
9    HybridSearchRequest, IngestFileRequest, IngestRequest, Provenance, ReasoningRequest,
10    ReasoningStrategy, SearchMode, SparqlRequest, Triple,
11};
12use crate::server::MySemanticEngine;
13use jsonschema::JSONSchema;
14use std::sync::Arc;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tonic::Request;
17
18pub struct McpStdioServer {
19    engine: Arc<MySemanticEngine>,
20}
21
22impl McpStdioServer {
23    pub fn new(engine: Arc<MySemanticEngine>) -> Self {
24        Self { engine }
25    }
26
27    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
28        let mut reader = BufReader::new(tokio::io::stdin());
29        let mut writer = tokio::io::stdout();
30
31        loop {
32            let mut line = String::new();
33            if reader.read_line(&mut line).await? == 0 {
34                break;
35            }
36
37            let trimmed = line.trim();
38            if trimmed.is_empty() {
39                continue;
40            }
41
42            if let Ok(request) = serde_json::from_str::<McpRequest>(trimmed) {
43                let response = self.handle_request(request).await;
44                let response_json = serde_json::to_string(&response)? + "\n";
45                writer.write_all(response_json.as_bytes()).await?;
46                writer.flush().await?;
47            }
48        }
49
50        Ok(())
51    }
52
53    fn create_request<T>(msg: T) -> Request<T> {
54        let mut req = Request::new(msg);
55
56        // Try to get token from env
57        let token_opt = std::env::var("SYNAPSE_ADMIN_TOKEN")
58            .or_else(|_| std::env::var("SYNAPSE_MCP_TOKEN"))
59            .ok();
60
61        if let Some(token) = token_opt {
62            if let Ok(val) = format!("Bearer {}", token).parse() {
63                req.metadata_mut().insert("authorization", val);
64            }
65        }
66        req
67    }
68
69    fn get_tools() -> Vec<Tool> {
70        vec![
71            Tool {
72                name: "ingest_triples".to_string(),
73                description: Some(
74                    "Ingest one or more RDF triples into the knowledge graph".to_string(),
75                ),
76                input_schema: serde_json::json!({
77                    "type": "object",
78                    "properties": {
79                        "triples": {
80                            "type": "array",
81                            "items": {
82                                "type": "object",
83                                "properties": {
84                                    "subject": { "type": "string" },
85                                    "predicate": { "type": "string" },
86                                    "object": { "type": "string" }
87                                },
88                                "required": ["subject", "predicate", "object"]
89                            }
90                        },
91                        "namespace": { "type": "string", "default": "default" }
92                    },
93                    "required": ["triples"]
94                }),
95            },
96            Tool {
97                name: "ingest_file".to_string(),
98                description: Some(
99                    "Ingest a CSV or Markdown file into the knowledge graph".to_string(),
100                ),
101                input_schema: serde_json::json!({
102                    "type": "object",
103                    "properties": {
104                        "path": { "type": "string", "description": "Path to the file" },
105                        "namespace": { "type": "string", "default": "default" }
106                    },
107                    "required": ["path"]
108                }),
109            },
110            Tool {
111                name: "sparql_query".to_string(),
112                description: Some("Execute a SPARQL query against the knowledge graph".to_string()),
113                input_schema: serde_json::json!({
114                    "type": "object",
115                    "properties": {
116                        "query": { "type": "string", "description": "SPARQL query string" },
117                        "namespace": { "type": "string", "default": "default" }
118                    },
119                    "required": ["query"]
120                }),
121            },
122            Tool {
123                name: "hybrid_search".to_string(),
124                description: Some("Perform a hybrid vector + graph search".to_string()),
125                input_schema: serde_json::json!({
126                    "type": "object",
127                    "properties": {
128                        "query": { "type": "string", "description": "Natural language query" },
129                        "namespace": { "type": "string", "default": "default" },
130                        "vector_k": { "type": "integer", "default": 10 },
131                        "graph_depth": { "type": "integer", "default": 1 },
132                        "limit": { "type": "integer", "default": 20 }
133                    },
134                    "required": ["query"]
135                }),
136            },
137            Tool {
138                name: "apply_reasoning".to_string(),
139                description: Some(
140                    "Apply RDFS or OWL-RL reasoning to infer new triples".to_string(),
141                ),
142                input_schema: serde_json::json!({
143                    "type": "object",
144                    "properties": {
145                        "namespace": { "type": "string", "default": "default" },
146                        "strategy": { "type": "string", "enum": ["rdfs", "owlrl"], "default": "rdfs" },
147                        "materialize": { "type": "boolean", "default": false }
148                    }
149                }),
150            },
151            Tool {
152                name: "get_neighbors".to_string(),
153                description: Some(
154                    "Get neighboring nodes connected to a given URI in the graph".to_string(),
155                ),
156                input_schema: serde_json::json!({
157                    "type": "object",
158                    "properties": {
159                        "uri": { "type": "string", "description": "URI of the entity to find neighbors for" },
160                        "namespace": { "type": "string", "default": "default" },
161                        "direction": { "type": "string", "enum": ["outgoing", "incoming", "both"], "default": "outgoing" }
162                    },
163                    "required": ["uri"]
164                }),
165            },
166            Tool {
167                name: "list_triples".to_string(),
168                description: Some(
169                    "List all triples in a namespace (useful for debugging/exploration)"
170                        .to_string(),
171                ),
172                input_schema: serde_json::json!({
173                    "type": "object",
174                    "properties": {
175                        "namespace": { "type": "string", "default": "default" },
176                        "limit": { "type": "integer", "default": 100 }
177                    }
178                }),
179            },
180            Tool {
181                name: "delete_namespace".to_string(),
182                description: Some("Delete all data in a namespace".to_string()),
183                input_schema: serde_json::json!({
184                    "type": "object",
185                    "properties": {
186                        "namespace": { "type": "string", "description": "Namespace to delete" }
187                    },
188                    "required": ["namespace"]
189                }),
190            },
191            Tool {
192                name: "ingest_url".to_string(),
193                description: Some(
194                    "Scrape a web page and add its content to the vector store for RAG retrieval"
195                        .to_string(),
196                ),
197                input_schema: serde_json::json!({
198                    "type": "object",
199                    "properties": {
200                        "url": { "type": "string", "description": "URL to scrape and ingest" },
201                        "namespace": { "type": "string", "default": "default" }
202                    },
203                    "required": ["url"]
204                }),
205            },
206            Tool {
207                name: "ingest_text".to_string(),
208                description: Some(
209                    "Add arbitrary text content to the vector store for RAG retrieval".to_string(),
210                ),
211                input_schema: serde_json::json!({
212                    "type": "object",
213                    "properties": {
214                        "uri": { "type": "string", "description": "Custom URI identifier for this text" },
215                        "content": { "type": "string", "description": "Text content to embed and store" },
216                        "namespace": { "type": "string", "default": "default" }
217                    },
218                    "required": ["uri", "content"]
219                }),
220            },
221            Tool {
222                name: "compact_vectors".to_string(),
223                description: Some("Compact the vector index by removing stale entries".to_string()),
224                input_schema: serde_json::json!({
225                    "type": "object",
226                    "properties": {
227                        "namespace": { "type": "string", "default": "default" }
228                    }
229                }),
230            },
231            Tool {
232                name: "vector_stats".to_string(),
233                description: Some("Get vector store statistics (active, stale, total)".to_string()),
234                input_schema: serde_json::json!({
235                    "type": "object",
236                    "properties": {
237                        "namespace": { "type": "string", "default": "default" }
238                    }
239                }),
240            },
241            Tool {
242                name: "disambiguate".to_string(),
243                description: Some("Find similar entities that might be duplicates".to_string()),
244                input_schema: serde_json::json!({
245                    "type": "object",
246                    "properties": {
247                        "namespace": { "type": "string", "default": "default" },
248                        "threshold": { "type": "number", "default": 0.8, "description": "Similarity threshold 0.0-1.0" }
249                    }
250                }),
251            },
252            Tool {
253                name: "get_node_degree".to_string(),
254                description: Some("Get the degree (number of connections) of a node".to_string()),
255                input_schema: serde_json::json!({
256                    "type": "object",
257                    "properties": {
258                        "uri": { "type": "string" },
259                        "namespace": { "type": "string", "default": "default" }
260                    },
261                    "required": ["uri"]
262                }),
263            },
264            Tool {
265                name: "install_ontology".to_string(),
266                description: Some("Download and install an ontology from a URL".to_string()),
267                input_schema: serde_json::json!({
268                    "type": "object",
269                    "properties": {
270                        "url": { "type": "string", "description": "URL of the ontology file (.owl, .ttl)" },
271                        "name": { "type": "string", "description": "Name for the local file (e.g. 'legal.owl')" },
272                        "namespace": { "type": "string", "default": "default" }
273                    },
274                    "required": ["url", "name"]
275                }),
276            },
277            Tool {
278                name: "list_scenarios".to_string(),
279                description: Some("List available scenarios in the marketplace".to_string()),
280                input_schema: serde_json::json!({
281                    "type": "object",
282                    "properties": {}
283                }),
284            },
285            Tool {
286                name: "install_scenario".to_string(),
287                description: Some(
288                    "Install a scenario (ontologies, data, docs) from the marketplace".to_string(),
289                ),
290                input_schema: serde_json::json!({
291                    "type": "object",
292                    "properties": {
293                        "name": { "type": "string", "description": "Name of the scenario to install" },
294                        "namespace": { "type": "string", "default": "default" }
295                    },
296                    "required": ["name"]
297                }),
298            },
299        ]
300    }
301
302    pub async fn handle_request(&self, request: McpRequest) -> McpResponse {
303        match request.method.as_str() {
304            "initialize" => {
305                // MCP protocol initialization
306                McpResponse {
307                    jsonrpc: "2.0".to_string(),
308                    id: request.id,
309                    result: Some(serde_json::json!({
310                        "protocolVersion": "2024-11-05",
311                        "capabilities": {
312                            "tools": {}
313                        },
314                        "serverInfo": {
315                        "name": "synapse",
316                        "version": env!("CARGO_PKG_VERSION")
317                    }
318                    })),
319                    error: None,
320                }
321            }
322            "notifications/initialized" | "initialized" => {
323                // Client confirms initialization - just acknowledge
324                McpResponse {
325                    jsonrpc: "2.0".to_string(),
326                    id: request.id,
327                    result: Some(serde_json::json!({})),
328                    error: None,
329                }
330            }
331            "tools/list" => {
332                let result = ListToolsResult {
333                    tools: Self::get_tools(),
334                };
335                McpResponse {
336                    jsonrpc: "2.0".to_string(),
337                    id: request.id,
338                    result: Some(serde_json::to_value(result).unwrap()),
339                    error: None,
340                }
341            }
342            "tools/call" => self.handle_tool_call(request).await,
343            // Legacy methods for backwards compatibility
344            "ingest" => self.handle_legacy_ingest(request).await,
345            "ingest_file" => self.handle_legacy_ingest_file(request).await,
346            _ => McpResponse {
347                jsonrpc: "2.0".to_string(),
348                id: request.id,
349                result: None,
350                error: Some(McpError {
351                    code: -32601,
352                    message: format!("Method not found: {}", request.method),
353                    data: None,
354                }),
355            },
356        }
357    }
358
359    fn validate_arguments(tool_name: &str, arguments: &serde_json::Value) -> Result<(), String> {
360        let tools = Self::get_tools();
361        if let Some(tool) = tools.iter().find(|t| t.name == tool_name) {
362            if let Ok(schema) = JSONSchema::compile(&tool.input_schema) {
363                if let Err(errors) = schema.validate(arguments) {
364                    let error_msg = errors.map(|e| e.to_string()).collect::<Vec<_>>().join(", ");
365                    return Err(format!("Validation error: {}", error_msg));
366                }
367            } else {
368                return Err("Invalid tool schema definition".to_string());
369            }
370        }
371        Ok(())
372    }
373
374    async fn handle_tool_call(&self, request: McpRequest) -> McpResponse {
375        let params = match request.params {
376            Some(p) => p,
377            None => return self.error_response(request.id, -32602, "Missing params"),
378        };
379
380        let tool_name = match params.get("name").and_then(|v| v.as_str()) {
381            Some(n) => n,
382            None => return self.error_response(request.id, -32602, "Missing tool name"),
383        };
384
385        let arguments = params
386            .get("arguments")
387            .and_then(|v| v.as_object())
388            .cloned()
389            .unwrap_or_default();
390
391        let args_value = serde_json::Value::Object(arguments.clone());
392        if let Err(e) = Self::validate_arguments(tool_name, &args_value) {
393            return self.error_response(request.id, -32602, &e);
394        }
395
396        match tool_name {
397            "ingest_triples" => self.call_ingest_triples(request.id, &arguments).await,
398            "ingest_file" => self.call_ingest_file(request.id, &arguments).await,
399            "sparql_query" => self.call_sparql_query(request.id, &arguments).await,
400            "hybrid_search" => self.call_hybrid_search(request.id, &arguments).await,
401            "apply_reasoning" => self.call_apply_reasoning(request.id, &arguments).await,
402            "get_neighbors" => self.call_get_neighbors(request.id, &arguments).await,
403            "list_triples" => self.call_list_triples(request.id, &arguments).await,
404            "delete_namespace" => self.call_delete_namespace(request.id, &arguments).await,
405            "ingest_url" => self.call_ingest_url(request.id, &arguments).await,
406            "ingest_text" => self.call_ingest_text(request.id, &arguments).await,
407            "compact_vectors" => self.call_compact_vectors(request.id, &arguments).await,
408            "vector_stats" => self.call_vector_stats(request.id, &arguments).await,
409            "disambiguate" => self.call_disambiguate(request.id, &arguments).await,
410            "get_node_degree" => self.call_get_node_degree(request.id, &arguments).await,
411            "install_ontology" => self.call_install_ontology(request.id, &arguments).await,
412            "list_scenarios" => self.call_list_scenarios(request.id).await,
413            "install_scenario" => self.call_install_scenario(request.id, &arguments).await,
414            _ => self.error_response(request.id, -32602, &format!("Unknown tool: {}", tool_name)),
415        }
416    }
417
418    async fn call_list_scenarios(&self, id: Option<serde_json::Value>) -> McpResponse {
419        match self.engine.scenario_manager.list_scenarios().await {
420            Ok(registry) => {
421                let items: Vec<ScenarioItem> = registry
422                    .into_iter()
423                    .map(|e| ScenarioItem {
424                        name: e.name,
425                        description: e.description,
426                        version: e.version,
427                    })
428                    .collect();
429                self.serialize_result(id, ScenarioListResult { scenarios: items })
430            }
431            Err(e) => self.tool_result(id, &e.to_string(), true),
432        }
433    }
434
435    async fn call_install_scenario(
436        &self,
437        id: Option<serde_json::Value>,
438        args: &serde_json::Map<String, serde_json::Value>,
439    ) -> McpResponse {
440        let name = match args.get("name").and_then(|v| v.as_str()) {
441            Some(n) => n,
442            None => return self.error_response(id, -32602, "Missing 'name'"),
443        };
444        let namespace = args
445            .get("namespace")
446            .and_then(|v| v.as_str())
447            .unwrap_or("default");
448
449        match self.engine.install_scenario(name, namespace).await {
450            Ok(msg) => {
451                let result = SimpleSuccessResult {
452                    success: true,
453                    message: msg,
454                };
455                self.serialize_result(id, result)
456            }
457            Err(e) => self.tool_result(id, &e, true),
458        }
459    }
460
461    async fn call_install_ontology(
462        &self,
463        id: Option<serde_json::Value>,
464        args: &serde_json::Map<String, serde_json::Value>,
465    ) -> McpResponse {
466        let url = match args.get("url").and_then(|v| v.as_str()) {
467            Some(u) => u,
468            None => return self.error_response(id, -32602, "Missing 'url'"),
469        };
470        let name = match args.get("name").and_then(|v| v.as_str()) {
471            Some(n) => n,
472            None => return self.error_response(id, -32602, "Missing 'name'"),
473        };
474        let namespace = args
475            .get("namespace")
476            .and_then(|v| v.as_str())
477            .unwrap_or("default");
478
479        // Download
480        let response = match reqwest::get(url).await {
481            Ok(r) => r,
482            Err(e) => {
483                return self.tool_result(id, &format!("Failed to download ontology: {}", e), true)
484            }
485        };
486
487        if !response.status().is_success() {
488            return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
489        }
490
491        let content = match response.text().await {
492            Ok(t) => t,
493            Err(e) => {
494                return self.tool_result(id, &format!("Failed to read response: {}", e), true)
495            }
496        };
497
498        // Ensure ontology directory exists
499        let ontology_dir = std::path::Path::new("ontology");
500        if !ontology_dir.exists() {
501            if let Err(e) = std::fs::create_dir(ontology_dir) {
502                return self.tool_result(
503                    id,
504                    &format!("Failed to create ontology dir: {}", e),
505                    true,
506                );
507            }
508        }
509
510        // Security check: Sanitize filename to prevent directory traversal
511        let file_name = std::path::Path::new(name)
512            .file_name()
513            .and_then(|n| n.to_str())
514            .ok_or_else(|| "Invalid filename".to_string());
515
516        let clean_name = match file_name {
517            Ok(n) => n,
518            Err(e) => return self.tool_result(id, &format!("Security error: {}", e), true),
519        };
520
521        if clean_name != name {
522            return self.tool_result(
523                id,
524                "Security error: Filename contains path components",
525                true,
526            );
527        }
528
529        let path = ontology_dir.join(clean_name);
530        if let Err(e) = std::fs::write(&path, content) {
531            return self.tool_result(id, &format!("Failed to save ontology file: {}", e), true);
532        }
533
534        // Load into store
535        let store = match self.engine.get_store(namespace) {
536            Ok(s) => s,
537            Err(e) => return self.tool_result(id, &e.to_string(), true),
538        };
539
540        match crate::ingest::ontology::OntologyLoader::load_file(&store, &path).await {
541            Ok(count) => {
542                let result = SimpleSuccessResult {
543                    success: true,
544                    message: format!("Installed ontology '{}' and loaded {} triples", name, count),
545                };
546                self.serialize_result(id, result)
547            }
548            Err(e) => self.tool_result(id, &format!("Failed to load ontology: {}", e), true),
549        }
550    }
551
552    async fn call_ingest_triples(
553        &self,
554        id: Option<serde_json::Value>,
555        args: &serde_json::Map<String, serde_json::Value>,
556    ) -> McpResponse {
557        let namespace = args
558            .get("namespace")
559            .and_then(|v| v.as_str())
560            .unwrap_or("default");
561        let triples_array = match args.get("triples").and_then(|v| v.as_array()) {
562            Some(t) => t,
563            None => return self.error_response(id, -32602, "Missing 'triples' array"),
564        };
565
566        let mut triples = Vec::new();
567        for t in triples_array {
568            if let (Some(s), Some(p), Some(o)) = (
569                t.get("subject").and_then(|v| v.as_str()),
570                t.get("predicate").and_then(|v| v.as_str()),
571                t.get("object").and_then(|v| v.as_str()),
572            ) {
573                triples.push(Triple {
574                    subject: s.to_string(),
575                    predicate: p.to_string(),
576                    object: o.to_string(),
577                    provenance: Some(Provenance {
578                        source: "mcp".to_string(),
579                        timestamp: "".to_string(),
580                        method: "tools/call".to_string(),
581                    }),
582                    embedding: vec![],
583                });
584            }
585        }
586
587        let req = Self::create_request(IngestRequest {
588            triples,
589            namespace: namespace.to_string(),
590        });
591
592        match self.engine.ingest_triples(req).await {
593            Ok(resp) => {
594                let inner = resp.into_inner();
595                let result = IngestToolResult {
596                    nodes_added: inner.nodes_added,
597                    edges_added: inner.edges_added,
598                    message: format!("Ingested {} triples", inner.edges_added),
599                };
600                self.serialize_result(id, result)
601            }
602            Err(e) => self.tool_result(id, &e.to_string(), true),
603        }
604    }
605
606    async fn call_ingest_file(
607        &self,
608        id: Option<serde_json::Value>,
609        args: &serde_json::Map<String, serde_json::Value>,
610    ) -> McpResponse {
611        let path = match args.get("path").and_then(|v| v.as_str()) {
612            Some(p) => p,
613            None => return self.error_response(id, -32602, "Missing 'path'"),
614        };
615        let namespace = args
616            .get("namespace")
617            .and_then(|v| v.as_str())
618            .unwrap_or("default");
619
620        let req = Self::create_request(IngestFileRequest {
621            file_path: path.to_string(),
622            namespace: namespace.to_string(),
623        });
624
625        match self.engine.ingest_file(req).await {
626            Ok(resp) => {
627                let inner = resp.into_inner();
628                let result = IngestToolResult {
629                    nodes_added: inner.nodes_added,
630                    edges_added: inner.edges_added,
631                    message: format!("Ingested {} triples from {}", inner.edges_added, path),
632                };
633                self.serialize_result(id, result)
634            }
635            Err(e) => self.tool_result(id, &e.to_string(), true),
636        }
637    }
638
639    async fn call_sparql_query(
640        &self,
641        id: Option<serde_json::Value>,
642        args: &serde_json::Map<String, serde_json::Value>,
643    ) -> McpResponse {
644        let query = match args.get("query").and_then(|v| v.as_str()) {
645            Some(q) => q,
646            None => return self.error_response(id, -32602, "Missing 'query'"),
647        };
648        let namespace = args
649            .get("namespace")
650            .and_then(|v| v.as_str())
651            .unwrap_or("default");
652
653        let req = Self::create_request(SparqlRequest {
654            query: query.to_string(),
655            namespace: namespace.to_string(),
656        });
657
658        match self.engine.query_sparql(req).await {
659            Ok(resp) => self.tool_result(id, &resp.into_inner().results_json, false),
660            Err(e) => self.tool_result(id, &e.to_string(), true),
661        }
662    }
663
664    async fn call_hybrid_search(
665        &self,
666        id: Option<serde_json::Value>,
667        args: &serde_json::Map<String, serde_json::Value>,
668    ) -> McpResponse {
669        let query = match args.get("query").and_then(|v| v.as_str()) {
670            Some(q) => q,
671            None => return self.error_response(id, -32602, "Missing 'query'"),
672        };
673        let namespace = args
674            .get("namespace")
675            .and_then(|v| v.as_str())
676            .unwrap_or("default");
677        let vector_k = args.get("vector_k").and_then(|v| v.as_u64()).unwrap_or(10) as u32;
678        let graph_depth = args
679            .get("graph_depth")
680            .and_then(|v| v.as_u64())
681            .unwrap_or(1) as u32;
682        let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as u32;
683
684        let req = Self::create_request(HybridSearchRequest {
685            query: query.to_string(),
686            namespace: namespace.to_string(),
687            vector_k,
688            graph_depth,
689            mode: SearchMode::Hybrid as i32,
690            limit,
691        });
692
693        match self.engine.hybrid_search(req).await {
694            Ok(resp) => {
695                let results = resp.into_inner().results;
696                let items: Vec<SearchResultItem> = results
697                    .into_iter()
698                    .map(|r| SearchResultItem {
699                        node_id: r.node_id,
700                        score: r.score,
701                        content: r.content,
702                        uri: r.uri,
703                    })
704                    .collect();
705
706                let result = SearchToolResult { results: items };
707                self.serialize_result(id, result)
708            }
709            Err(e) => self.tool_result(id, &e.to_string(), true),
710        }
711    }
712
713    async fn call_apply_reasoning(
714        &self,
715        id: Option<serde_json::Value>,
716        args: &serde_json::Map<String, serde_json::Value>,
717    ) -> McpResponse {
718        let namespace = args
719            .get("namespace")
720            .and_then(|v| v.as_str())
721            .unwrap_or("default");
722        let strategy_str = args
723            .get("strategy")
724            .and_then(|v| v.as_str())
725            .unwrap_or("rdfs");
726        let materialize = args
727            .get("materialize")
728            .and_then(|v| v.as_bool())
729            .unwrap_or(false);
730
731        let strategy = match strategy_str.to_lowercase().as_str() {
732            "owlrl" | "owl-rl" => ReasoningStrategy::Owlrl as i32,
733            _ => ReasoningStrategy::Rdfs as i32,
734        };
735
736        let req = Self::create_request(ReasoningRequest {
737            namespace: namespace.to_string(),
738            strategy,
739            materialize,
740        });
741
742        match self.engine.apply_reasoning(req).await {
743            Ok(resp) => {
744                let inner = resp.into_inner();
745                let result = ReasoningToolResult {
746                    success: inner.success,
747                    triples_inferred: inner.triples_inferred,
748                    message: inner.message,
749                };
750                self.serialize_result(id, result)
751            }
752            Err(e) => self.tool_result(id, &e.to_string(), true),
753        }
754    }
755
756    async fn call_get_neighbors(
757        &self,
758        id: Option<serde_json::Value>,
759        args: &serde_json::Map<String, serde_json::Value>,
760    ) -> McpResponse {
761        let uri = match args.get("uri").and_then(|v| v.as_str()) {
762            Some(u) => u,
763            None => return self.error_response(id, -32602, "Missing 'uri'"),
764        };
765        let namespace = args
766            .get("namespace")
767            .and_then(|v| v.as_str())
768            .unwrap_or("default");
769        let direction = args
770            .get("direction")
771            .and_then(|v| v.as_str())
772            .unwrap_or("outgoing");
773
774        let store = match self.engine.get_store(namespace) {
775            Ok(s) => s,
776            Err(e) => return self.tool_result(id, &e.to_string(), true),
777        };
778
779        let mut neighbors = Vec::new();
780
781        // Query outgoing edges (URI as subject)
782        if direction == "outgoing" || direction == "both" {
783            if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
784                for q in store
785                    .store
786                    .quads_for_pattern(Some(subj.into()), None, None, None)
787                    .flatten()
788                {
789                    neighbors.push(NeighborItem {
790                        direction: "outgoing".to_string(),
791                        predicate: q.predicate.to_string(),
792                        target: q.object.to_string(),
793                        score: 1.0,
794                    });
795                }
796            }
797        }
798
799        // Query incoming edges (URI as object)
800        if direction == "incoming" || direction == "both" {
801            if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
802                for q in store
803                    .store
804                    .quads_for_pattern(None, None, Some(obj.into()), None)
805                    .flatten()
806                {
807                    neighbors.push(NeighborItem {
808                        direction: "incoming".to_string(),
809                        predicate: q.predicate.to_string(),
810                        target: q.subject.to_string(),
811                        score: 1.0,
812                    });
813                }
814            }
815        }
816
817        let result = NeighborsToolResult { neighbors };
818        self.serialize_result(id, result)
819    }
820
821    async fn call_list_triples(
822        &self,
823        id: Option<serde_json::Value>,
824        args: &serde_json::Map<String, serde_json::Value>,
825    ) -> McpResponse {
826        let namespace = args
827            .get("namespace")
828            .and_then(|v| v.as_str())
829            .unwrap_or("default");
830        let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
831
832        let store = match self.engine.get_store(namespace) {
833            Ok(s) => s,
834            Err(e) => return self.tool_result(id, &e.to_string(), true),
835        };
836
837        let mut triples = Vec::new();
838        for q in store.store.iter().take(limit).flatten() {
839            triples.push(TripleItem {
840                subject: q.subject.to_string(),
841                predicate: q.predicate.to_string(),
842                object: q.object.to_string(),
843            });
844        }
845
846        let result = TriplesToolResult { triples };
847        self.serialize_result(id, result)
848    }
849
850    async fn call_delete_namespace(
851        &self,
852        id: Option<serde_json::Value>,
853        args: &serde_json::Map<String, serde_json::Value>,
854    ) -> McpResponse {
855        let namespace = match args.get("namespace").and_then(|v| v.as_str()) {
856            Some(n) => n,
857            None => return self.error_response(id, -32602, "Missing 'namespace'"),
858        };
859
860        let req = Self::create_request(crate::server::proto::EmptyRequest {
861            namespace: namespace.to_string(),
862        });
863
864        match self.engine.delete_namespace_data(req).await {
865            Ok(resp) => {
866                let inner = resp.into_inner();
867                let result = SimpleSuccessResult {
868                    success: inner.success,
869                    message: inner.message,
870                };
871                self.serialize_result(id, result)
872            }
873            Err(e) => self.tool_result(id, &e.to_string(), true),
874        }
875    }
876
877    async fn call_ingest_url(
878        &self,
879        id: Option<serde_json::Value>,
880        args: &serde_json::Map<String, serde_json::Value>,
881    ) -> McpResponse {
882        let url = match args.get("url").and_then(|v| v.as_str()) {
883            Some(u) => u,
884            None => return self.error_response(id, -32602, "Missing 'url'"),
885        };
886        let namespace = args
887            .get("namespace")
888            .and_then(|v| v.as_str())
889            .unwrap_or("default");
890
891        // Fetch URL content
892        let client = reqwest::Client::new();
893        let response = match client.get(url).send().await {
894            Ok(r) => r,
895            Err(e) => return self.tool_result(id, &format!("Failed to fetch URL: {}", e), true),
896        };
897
898        if !response.status().is_success() {
899            return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
900        }
901
902        let html = match response.text().await {
903            Ok(t) => t,
904            Err(e) => {
905                return self.tool_result(id, &format!("Failed to read response: {}", e), true)
906            }
907        };
908
909        // HTML to text conversion with Regex
910        let script_re = regex::Regex::new(r"(?s)<script.*?>.*?</script>").unwrap();
911        let style_re = regex::Regex::new(r"(?s)<style.*?>.*?</style>").unwrap();
912        let tag_re = regex::Regex::new(r"<[^>]*>").unwrap();
913
914        let no_script = script_re.replace_all(&html, " ");
915        let no_style = style_re.replace_all(&no_script, " ");
916        let text_content = tag_re.replace_all(&no_style, " ");
917
918        let text = text_content
919            .split_whitespace()
920            .collect::<Vec<_>>()
921            .join(" ");
922
923        // Chunk text with overlap
924        let processor = crate::processor::TextProcessor::new();
925        let chunks = processor.chunk_text(&text, 1000, 150);
926
927        // Add to vector store
928        let store = match self.engine.get_store(namespace) {
929            Ok(s) => s,
930            Err(e) => return self.tool_result(id, &e.to_string(), true),
931        };
932
933        if let Some(ref vector_store) = store.vector_store {
934            let mut added_chunks = 0;
935            for (i, chunk) in chunks.iter().enumerate() {
936                let chunk_uri = format!("{}#chunk-{}", url, i);
937                // For MCP ingestion, we just use the chunk URI as the key and metadata URI
938                let metadata = serde_json::json!({
939                    "uri": chunk_uri,
940                    "source_url": url,
941                    "type": "web_chunk"
942                });
943                match vector_store.add(&chunk_uri, chunk, metadata).await {
944                    Ok(_) => added_chunks += 1,
945                    Err(e) => {
946                        eprintln!("Failed to add chunk {}: {}", i, e);
947                    }
948                }
949            }
950            let result = IngestToolResult {
951                nodes_added: 0,
952                edges_added: 0, // Ingest URL technically adds to vector store, no graph edges yet unless reasoned
953                message: format!(
954                    "Ingested URL: {} ({} chars, {} chunks)",
955                    url,
956                    text.len(),
957                    added_chunks
958                ),
959            };
960            self.serialize_result(id, result)
961        } else {
962            self.tool_result(id, "Vector store not available", true)
963        }
964    }
965
966    async fn call_ingest_text(
967        &self,
968        id: Option<serde_json::Value>,
969        args: &serde_json::Map<String, serde_json::Value>,
970    ) -> McpResponse {
971        let uri = match args.get("uri").and_then(|v| v.as_str()) {
972            Some(u) => u,
973            None => return self.error_response(id, -32602, "Missing 'uri'"),
974        };
975        let content = match args.get("content").and_then(|v| v.as_str()) {
976            Some(c) => c,
977            None => return self.error_response(id, -32602, "Missing 'content'"),
978        };
979        let namespace = args
980            .get("namespace")
981            .and_then(|v| v.as_str())
982            .unwrap_or("default");
983
984        // Chunk text with overlap
985        let processor = crate::processor::TextProcessor::new();
986        let chunks = processor.chunk_text(content, 1000, 150);
987
988        // Add to vector store
989        let store = match self.engine.get_store(namespace) {
990            Ok(s) => s,
991            Err(e) => return self.tool_result(id, &e.to_string(), true),
992        };
993
994        if let Some(ref vector_store) = store.vector_store {
995            let mut added_chunks = 0;
996            for (i, chunk) in chunks.iter().enumerate() {
997                let chunk_uri = if chunks.len() > 1 {
998                    format!("{}#chunk-{}", uri, i)
999                } else {
1000                    uri.to_string()
1001                };
1002                let metadata = serde_json::json!({
1003                    "uri": uri, // Map back to original URI
1004                    "chunk_uri": chunk_uri,
1005                    "type": "text_chunk"
1006                });
1007                match vector_store.add(&chunk_uri, chunk, metadata).await {
1008                    Ok(_) => added_chunks += 1,
1009                    Err(e) => {
1010                        eprintln!("Failed to add chunk {}: {}", i, e);
1011                    }
1012                }
1013            }
1014            let result = IngestToolResult {
1015                nodes_added: 0,
1016                edges_added: 0,
1017                message: format!(
1018                    "Ingested text: {} ({} chars, {} chunks)",
1019                    uri,
1020                    content.len(),
1021                    added_chunks
1022                ),
1023            };
1024            self.serialize_result(id, result)
1025        } else {
1026            self.tool_result(id, "Vector store not available", true)
1027        }
1028    }
1029
1030    async fn call_compact_vectors(
1031        &self,
1032        id: Option<serde_json::Value>,
1033        args: &serde_json::Map<String, serde_json::Value>,
1034    ) -> McpResponse {
1035        let namespace = args
1036            .get("namespace")
1037            .and_then(|v| v.as_str())
1038            .unwrap_or("default");
1039
1040        let store = match self.engine.get_store(namespace) {
1041            Ok(s) => s,
1042            Err(e) => return self.tool_result(id, &e.to_string(), true),
1043        };
1044
1045        if let Some(ref vector_store) = store.vector_store {
1046            match vector_store.compact() {
1047                Ok(removed) => {
1048                    let result = SimpleSuccessResult {
1049                        success: true,
1050                        message: format!("Compaction complete: {} stale entries removed", removed),
1051                    };
1052                    self.serialize_result(id, result)
1053                }
1054                Err(e) => self.tool_result(id, &format!("Compaction error: {}", e), true),
1055            }
1056        } else {
1057            self.tool_result(id, "Vector store not available", true)
1058        }
1059    }
1060
1061    async fn call_vector_stats(
1062        &self,
1063        id: Option<serde_json::Value>,
1064        args: &serde_json::Map<String, serde_json::Value>,
1065    ) -> McpResponse {
1066        let namespace = args
1067            .get("namespace")
1068            .and_then(|v| v.as_str())
1069            .unwrap_or("default");
1070
1071        let store = match self.engine.get_store(namespace) {
1072            Ok(s) => s,
1073            Err(e) => return self.tool_result(id, &e.to_string(), true),
1074        };
1075
1076        if let Some(ref vector_store) = store.vector_store {
1077            let (active, stale, total) = vector_store.stats();
1078            let result = StatsToolResult {
1079                active_vectors: active,
1080                stale_vectors: stale,
1081                total_embeddings: total,
1082            };
1083            self.serialize_result(id, result)
1084        } else {
1085            self.tool_result(id, "Vector store not available", true)
1086        }
1087    }
1088
1089    async fn call_disambiguate(
1090        &self,
1091        id: Option<serde_json::Value>,
1092        args: &serde_json::Map<String, serde_json::Value>,
1093    ) -> McpResponse {
1094        let namespace = args
1095            .get("namespace")
1096            .and_then(|v| v.as_str())
1097            .unwrap_or("default");
1098        let threshold = args
1099            .get("threshold")
1100            .and_then(|v| v.as_f64())
1101            .unwrap_or(0.8);
1102
1103        let store = match self.engine.get_store(namespace) {
1104            Ok(s) => s,
1105            Err(e) => return self.tool_result(id, &e.to_string(), true),
1106        };
1107
1108        // Collect all URIs from the store
1109        let uri_map = store.uri_to_id.read().unwrap();
1110        let uris: Vec<String> = uri_map.keys().cloned().collect();
1111        drop(uri_map);
1112
1113        let disambiguator = crate::disambiguation::EntityDisambiguator::new(threshold);
1114        let suggestions = disambiguator.suggest_merges(&uris);
1115
1116        let items: Vec<DisambiguationItem> = suggestions
1117            .into_iter()
1118            .map(|(u1, u2, s)| DisambiguationItem {
1119                uri1: u1,
1120                uri2: u2,
1121                similarity: s,
1122            })
1123            .collect();
1124
1125        let message = if items.is_empty() {
1126            "No similar entities found above threshold".to_string()
1127        } else {
1128            format!("Found {} potential duplicates", items.len())
1129        };
1130
1131        let result = DisambiguationResult {
1132            suggestions: items,
1133            message,
1134        };
1135        self.serialize_result(id, result)
1136    }
1137
1138    // Legacy handlers for backward compatibility
1139    async fn handle_legacy_ingest(&self, request: McpRequest) -> McpResponse {
1140        let params = match request.params {
1141            Some(p) => p,
1142            None => return self.error_response(request.id, -32602, "Invalid params"),
1143        };
1144
1145        if let (Some(sub), Some(pred), Some(obj)) = (
1146            params.get("subject").and_then(|v| v.as_str()),
1147            params.get("predicate").and_then(|v| v.as_str()),
1148            params.get("object").and_then(|v| v.as_str()),
1149        ) {
1150            let namespace = params
1151                .get("namespace")
1152                .and_then(|v| v.as_str())
1153                .unwrap_or("default");
1154            let triple = Triple {
1155                subject: sub.to_string(),
1156                predicate: pred.to_string(),
1157                object: obj.to_string(),
1158                provenance: Some(Provenance {
1159                    source: "mcp".to_string(),
1160                    timestamp: "".to_string(),
1161                    method: "stdio".to_string(),
1162                }),
1163                embedding: vec![],
1164            };
1165
1166            let req = Self::create_request(IngestRequest {
1167                triples: vec![triple],
1168                namespace: namespace.to_string(),
1169            });
1170
1171            match self.engine.ingest_triples(req).await {
1172                Ok(_) => McpResponse {
1173                    jsonrpc: "2.0".to_string(),
1174                    id: request.id,
1175                    result: Some(serde_json::to_value("Ingested").unwrap()),
1176                    error: None,
1177                },
1178                Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1179            }
1180        } else {
1181            self.error_response(request.id, -32602, "Invalid params")
1182        }
1183    }
1184
1185    async fn handle_legacy_ingest_file(&self, request: McpRequest) -> McpResponse {
1186        let params = match request.params {
1187            Some(p) => p,
1188            None => {
1189                return self.error_response(request.id, -32602, "Invalid params: 'path' required")
1190            }
1191        };
1192
1193        if let Some(path) = params.get("path").and_then(|v| v.as_str()) {
1194            let namespace = params
1195                .get("namespace")
1196                .and_then(|v| v.as_str())
1197                .unwrap_or("default");
1198
1199            let req = Self::create_request(IngestFileRequest {
1200                file_path: path.to_string(),
1201                namespace: namespace.to_string(),
1202            });
1203
1204            match self.engine.ingest_file(req).await {
1205                Ok(resp) => {
1206                    let inner = resp.into_inner();
1207                    McpResponse {
1208                        jsonrpc: "2.0".to_string(),
1209                        id: request.id,
1210                        result: Some(
1211                            serde_json::to_value(format!(
1212                                "Ingested {} triples from {}",
1213                                inner.edges_added, path
1214                            ))
1215                            .unwrap(),
1216                        ),
1217                        error: None,
1218                    }
1219                }
1220                Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1221            }
1222        } else {
1223            self.error_response(request.id, -32602, "Invalid params: 'path' required")
1224        }
1225    }
1226
1227    fn serialize_result<T: serde::Serialize>(
1228        &self,
1229        id: Option<serde_json::Value>,
1230        result: T,
1231    ) -> McpResponse {
1232        match serde_json::to_string_pretty(&result) {
1233            Ok(json) => self.tool_result(id, &json, false),
1234            Err(e) => self.tool_result(id, &format!("Serialization error: {}", e), true),
1235        }
1236    }
1237
1238    async fn call_get_node_degree(
1239        &self,
1240        id: Option<serde_json::Value>,
1241        args: &serde_json::Map<String, serde_json::Value>,
1242    ) -> McpResponse {
1243        let uri = match args.get("uri").and_then(|v| v.as_str()) {
1244            Some(u) => u,
1245            None => return self.error_response(id, -32602, "Missing 'uri'"),
1246        };
1247        let namespace = args
1248            .get("namespace")
1249            .and_then(|v| v.as_str())
1250            .unwrap_or("default");
1251
1252        let store = match self.engine.get_store(namespace) {
1253            Ok(s) => s,
1254            Err(e) => return self.tool_result(id, &e.to_string(), true),
1255        };
1256
1257        let degree = store.get_degree(uri);
1258
1259        let result = DegreeResult {
1260            uri: uri.to_string(),
1261            degree,
1262        };
1263
1264        self.serialize_result(id, result)
1265    }
1266
1267    fn error_response(
1268        &self,
1269        id: Option<serde_json::Value>,
1270        code: i32,
1271        message: &str,
1272    ) -> McpResponse {
1273        McpResponse {
1274            jsonrpc: "2.0".to_string(),
1275            id,
1276            result: None,
1277            error: Some(McpError {
1278                code,
1279                message: message.to_string(),
1280                data: None,
1281            }),
1282        }
1283    }
1284
1285    fn tool_result(
1286        &self,
1287        id: Option<serde_json::Value>,
1288        text: &str,
1289        is_error: bool,
1290    ) -> McpResponse {
1291        let result = CallToolResult {
1292            content: vec![Content {
1293                content_type: "text".to_string(),
1294                text: text.to_string(),
1295            }],
1296            is_error: if is_error { Some(true) } else { None },
1297        };
1298        McpResponse {
1299            jsonrpc: "2.0".to_string(),
1300            id,
1301            result: Some(serde_json::to_value(result).unwrap()),
1302            error: None,
1303        }
1304    }
1305}