Skip to main content

synapse_core/
mcp_stdio.rs

1use crate::mcp_types::{
2    CallToolResult, Content, DegreeResult, DisambiguationItem, DisambiguationResult,
3    IngestToolResult, ListToolsResult, McpError, McpRequest, McpResponse, NeighborItem,
4    NeighborsToolResult, ReasoningToolResult, SearchResultItem, SearchToolResult,
5    SimpleSuccessResult, StatsToolResult, Tool, TripleItem, TriplesToolResult,
6};
7use crate::server::proto::semantic_engine_server::SemanticEngine;
8use crate::server::proto::{
9    HybridSearchRequest, IngestFileRequest, IngestRequest, Provenance, ReasoningRequest,
10    ReasoningStrategy, SearchMode, SparqlRequest, Triple,
11};
12use crate::server::MySemanticEngine;
13use jsonschema::JSONSchema;
14use std::sync::Arc;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tonic::Request;
17
18pub struct McpStdioServer {
19    engine: Arc<MySemanticEngine>,
20}
21
22impl McpStdioServer {
23    pub fn new(engine: Arc<MySemanticEngine>) -> Self {
24        Self { engine }
25    }
26
27    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
28        let mut reader = BufReader::new(tokio::io::stdin());
29        let mut writer = tokio::io::stdout();
30
31        loop {
32            let mut line = String::new();
33            if reader.read_line(&mut line).await? == 0 {
34                break;
35            }
36
37            let trimmed = line.trim();
38            if trimmed.is_empty() {
39                continue;
40            }
41
42            if let Ok(request) = serde_json::from_str::<McpRequest>(trimmed) {
43                let response = self.handle_request(request).await;
44                let response_json = serde_json::to_string(&response)? + "\n";
45                writer.write_all(response_json.as_bytes()).await?;
46                writer.flush().await?;
47            }
48        }
49
50        Ok(())
51    }
52
53    fn create_request<T>(msg: T) -> Request<T> {
54        let mut req = Request::new(msg);
55
56        // Try to get token from env
57        let token_opt = std::env::var("SYNAPSE_ADMIN_TOKEN")
58            .or_else(|_| std::env::var("SYNAPSE_MCP_TOKEN"))
59            .ok();
60
61        if let Some(token) = token_opt {
62             if let Ok(val) = format!("Bearer {}", token).parse() {
63                 req.metadata_mut().insert("authorization", val);
64             }
65        }
66        req
67    }
68
69    fn get_tools() -> Vec<Tool> {
70        vec![
71            Tool {
72                name: "ingest_triples".to_string(),
73                description: Some(
74                    "Ingest one or more RDF triples into the knowledge graph".to_string(),
75                ),
76                input_schema: serde_json::json!({
77                    "type": "object",
78                    "properties": {
79                        "triples": {
80                            "type": "array",
81                            "items": {
82                                "type": "object",
83                                "properties": {
84                                    "subject": { "type": "string" },
85                                    "predicate": { "type": "string" },
86                                    "object": { "type": "string" }
87                                },
88                                "required": ["subject", "predicate", "object"]
89                            }
90                        },
91                        "namespace": { "type": "string", "default": "default" }
92                    },
93                    "required": ["triples"]
94                }),
95            },
96            Tool {
97                name: "ingest_file".to_string(),
98                description: Some(
99                    "Ingest a CSV or Markdown file into the knowledge graph".to_string(),
100                ),
101                input_schema: serde_json::json!({
102                    "type": "object",
103                    "properties": {
104                        "path": { "type": "string", "description": "Path to the file" },
105                        "namespace": { "type": "string", "default": "default" }
106                    },
107                    "required": ["path"]
108                }),
109            },
110            Tool {
111                name: "sparql_query".to_string(),
112                description: Some("Execute a SPARQL query against the knowledge graph".to_string()),
113                input_schema: serde_json::json!({
114                    "type": "object",
115                    "properties": {
116                        "query": { "type": "string", "description": "SPARQL query string" },
117                        "namespace": { "type": "string", "default": "default" }
118                    },
119                    "required": ["query"]
120                }),
121            },
122            Tool {
123                name: "hybrid_search".to_string(),
124                description: Some("Perform a hybrid vector + graph search".to_string()),
125                input_schema: serde_json::json!({
126                    "type": "object",
127                    "properties": {
128                        "query": { "type": "string", "description": "Natural language query" },
129                        "namespace": { "type": "string", "default": "default" },
130                        "vector_k": { "type": "integer", "default": 10 },
131                        "graph_depth": { "type": "integer", "default": 1 },
132                        "limit": { "type": "integer", "default": 20 }
133                    },
134                    "required": ["query"]
135                }),
136            },
137            Tool {
138                name: "apply_reasoning".to_string(),
139                description: Some(
140                    "Apply RDFS or OWL-RL reasoning to infer new triples".to_string(),
141                ),
142                input_schema: serde_json::json!({
143                    "type": "object",
144                    "properties": {
145                        "namespace": { "type": "string", "default": "default" },
146                        "strategy": { "type": "string", "enum": ["rdfs", "owlrl"], "default": "rdfs" },
147                        "materialize": { "type": "boolean", "default": false }
148                    }
149                }),
150            },
151            Tool {
152                name: "get_neighbors".to_string(),
153                description: Some(
154                    "Get neighboring nodes connected to a given URI in the graph".to_string(),
155                ),
156                input_schema: serde_json::json!({
157                    "type": "object",
158                    "properties": {
159                        "uri": { "type": "string", "description": "URI of the entity to find neighbors for" },
160                        "namespace": { "type": "string", "default": "default" },
161                        "direction": { "type": "string", "enum": ["outgoing", "incoming", "both"], "default": "outgoing" }
162                    },
163                    "required": ["uri"]
164                }),
165            },
166            Tool {
167                name: "list_triples".to_string(),
168                description: Some(
169                    "List all triples in a namespace (useful for debugging/exploration)"
170                        .to_string(),
171                ),
172                input_schema: serde_json::json!({
173                    "type": "object",
174                    "properties": {
175                        "namespace": { "type": "string", "default": "default" },
176                        "limit": { "type": "integer", "default": 100 }
177                    }
178                }),
179            },
180            Tool {
181                name: "delete_namespace".to_string(),
182                description: Some("Delete all data in a namespace".to_string()),
183                input_schema: serde_json::json!({
184                    "type": "object",
185                    "properties": {
186                        "namespace": { "type": "string", "description": "Namespace to delete" }
187                    },
188                    "required": ["namespace"]
189                }),
190            },
191            Tool {
192                name: "ingest_url".to_string(),
193                description: Some(
194                    "Scrape a web page and add its content to the vector store for RAG retrieval"
195                        .to_string(),
196                ),
197                input_schema: serde_json::json!({
198                    "type": "object",
199                    "properties": {
200                        "url": { "type": "string", "description": "URL to scrape and ingest" },
201                        "namespace": { "type": "string", "default": "default" }
202                    },
203                    "required": ["url"]
204                }),
205            },
206            Tool {
207                name: "ingest_text".to_string(),
208                description: Some(
209                    "Add arbitrary text content to the vector store for RAG retrieval".to_string(),
210                ),
211                input_schema: serde_json::json!({
212                    "type": "object",
213                    "properties": {
214                        "uri": { "type": "string", "description": "Custom URI identifier for this text" },
215                        "content": { "type": "string", "description": "Text content to embed and store" },
216                        "namespace": { "type": "string", "default": "default" }
217                    },
218                    "required": ["uri", "content"]
219                }),
220            },
221            Tool {
222                name: "compact_vectors".to_string(),
223                description: Some("Compact the vector index by removing stale entries".to_string()),
224                input_schema: serde_json::json!({
225                    "type": "object",
226                    "properties": {
227                        "namespace": { "type": "string", "default": "default" }
228                    }
229                }),
230            },
231            Tool {
232                name: "vector_stats".to_string(),
233                description: Some("Get vector store statistics (active, stale, total)".to_string()),
234                input_schema: serde_json::json!({
235                    "type": "object",
236                    "properties": {
237                        "namespace": { "type": "string", "default": "default" }
238                    }
239                }),
240            },
241            Tool {
242                name: "disambiguate".to_string(),
243                description: Some("Find similar entities that might be duplicates".to_string()),
244                input_schema: serde_json::json!({
245                    "type": "object",
246                    "properties": {
247                        "namespace": { "type": "string", "default": "default" },
248                        "threshold": { "type": "number", "default": 0.8, "description": "Similarity threshold 0.0-1.0" }
249                    }
250                }),
251            },
252            Tool {
253                name: "get_node_degree".to_string(),
254                description: Some("Get the degree (number of connections) of a node".to_string()),
255                input_schema: serde_json::json!({
256                    "type": "object",
257                    "properties": {
258                        "uri": { "type": "string" },
259                        "namespace": { "type": "string", "default": "default" }
260                    },
261                    "required": ["uri"]
262                }),
263            },
264        ]
265    }
266
267    pub async fn handle_request(&self, request: McpRequest) -> McpResponse {
268        match request.method.as_str() {
269            "initialize" => {
270                // MCP protocol initialization
271                McpResponse {
272                    jsonrpc: "2.0".to_string(),
273                    id: request.id,
274                    result: Some(serde_json::json!({
275                        "protocolVersion": "2024-11-05",
276                        "capabilities": {
277                            "tools": {}
278                        },
279                        "serverInfo": {
280                        "name": "synapse",
281                        "version": env!("CARGO_PKG_VERSION")
282                    }
283                    })),
284                    error: None,
285                }
286            }
287            "notifications/initialized" | "initialized" => {
288                // Client confirms initialization - just acknowledge
289                McpResponse {
290                    jsonrpc: "2.0".to_string(),
291                    id: request.id,
292                    result: Some(serde_json::json!({})),
293                    error: None,
294                }
295            }
296            "tools/list" => {
297                let result = ListToolsResult {
298                    tools: Self::get_tools(),
299                };
300                McpResponse {
301                    jsonrpc: "2.0".to_string(),
302                    id: request.id,
303                    result: Some(serde_json::to_value(result).unwrap()),
304                    error: None,
305                }
306            }
307            "tools/call" => self.handle_tool_call(request).await,
308            // Legacy methods for backwards compatibility
309            "ingest" => self.handle_legacy_ingest(request).await,
310            "ingest_file" => self.handle_legacy_ingest_file(request).await,
311            _ => McpResponse {
312                jsonrpc: "2.0".to_string(),
313                id: request.id,
314                result: None,
315                error: Some(McpError {
316                    code: -32601,
317                    message: format!("Method not found: {}", request.method),
318                    data: None,
319                }),
320            },
321        }
322    }
323
324    fn validate_arguments(tool_name: &str, arguments: &serde_json::Value) -> Result<(), String> {
325        let tools = Self::get_tools();
326        if let Some(tool) = tools.iter().find(|t| t.name == tool_name) {
327            if let Ok(schema) = JSONSchema::compile(&tool.input_schema) {
328                if let Err(errors) = schema.validate(arguments) {
329                    let error_msg = errors.map(|e| e.to_string()).collect::<Vec<_>>().join(", ");
330                    return Err(format!("Validation error: {}", error_msg));
331                }
332            } else {
333                return Err("Invalid tool schema definition".to_string());
334            }
335        }
336        Ok(())
337    }
338
339    async fn handle_tool_call(&self, request: McpRequest) -> McpResponse {
340        let params = match request.params {
341            Some(p) => p,
342            None => return self.error_response(request.id, -32602, "Missing params"),
343        };
344
345        let tool_name = match params.get("name").and_then(|v| v.as_str()) {
346            Some(n) => n,
347            None => return self.error_response(request.id, -32602, "Missing tool name"),
348        };
349
350        let arguments = params
351            .get("arguments")
352            .and_then(|v| v.as_object())
353            .cloned()
354            .unwrap_or_default();
355
356        let args_value = serde_json::Value::Object(arguments.clone());
357        if let Err(e) = Self::validate_arguments(tool_name, &args_value) {
358            return self.error_response(request.id, -32602, &e);
359        }
360
361        match tool_name {
362            "ingest_triples" => self.call_ingest_triples(request.id, &arguments).await,
363            "ingest_file" => self.call_ingest_file(request.id, &arguments).await,
364            "sparql_query" => self.call_sparql_query(request.id, &arguments).await,
365            "hybrid_search" => self.call_hybrid_search(request.id, &arguments).await,
366            "apply_reasoning" => self.call_apply_reasoning(request.id, &arguments).await,
367            "get_neighbors" => self.call_get_neighbors(request.id, &arguments).await,
368            "list_triples" => self.call_list_triples(request.id, &arguments).await,
369            "delete_namespace" => self.call_delete_namespace(request.id, &arguments).await,
370            "ingest_url" => self.call_ingest_url(request.id, &arguments).await,
371            "ingest_text" => self.call_ingest_text(request.id, &arguments).await,
372            "compact_vectors" => self.call_compact_vectors(request.id, &arguments).await,
373            "vector_stats" => self.call_vector_stats(request.id, &arguments).await,
374            "disambiguate" => self.call_disambiguate(request.id, &arguments).await,
375            "get_node_degree" => self.call_get_node_degree(request.id, &arguments).await,
376            _ => self.error_response(request.id, -32602, &format!("Unknown tool: {}", tool_name)),
377        }
378    }
379
380    async fn call_ingest_triples(
381        &self,
382        id: Option<serde_json::Value>,
383        args: &serde_json::Map<String, serde_json::Value>,
384    ) -> McpResponse {
385        let namespace = args
386            .get("namespace")
387            .and_then(|v| v.as_str())
388            .unwrap_or("default");
389        let triples_array = match args.get("triples").and_then(|v| v.as_array()) {
390            Some(t) => t,
391            None => return self.error_response(id, -32602, "Missing 'triples' array"),
392        };
393
394        let mut triples = Vec::new();
395        for t in triples_array {
396            if let (Some(s), Some(p), Some(o)) = (
397                t.get("subject").and_then(|v| v.as_str()),
398                t.get("predicate").and_then(|v| v.as_str()),
399                t.get("object").and_then(|v| v.as_str()),
400            ) {
401                triples.push(Triple {
402                    subject: s.to_string(),
403                    predicate: p.to_string(),
404                    object: o.to_string(),
405                    provenance: Some(Provenance {
406                        source: "mcp".to_string(),
407                        timestamp: "".to_string(),
408                        method: "tools/call".to_string(),
409                    }),
410                    embedding: vec![],
411                });
412            }
413        }
414
415        let req = Self::create_request(IngestRequest {
416            triples,
417            namespace: namespace.to_string(),
418        });
419
420        match self.engine.ingest_triples(req).await {
421            Ok(resp) => {
422                let inner = resp.into_inner();
423                let result = IngestToolResult {
424                    nodes_added: inner.nodes_added,
425                    edges_added: inner.edges_added,
426                    message: format!("Ingested {} triples", inner.edges_added),
427                };
428                self.serialize_result(id, result)
429            }
430            Err(e) => self.tool_result(id, &e.to_string(), true),
431        }
432    }
433
434    async fn call_ingest_file(
435        &self,
436        id: Option<serde_json::Value>,
437        args: &serde_json::Map<String, serde_json::Value>,
438    ) -> McpResponse {
439        let path = match args.get("path").and_then(|v| v.as_str()) {
440            Some(p) => p,
441            None => return self.error_response(id, -32602, "Missing 'path'"),
442        };
443        let namespace = args
444            .get("namespace")
445            .and_then(|v| v.as_str())
446            .unwrap_or("default");
447
448        let req = Self::create_request(IngestFileRequest {
449            file_path: path.to_string(),
450            namespace: namespace.to_string(),
451        });
452
453        match self.engine.ingest_file(req).await {
454            Ok(resp) => {
455                let inner = resp.into_inner();
456                let result = IngestToolResult {
457                    nodes_added: inner.nodes_added,
458                    edges_added: inner.edges_added,
459                    message: format!("Ingested {} triples from {}", inner.edges_added, path),
460                };
461                self.serialize_result(id, result)
462            }
463            Err(e) => self.tool_result(id, &e.to_string(), true),
464        }
465    }
466
467    async fn call_sparql_query(
468        &self,
469        id: Option<serde_json::Value>,
470        args: &serde_json::Map<String, serde_json::Value>,
471    ) -> McpResponse {
472        let query = match args.get("query").and_then(|v| v.as_str()) {
473            Some(q) => q,
474            None => return self.error_response(id, -32602, "Missing 'query'"),
475        };
476        let namespace = args
477            .get("namespace")
478            .and_then(|v| v.as_str())
479            .unwrap_or("default");
480
481        let req = Self::create_request(SparqlRequest {
482            query: query.to_string(),
483            namespace: namespace.to_string(),
484        });
485
486        match self.engine.query_sparql(req).await {
487            Ok(resp) => self.tool_result(id, &resp.into_inner().results_json, false),
488            Err(e) => self.tool_result(id, &e.to_string(), true),
489        }
490    }
491
492    async fn call_hybrid_search(
493        &self,
494        id: Option<serde_json::Value>,
495        args: &serde_json::Map<String, serde_json::Value>,
496    ) -> McpResponse {
497        let query = match args.get("query").and_then(|v| v.as_str()) {
498            Some(q) => q,
499            None => return self.error_response(id, -32602, "Missing 'query'"),
500        };
501        let namespace = args
502            .get("namespace")
503            .and_then(|v| v.as_str())
504            .unwrap_or("default");
505        let vector_k = args.get("vector_k").and_then(|v| v.as_u64()).unwrap_or(10) as u32;
506        let graph_depth = args
507            .get("graph_depth")
508            .and_then(|v| v.as_u64())
509            .unwrap_or(1) as u32;
510        let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as u32;
511
512        let req = Self::create_request(HybridSearchRequest {
513            query: query.to_string(),
514            namespace: namespace.to_string(),
515            vector_k,
516            graph_depth,
517            mode: SearchMode::Hybrid as i32,
518            limit,
519        });
520
521        match self.engine.hybrid_search(req).await {
522            Ok(resp) => {
523                let results = resp.into_inner().results;
524                let items: Vec<SearchResultItem> = results
525                    .into_iter()
526                    .map(|r| SearchResultItem {
527                        node_id: r.node_id,
528                        score: r.score,
529                        content: r.content,
530                        uri: r.uri,
531                    })
532                    .collect();
533
534                let result = SearchToolResult { results: items };
535                self.serialize_result(id, result)
536            }
537            Err(e) => self.tool_result(id, &e.to_string(), true),
538        }
539    }
540
541    async fn call_apply_reasoning(
542        &self,
543        id: Option<serde_json::Value>,
544        args: &serde_json::Map<String, serde_json::Value>,
545    ) -> McpResponse {
546        let namespace = args
547            .get("namespace")
548            .and_then(|v| v.as_str())
549            .unwrap_or("default");
550        let strategy_str = args
551            .get("strategy")
552            .and_then(|v| v.as_str())
553            .unwrap_or("rdfs");
554        let materialize = args
555            .get("materialize")
556            .and_then(|v| v.as_bool())
557            .unwrap_or(false);
558
559        let strategy = match strategy_str.to_lowercase().as_str() {
560            "owlrl" | "owl-rl" => ReasoningStrategy::Owlrl as i32,
561            _ => ReasoningStrategy::Rdfs as i32,
562        };
563
564        let req = Self::create_request(ReasoningRequest {
565            namespace: namespace.to_string(),
566            strategy,
567            materialize,
568        });
569
570        match self.engine.apply_reasoning(req).await {
571            Ok(resp) => {
572                let inner = resp.into_inner();
573                let result = ReasoningToolResult {
574                    success: inner.success,
575                    triples_inferred: inner.triples_inferred,
576                    message: inner.message,
577                };
578                self.serialize_result(id, result)
579            }
580            Err(e) => self.tool_result(id, &e.to_string(), true),
581        }
582    }
583
584    async fn call_get_neighbors(
585        &self,
586        id: Option<serde_json::Value>,
587        args: &serde_json::Map<String, serde_json::Value>,
588    ) -> McpResponse {
589        let uri = match args.get("uri").and_then(|v| v.as_str()) {
590            Some(u) => u,
591            None => return self.error_response(id, -32602, "Missing 'uri'"),
592        };
593        let namespace = args
594            .get("namespace")
595            .and_then(|v| v.as_str())
596            .unwrap_or("default");
597        let direction = args
598            .get("direction")
599            .and_then(|v| v.as_str())
600            .unwrap_or("outgoing");
601
602        let store = match self.engine.get_store(namespace) {
603            Ok(s) => s,
604            Err(e) => return self.tool_result(id, &e.to_string(), true),
605        };
606
607        let mut neighbors = Vec::new();
608
609        // Query outgoing edges (URI as subject)
610        if direction == "outgoing" || direction == "both" {
611            if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
612                for q in store
613                    .store
614                    .quads_for_pattern(Some(subj.into()), None, None, None)
615                    .flatten()
616                {
617                    neighbors.push(NeighborItem {
618                        direction: "outgoing".to_string(),
619                        predicate: q.predicate.to_string(),
620                        target: q.object.to_string(),
621                        score: 1.0,
622                    });
623                }
624            }
625        }
626
627        // Query incoming edges (URI as object)
628        if direction == "incoming" || direction == "both" {
629            if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
630                for q in store
631                    .store
632                    .quads_for_pattern(None, None, Some(obj.into()), None)
633                    .flatten()
634                {
635                    neighbors.push(NeighborItem {
636                        direction: "incoming".to_string(),
637                        predicate: q.predicate.to_string(),
638                        target: q.subject.to_string(),
639                        score: 1.0,
640                    });
641                }
642            }
643        }
644
645        let result = NeighborsToolResult { neighbors };
646        self.serialize_result(id, result)
647    }
648
649    async fn call_list_triples(
650        &self,
651        id: Option<serde_json::Value>,
652        args: &serde_json::Map<String, serde_json::Value>,
653    ) -> McpResponse {
654        let namespace = args
655            .get("namespace")
656            .and_then(|v| v.as_str())
657            .unwrap_or("default");
658        let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
659
660        let store = match self.engine.get_store(namespace) {
661            Ok(s) => s,
662            Err(e) => return self.tool_result(id, &e.to_string(), true),
663        };
664
665        let mut triples = Vec::new();
666        for q in store.store.iter().take(limit).flatten() {
667            triples.push(TripleItem {
668                subject: q.subject.to_string(),
669                predicate: q.predicate.to_string(),
670                object: q.object.to_string(),
671            });
672        }
673
674        let result = TriplesToolResult { triples };
675        self.serialize_result(id, result)
676    }
677
678    async fn call_delete_namespace(
679        &self,
680        id: Option<serde_json::Value>,
681        args: &serde_json::Map<String, serde_json::Value>,
682    ) -> McpResponse {
683        let namespace = match args.get("namespace").and_then(|v| v.as_str()) {
684            Some(n) => n,
685            None => return self.error_response(id, -32602, "Missing 'namespace'"),
686        };
687
688        let req = Self::create_request(crate::server::proto::EmptyRequest {
689            namespace: namespace.to_string(),
690        });
691
692        match self.engine.delete_namespace_data(req).await {
693            Ok(resp) => {
694                let inner = resp.into_inner();
695                let result = SimpleSuccessResult {
696                    success: inner.success,
697                    message: inner.message,
698                };
699                self.serialize_result(id, result)
700            }
701            Err(e) => self.tool_result(id, &e.to_string(), true),
702        }
703    }
704
705    async fn call_ingest_url(
706        &self,
707        id: Option<serde_json::Value>,
708        args: &serde_json::Map<String, serde_json::Value>,
709    ) -> McpResponse {
710        let url = match args.get("url").and_then(|v| v.as_str()) {
711            Some(u) => u,
712            None => return self.error_response(id, -32602, "Missing 'url'"),
713        };
714        let namespace = args
715            .get("namespace")
716            .and_then(|v| v.as_str())
717            .unwrap_or("default");
718
719        // Fetch URL content
720        let client = reqwest::Client::new();
721        let response = match client.get(url).send().await {
722            Ok(r) => r,
723            Err(e) => return self.tool_result(id, &format!("Failed to fetch URL: {}", e), true),
724        };
725
726        if !response.status().is_success() {
727            return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
728        }
729
730        let html = match response.text().await {
731            Ok(t) => t,
732            Err(e) => {
733                return self.tool_result(id, &format!("Failed to read response: {}", e), true)
734            }
735        };
736
737        // HTML to text conversion with Regex
738        let script_re = regex::Regex::new(r"(?s)<script.*?>.*?</script>").unwrap();
739        let style_re = regex::Regex::new(r"(?s)<style.*?>.*?</style>").unwrap();
740        let tag_re = regex::Regex::new(r"<[^>]*>").unwrap();
741
742        let no_script = script_re.replace_all(&html, " ");
743        let no_style = style_re.replace_all(&no_script, " ");
744        let text_content = tag_re.replace_all(&no_style, " ");
745
746        let text = text_content
747            .split_whitespace()
748            .collect::<Vec<_>>()
749            .join(" ");
750
751        // Chunk text with overlap
752        let processor = crate::processor::TextProcessor::new();
753        let chunks = processor.chunk_text(&text, 1000, 150);
754
755        // Add to vector store
756        let store = match self.engine.get_store(namespace) {
757            Ok(s) => s,
758            Err(e) => return self.tool_result(id, &e.to_string(), true),
759        };
760
761        if let Some(ref vector_store) = store.vector_store {
762            let mut added_chunks = 0;
763            for (i, chunk) in chunks.iter().enumerate() {
764                let chunk_uri = format!("{}#chunk-{}", url, i);
765                // For MCP ingestion, we just use the chunk URI as the key and metadata URI
766                let metadata = serde_json::json!({
767                    "uri": chunk_uri,
768                    "source_url": url,
769                    "type": "web_chunk"
770                });
771                match vector_store.add(&chunk_uri, chunk, metadata).await {
772                    Ok(_) => added_chunks += 1,
773                    Err(e) => {
774                        eprintln!("Failed to add chunk {}: {}", i, e);
775                    }
776                }
777            }
778            let result = IngestToolResult {
779                nodes_added: 0,
780                edges_added: 0, // Ingest URL technically adds to vector store, no graph edges yet unless reasoned
781                message: format!(
782                    "Ingested URL: {} ({} chars, {} chunks)",
783                    url,
784                    text.len(),
785                    added_chunks
786                ),
787            };
788            self.serialize_result(id, result)
789        } else {
790            self.tool_result(id, "Vector store not available", true)
791        }
792    }
793
794    async fn call_ingest_text(
795        &self,
796        id: Option<serde_json::Value>,
797        args: &serde_json::Map<String, serde_json::Value>,
798    ) -> McpResponse {
799        let uri = match args.get("uri").and_then(|v| v.as_str()) {
800            Some(u) => u,
801            None => return self.error_response(id, -32602, "Missing 'uri'"),
802        };
803        let content = match args.get("content").and_then(|v| v.as_str()) {
804            Some(c) => c,
805            None => return self.error_response(id, -32602, "Missing 'content'"),
806        };
807        let namespace = args
808            .get("namespace")
809            .and_then(|v| v.as_str())
810            .unwrap_or("default");
811
812        // Chunk text with overlap
813        let processor = crate::processor::TextProcessor::new();
814        let chunks = processor.chunk_text(content, 1000, 150);
815
816        // Add to vector store
817        let store = match self.engine.get_store(namespace) {
818            Ok(s) => s,
819            Err(e) => return self.tool_result(id, &e.to_string(), true),
820        };
821
822        if let Some(ref vector_store) = store.vector_store {
823            let mut added_chunks = 0;
824            for (i, chunk) in chunks.iter().enumerate() {
825                let chunk_uri = if chunks.len() > 1 {
826                    format!("{}#chunk-{}", uri, i)
827                } else {
828                    uri.to_string()
829                };
830                let metadata = serde_json::json!({
831                    "uri": uri, // Map back to original URI
832                    "chunk_uri": chunk_uri,
833                    "type": "text_chunk"
834                });
835                match vector_store.add(&chunk_uri, chunk, metadata).await {
836                    Ok(_) => added_chunks += 1,
837                    Err(e) => {
838                        eprintln!("Failed to add chunk {}: {}", i, e);
839                    }
840                }
841            }
842            let result = IngestToolResult {
843                nodes_added: 0,
844                edges_added: 0,
845                message: format!(
846                    "Ingested text: {} ({} chars, {} chunks)",
847                    uri,
848                    content.len(),
849                    added_chunks
850                ),
851            };
852            self.serialize_result(id, result)
853        } else {
854            self.tool_result(id, "Vector store not available", true)
855        }
856    }
857
858    async fn call_compact_vectors(
859        &self,
860        id: Option<serde_json::Value>,
861        args: &serde_json::Map<String, serde_json::Value>,
862    ) -> McpResponse {
863        let namespace = args
864            .get("namespace")
865            .and_then(|v| v.as_str())
866            .unwrap_or("default");
867
868        let store = match self.engine.get_store(namespace) {
869            Ok(s) => s,
870            Err(e) => return self.tool_result(id, &e.to_string(), true),
871        };
872
873        if let Some(ref vector_store) = store.vector_store {
874            match vector_store.compact() {
875                Ok(removed) => {
876                    let result = SimpleSuccessResult {
877                        success: true,
878                        message: format!("Compaction complete: {} stale entries removed", removed),
879                    };
880                    self.serialize_result(id, result)
881                }
882                Err(e) => self.tool_result(id, &format!("Compaction error: {}", e), true),
883            }
884        } else {
885            self.tool_result(id, "Vector store not available", true)
886        }
887    }
888
889    async fn call_vector_stats(
890        &self,
891        id: Option<serde_json::Value>,
892        args: &serde_json::Map<String, serde_json::Value>,
893    ) -> McpResponse {
894        let namespace = args
895            .get("namespace")
896            .and_then(|v| v.as_str())
897            .unwrap_or("default");
898
899        let store = match self.engine.get_store(namespace) {
900            Ok(s) => s,
901            Err(e) => return self.tool_result(id, &e.to_string(), true),
902        };
903
904        if let Some(ref vector_store) = store.vector_store {
905            let (active, stale, total) = vector_store.stats();
906            let result = StatsToolResult {
907                active_vectors: active,
908                stale_vectors: stale,
909                total_embeddings: total,
910            };
911            self.serialize_result(id, result)
912        } else {
913            self.tool_result(id, "Vector store not available", true)
914        }
915    }
916
917    async fn call_disambiguate(
918        &self,
919        id: Option<serde_json::Value>,
920        args: &serde_json::Map<String, serde_json::Value>,
921    ) -> McpResponse {
922        let namespace = args
923            .get("namespace")
924            .and_then(|v| v.as_str())
925            .unwrap_or("default");
926        let threshold = args
927            .get("threshold")
928            .and_then(|v| v.as_f64())
929            .unwrap_or(0.8);
930
931        let store = match self.engine.get_store(namespace) {
932            Ok(s) => s,
933            Err(e) => return self.tool_result(id, &e.to_string(), true),
934        };
935
936        // Collect all URIs from the store
937        let uri_map = store.uri_to_id.read().unwrap();
938        let uris: Vec<String> = uri_map.keys().cloned().collect();
939        drop(uri_map);
940
941        let disambiguator = crate::disambiguation::EntityDisambiguator::new(threshold);
942        let suggestions = disambiguator.suggest_merges(&uris);
943
944        let items: Vec<DisambiguationItem> = suggestions
945            .into_iter()
946            .map(|(u1, u2, s)| DisambiguationItem {
947                uri1: u1,
948                uri2: u2,
949                similarity: s,
950            })
951            .collect();
952
953        let message = if items.is_empty() {
954            "No similar entities found above threshold".to_string()
955        } else {
956            format!("Found {} potential duplicates", items.len())
957        };
958
959        let result = DisambiguationResult {
960            suggestions: items,
961            message,
962        };
963        self.serialize_result(id, result)
964    }
965
966    // Legacy handlers for backward compatibility
967    async fn handle_legacy_ingest(&self, request: McpRequest) -> McpResponse {
968        let params = match request.params {
969            Some(p) => p,
970            None => return self.error_response(request.id, -32602, "Invalid params"),
971        };
972
973        if let (Some(sub), Some(pred), Some(obj)) = (
974            params.get("subject").and_then(|v| v.as_str()),
975            params.get("predicate").and_then(|v| v.as_str()),
976            params.get("object").and_then(|v| v.as_str()),
977        ) {
978            let namespace = params
979                .get("namespace")
980                .and_then(|v| v.as_str())
981                .unwrap_or("default");
982            let triple = Triple {
983                subject: sub.to_string(),
984                predicate: pred.to_string(),
985                object: obj.to_string(),
986                provenance: Some(Provenance {
987                    source: "mcp".to_string(),
988                    timestamp: "".to_string(),
989                    method: "stdio".to_string(),
990                }),
991                embedding: vec![],
992            };
993
994            let req = Self::create_request(IngestRequest {
995                triples: vec![triple],
996                namespace: namespace.to_string(),
997            });
998
999            match self.engine.ingest_triples(req).await {
1000                Ok(_) => McpResponse {
1001                    jsonrpc: "2.0".to_string(),
1002                    id: request.id,
1003                    result: Some(serde_json::to_value("Ingested").unwrap()),
1004                    error: None,
1005                },
1006                Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1007            }
1008        } else {
1009            self.error_response(request.id, -32602, "Invalid params")
1010        }
1011    }
1012
1013    async fn handle_legacy_ingest_file(&self, request: McpRequest) -> McpResponse {
1014        let params = match request.params {
1015            Some(p) => p,
1016            None => {
1017                return self.error_response(request.id, -32602, "Invalid params: 'path' required")
1018            }
1019        };
1020
1021        if let Some(path) = params.get("path").and_then(|v| v.as_str()) {
1022            let namespace = params
1023                .get("namespace")
1024                .and_then(|v| v.as_str())
1025                .unwrap_or("default");
1026
1027            let req = Self::create_request(IngestFileRequest {
1028                file_path: path.to_string(),
1029                namespace: namespace.to_string(),
1030            });
1031
1032            match self.engine.ingest_file(req).await {
1033                Ok(resp) => {
1034                    let inner = resp.into_inner();
1035                    McpResponse {
1036                        jsonrpc: "2.0".to_string(),
1037                        id: request.id,
1038                        result: Some(
1039                            serde_json::to_value(format!(
1040                                "Ingested {} triples from {}",
1041                                inner.edges_added, path
1042                            ))
1043                            .unwrap(),
1044                        ),
1045                        error: None,
1046                    }
1047                }
1048                Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1049            }
1050        } else {
1051            self.error_response(request.id, -32602, "Invalid params: 'path' required")
1052        }
1053    }
1054
1055    fn serialize_result<T: serde::Serialize>(
1056        &self,
1057        id: Option<serde_json::Value>,
1058        result: T,
1059    ) -> McpResponse {
1060        match serde_json::to_string_pretty(&result) {
1061            Ok(json) => self.tool_result(id, &json, false),
1062            Err(e) => self.tool_result(id, &format!("Serialization error: {}", e), true),
1063        }
1064    }
1065
1066    async fn call_get_node_degree(
1067        &self,
1068        id: Option<serde_json::Value>,
1069        args: &serde_json::Map<String, serde_json::Value>,
1070    ) -> McpResponse {
1071        let uri = match args.get("uri").and_then(|v| v.as_str()) {
1072            Some(u) => u,
1073            None => return self.error_response(id, -32602, "Missing 'uri'"),
1074        };
1075        let namespace = args
1076            .get("namespace")
1077            .and_then(|v| v.as_str())
1078            .unwrap_or("default");
1079
1080        let store = match self.engine.get_store(namespace) {
1081            Ok(s) => s,
1082            Err(e) => return self.tool_result(id, &e.to_string(), true),
1083        };
1084
1085        let degree = store.get_degree(uri);
1086
1087        let result = DegreeResult {
1088            uri: uri.to_string(),
1089            degree,
1090        };
1091
1092        self.serialize_result(id, result)
1093    }
1094
1095    fn error_response(
1096        &self,
1097        id: Option<serde_json::Value>,
1098        code: i32,
1099        message: &str,
1100    ) -> McpResponse {
1101        McpResponse {
1102            jsonrpc: "2.0".to_string(),
1103            id,
1104            result: None,
1105            error: Some(McpError {
1106                code,
1107                message: message.to_string(),
1108                data: None,
1109            }),
1110        }
1111    }
1112
1113    fn tool_result(
1114        &self,
1115        id: Option<serde_json::Value>,
1116        text: &str,
1117        is_error: bool,
1118    ) -> McpResponse {
1119        let result = CallToolResult {
1120            content: vec![Content {
1121                content_type: "text".to_string(),
1122                text: text.to_string(),
1123            }],
1124            is_error: if is_error { Some(true) } else { None },
1125        };
1126        McpResponse {
1127            jsonrpc: "2.0".to_string(),
1128            id,
1129            result: Some(serde_json::to_value(result).unwrap()),
1130            error: None,
1131        }
1132    }
1133}