1use crate::mcp_types::{
2 CallToolResult, Content, ListToolsResult, McpError, McpRequest, McpResponse, Tool,
3};
4use crate::server::proto::semantic_engine_server::SemanticEngine;
5use crate::server::proto::{
6 HybridSearchRequest, IngestFileRequest, IngestRequest, Provenance, ReasoningRequest,
7 ReasoningStrategy, SearchMode, SparqlRequest, Triple,
8};
9use crate::server::MySemanticEngine;
10use std::sync::Arc;
11use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
12use tonic::Request;
13
/// MCP server that exchanges JSON-RPC messages over stdin/stdout and forwards
/// tool calls to a shared [`MySemanticEngine`].
pub struct McpStdioServer {
    /// Shared handle to the engine that executes every tool call.
    engine: Arc<MySemanticEngine>,
}
17
18impl McpStdioServer {
19 pub fn new(engine: Arc<MySemanticEngine>) -> Self {
20 Self { engine }
21 }
22
23 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
24 let mut reader = BufReader::new(tokio::io::stdin());
25 let mut writer = tokio::io::stdout();
26
27 loop {
28 let mut line = String::new();
29 if reader.read_line(&mut line).await? == 0 {
30 break;
31 }
32
33 let trimmed = line.trim();
34 if trimmed.is_empty() {
35 continue;
36 }
37
38 if let Ok(request) = serde_json::from_str::<McpRequest>(trimmed) {
39 let response = self.handle_request(request).await;
40 let response_json = serde_json::to_string(&response)? + "\n";
41 writer.write_all(response_json.as_bytes()).await?;
42 writer.flush().await?;
43 }
44 }
45
46 Ok(())
47 }
48
49 fn get_tools() -> Vec<Tool> {
50 vec![
51 Tool {
52 name: "ingest_triples".to_string(),
53 description: Some(
54 "Ingest one or more RDF triples into the knowledge graph".to_string(),
55 ),
56 input_schema: serde_json::json!({
57 "type": "object",
58 "properties": {
59 "triples": {
60 "type": "array",
61 "items": {
62 "type": "object",
63 "properties": {
64 "subject": { "type": "string" },
65 "predicate": { "type": "string" },
66 "object": { "type": "string" }
67 },
68 "required": ["subject", "predicate", "object"]
69 }
70 },
71 "namespace": { "type": "string", "default": "default" }
72 },
73 "required": ["triples"]
74 }),
75 },
76 Tool {
77 name: "ingest_file".to_string(),
78 description: Some(
79 "Ingest a CSV or Markdown file into the knowledge graph".to_string(),
80 ),
81 input_schema: serde_json::json!({
82 "type": "object",
83 "properties": {
84 "path": { "type": "string", "description": "Path to the file" },
85 "namespace": { "type": "string", "default": "default" }
86 },
87 "required": ["path"]
88 }),
89 },
90 Tool {
91 name: "sparql_query".to_string(),
92 description: Some("Execute a SPARQL query against the knowledge graph".to_string()),
93 input_schema: serde_json::json!({
94 "type": "object",
95 "properties": {
96 "query": { "type": "string", "description": "SPARQL query string" },
97 "namespace": { "type": "string", "default": "default" }
98 },
99 "required": ["query"]
100 }),
101 },
102 Tool {
103 name: "hybrid_search".to_string(),
104 description: Some("Perform a hybrid vector + graph search".to_string()),
105 input_schema: serde_json::json!({
106 "type": "object",
107 "properties": {
108 "query": { "type": "string", "description": "Natural language query" },
109 "namespace": { "type": "string", "default": "default" },
110 "vector_k": { "type": "integer", "default": 10 },
111 "graph_depth": { "type": "integer", "default": 1 },
112 "limit": { "type": "integer", "default": 20 }
113 },
114 "required": ["query"]
115 }),
116 },
117 Tool {
118 name: "apply_reasoning".to_string(),
119 description: Some(
120 "Apply RDFS or OWL-RL reasoning to infer new triples".to_string(),
121 ),
122 input_schema: serde_json::json!({
123 "type": "object",
124 "properties": {
125 "namespace": { "type": "string", "default": "default" },
126 "strategy": { "type": "string", "enum": ["rdfs", "owlrl"], "default": "rdfs" },
127 "materialize": { "type": "boolean", "default": false }
128 }
129 }),
130 },
131 Tool {
132 name: "get_neighbors".to_string(),
133 description: Some(
134 "Get neighboring nodes connected to a given URI in the graph".to_string(),
135 ),
136 input_schema: serde_json::json!({
137 "type": "object",
138 "properties": {
139 "uri": { "type": "string", "description": "URI of the entity to find neighbors for" },
140 "namespace": { "type": "string", "default": "default" },
141 "direction": { "type": "string", "enum": ["outgoing", "incoming", "both"], "default": "outgoing" }
142 },
143 "required": ["uri"]
144 }),
145 },
146 Tool {
147 name: "list_triples".to_string(),
148 description: Some(
149 "List all triples in a namespace (useful for debugging/exploration)"
150 .to_string(),
151 ),
152 input_schema: serde_json::json!({
153 "type": "object",
154 "properties": {
155 "namespace": { "type": "string", "default": "default" },
156 "limit": { "type": "integer", "default": 100 }
157 }
158 }),
159 },
160 Tool {
161 name: "delete_namespace".to_string(),
162 description: Some("Delete all data in a namespace".to_string()),
163 input_schema: serde_json::json!({
164 "type": "object",
165 "properties": {
166 "namespace": { "type": "string", "description": "Namespace to delete" }
167 },
168 "required": ["namespace"]
169 }),
170 },
171 Tool {
172 name: "ingest_url".to_string(),
173 description: Some(
174 "Scrape a web page and add its content to the vector store for RAG retrieval"
175 .to_string(),
176 ),
177 input_schema: serde_json::json!({
178 "type": "object",
179 "properties": {
180 "url": { "type": "string", "description": "URL to scrape and ingest" },
181 "namespace": { "type": "string", "default": "default" }
182 },
183 "required": ["url"]
184 }),
185 },
186 Tool {
187 name: "ingest_text".to_string(),
188 description: Some(
189 "Add arbitrary text content to the vector store for RAG retrieval".to_string(),
190 ),
191 input_schema: serde_json::json!({
192 "type": "object",
193 "properties": {
194 "uri": { "type": "string", "description": "Custom URI identifier for this text" },
195 "content": { "type": "string", "description": "Text content to embed and store" },
196 "namespace": { "type": "string", "default": "default" }
197 },
198 "required": ["uri", "content"]
199 }),
200 },
201 Tool {
202 name: "compact_vectors".to_string(),
203 description: Some("Compact the vector index by removing stale entries".to_string()),
204 input_schema: serde_json::json!({
205 "type": "object",
206 "properties": {
207 "namespace": { "type": "string", "default": "default" }
208 }
209 }),
210 },
211 Tool {
212 name: "vector_stats".to_string(),
213 description: Some("Get vector store statistics (active, stale, total)".to_string()),
214 input_schema: serde_json::json!({
215 "type": "object",
216 "properties": {
217 "namespace": { "type": "string", "default": "default" }
218 }
219 }),
220 },
221 Tool {
222 name: "disambiguate".to_string(),
223 description: Some("Find similar entities that might be duplicates".to_string()),
224 input_schema: serde_json::json!({
225 "type": "object",
226 "properties": {
227 "namespace": { "type": "string", "default": "default" },
228 "threshold": { "type": "number", "default": 0.8, "description": "Similarity threshold 0.0-1.0" }
229 }
230 }),
231 },
232 ]
233 }
234
235 async fn handle_request(&self, request: McpRequest) -> McpResponse {
236 match request.method.as_str() {
237 "initialize" => {
238 McpResponse {
240 jsonrpc: "2.0".to_string(),
241 id: request.id,
242 result: Some(serde_json::json!({
243 "protocolVersion": "2024-11-05",
244 "capabilities": {
245 "tools": {}
246 },
247 "serverInfo": {
248 "name": "synapse",
249 "version": "0.5.0"
250 }
251 })),
252 error: None,
253 }
254 }
255 "notifications/initialized" | "initialized" => {
256 McpResponse {
258 jsonrpc: "2.0".to_string(),
259 id: request.id,
260 result: Some(serde_json::json!({})),
261 error: None,
262 }
263 }
264 "tools/list" => {
265 let result = ListToolsResult {
266 tools: Self::get_tools(),
267 };
268 McpResponse {
269 jsonrpc: "2.0".to_string(),
270 id: request.id,
271 result: Some(serde_json::to_value(result).unwrap()),
272 error: None,
273 }
274 }
275 "tools/call" => self.handle_tool_call(request).await,
276 "ingest" => self.handle_legacy_ingest(request).await,
278 "ingest_file" => self.handle_legacy_ingest_file(request).await,
279 _ => McpResponse {
280 jsonrpc: "2.0".to_string(),
281 id: request.id,
282 result: None,
283 error: Some(McpError {
284 code: -32601,
285 message: format!("Method not found: {}", request.method),
286 data: None,
287 }),
288 },
289 }
290 }
291
292 async fn handle_tool_call(&self, request: McpRequest) -> McpResponse {
293 let params = match request.params {
294 Some(p) => p,
295 None => return self.error_response(request.id, -32602, "Missing params"),
296 };
297
298 let tool_name = match params.get("name").and_then(|v| v.as_str()) {
299 Some(n) => n,
300 None => return self.error_response(request.id, -32602, "Missing tool name"),
301 };
302
303 let arguments = params
304 .get("arguments")
305 .and_then(|v| v.as_object())
306 .cloned()
307 .unwrap_or_default();
308
309 match tool_name {
310 "ingest_triples" => self.call_ingest_triples(request.id, &arguments).await,
311 "ingest_file" => self.call_ingest_file(request.id, &arguments).await,
312 "sparql_query" => self.call_sparql_query(request.id, &arguments).await,
313 "hybrid_search" => self.call_hybrid_search(request.id, &arguments).await,
314 "apply_reasoning" => self.call_apply_reasoning(request.id, &arguments).await,
315 "get_neighbors" => self.call_get_neighbors(request.id, &arguments).await,
316 "list_triples" => self.call_list_triples(request.id, &arguments).await,
317 "delete_namespace" => self.call_delete_namespace(request.id, &arguments).await,
318 "ingest_url" => self.call_ingest_url(request.id, &arguments).await,
319 "ingest_text" => self.call_ingest_text(request.id, &arguments).await,
320 "compact_vectors" => self.call_compact_vectors(request.id, &arguments).await,
321 "vector_stats" => self.call_vector_stats(request.id, &arguments).await,
322 "disambiguate" => self.call_disambiguate(request.id, &arguments).await,
323 _ => self.error_response(request.id, -32602, &format!("Unknown tool: {}", tool_name)),
324 }
325 }
326
327 async fn call_ingest_triples(
328 &self,
329 id: Option<serde_json::Value>,
330 args: &serde_json::Map<String, serde_json::Value>,
331 ) -> McpResponse {
332 let namespace = args
333 .get("namespace")
334 .and_then(|v| v.as_str())
335 .unwrap_or("default");
336 let triples_array = match args.get("triples").and_then(|v| v.as_array()) {
337 Some(t) => t,
338 None => return self.error_response(id, -32602, "Missing 'triples' array"),
339 };
340
341 let mut triples = Vec::new();
342 for t in triples_array {
343 if let (Some(s), Some(p), Some(o)) = (
344 t.get("subject").and_then(|v| v.as_str()),
345 t.get("predicate").and_then(|v| v.as_str()),
346 t.get("object").and_then(|v| v.as_str()),
347 ) {
348 triples.push(Triple {
349 subject: s.to_string(),
350 predicate: p.to_string(),
351 object: o.to_string(),
352 provenance: Some(Provenance {
353 source: "mcp".to_string(),
354 timestamp: "".to_string(),
355 method: "tools/call".to_string(),
356 }),
357 embedding: vec![],
358 });
359 }
360 }
361
362 let req = Request::new(IngestRequest {
363 triples,
364 namespace: namespace.to_string(),
365 });
366
367 match self.engine.ingest_triples(req).await {
368 Ok(resp) => {
369 let inner = resp.into_inner();
370 self.tool_result(
371 id,
372 &format!("Ingested {} triples", inner.edges_added),
373 false,
374 )
375 }
376 Err(e) => self.tool_result(id, &e.to_string(), true),
377 }
378 }
379
380 async fn call_ingest_file(
381 &self,
382 id: Option<serde_json::Value>,
383 args: &serde_json::Map<String, serde_json::Value>,
384 ) -> McpResponse {
385 let path = match args.get("path").and_then(|v| v.as_str()) {
386 Some(p) => p,
387 None => return self.error_response(id, -32602, "Missing 'path'"),
388 };
389 let namespace = args
390 .get("namespace")
391 .and_then(|v| v.as_str())
392 .unwrap_or("default");
393
394 let req = Request::new(IngestFileRequest {
395 file_path: path.to_string(),
396 namespace: namespace.to_string(),
397 });
398
399 match self.engine.ingest_file(req).await {
400 Ok(resp) => {
401 let inner = resp.into_inner();
402 self.tool_result(
403 id,
404 &format!("Ingested {} triples from {}", inner.edges_added, path),
405 false,
406 )
407 }
408 Err(e) => self.tool_result(id, &e.to_string(), true),
409 }
410 }
411
412 async fn call_sparql_query(
413 &self,
414 id: Option<serde_json::Value>,
415 args: &serde_json::Map<String, serde_json::Value>,
416 ) -> McpResponse {
417 let query = match args.get("query").and_then(|v| v.as_str()) {
418 Some(q) => q,
419 None => return self.error_response(id, -32602, "Missing 'query'"),
420 };
421 let namespace = args
422 .get("namespace")
423 .and_then(|v| v.as_str())
424 .unwrap_or("default");
425
426 let req = Request::new(SparqlRequest {
427 query: query.to_string(),
428 namespace: namespace.to_string(),
429 });
430
431 match self.engine.query_sparql(req).await {
432 Ok(resp) => self.tool_result(id, &resp.into_inner().results_json, false),
433 Err(e) => self.tool_result(id, &e.to_string(), true),
434 }
435 }
436
437 async fn call_hybrid_search(
438 &self,
439 id: Option<serde_json::Value>,
440 args: &serde_json::Map<String, serde_json::Value>,
441 ) -> McpResponse {
442 let query = match args.get("query").and_then(|v| v.as_str()) {
443 Some(q) => q,
444 None => return self.error_response(id, -32602, "Missing 'query'"),
445 };
446 let namespace = args
447 .get("namespace")
448 .and_then(|v| v.as_str())
449 .unwrap_or("default");
450 let vector_k = args.get("vector_k").and_then(|v| v.as_u64()).unwrap_or(10) as u32;
451 let graph_depth = args
452 .get("graph_depth")
453 .and_then(|v| v.as_u64())
454 .unwrap_or(1) as u32;
455 let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as u32;
456
457 let req = Request::new(HybridSearchRequest {
458 query: query.to_string(),
459 namespace: namespace.to_string(),
460 vector_k,
461 graph_depth,
462 mode: SearchMode::Hybrid as i32,
463 limit,
464 });
465
466 match self.engine.hybrid_search(req).await {
467 Ok(resp) => {
468 let results = resp.into_inner().results;
469 let json_results: Vec<serde_json::Value> = results
471 .iter()
472 .map(|r| {
473 serde_json::json!({
474 "node_id": r.node_id,
475 "score": r.score,
476 "content": r.content,
477 "uri": r.uri
478 })
479 })
480 .collect();
481 let json = serde_json::to_string_pretty(&json_results).unwrap_or_default();
482 self.tool_result(id, &json, false)
483 }
484 Err(e) => self.tool_result(id, &e.to_string(), true),
485 }
486 }
487
488 async fn call_apply_reasoning(
489 &self,
490 id: Option<serde_json::Value>,
491 args: &serde_json::Map<String, serde_json::Value>,
492 ) -> McpResponse {
493 let namespace = args
494 .get("namespace")
495 .and_then(|v| v.as_str())
496 .unwrap_or("default");
497 let strategy_str = args
498 .get("strategy")
499 .and_then(|v| v.as_str())
500 .unwrap_or("rdfs");
501 let materialize = args
502 .get("materialize")
503 .and_then(|v| v.as_bool())
504 .unwrap_or(false);
505
506 let strategy = match strategy_str.to_lowercase().as_str() {
507 "owlrl" | "owl-rl" => ReasoningStrategy::Owlrl as i32,
508 _ => ReasoningStrategy::Rdfs as i32,
509 };
510
511 let req = Request::new(ReasoningRequest {
512 namespace: namespace.to_string(),
513 strategy,
514 materialize,
515 });
516
517 match self.engine.apply_reasoning(req).await {
518 Ok(resp) => {
519 let inner = resp.into_inner();
520 self.tool_result(id, &inner.message, !inner.success)
521 }
522 Err(e) => self.tool_result(id, &e.to_string(), true),
523 }
524 }
525
526 async fn call_get_neighbors(
527 &self,
528 id: Option<serde_json::Value>,
529 args: &serde_json::Map<String, serde_json::Value>,
530 ) -> McpResponse {
531 let uri = match args.get("uri").and_then(|v| v.as_str()) {
532 Some(u) => u,
533 None => return self.error_response(id, -32602, "Missing 'uri'"),
534 };
535 let namespace = args
536 .get("namespace")
537 .and_then(|v| v.as_str())
538 .unwrap_or("default");
539 let direction = args
540 .get("direction")
541 .and_then(|v| v.as_str())
542 .unwrap_or("outgoing");
543
544 let store = match self.engine.get_store(namespace) {
545 Ok(s) => s,
546 Err(e) => return self.tool_result(id, &e.to_string(), true),
547 };
548
549 let mut neighbors = Vec::new();
550
551 if direction == "outgoing" || direction == "both" {
553 if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
554 for q in store
555 .store
556 .quads_for_pattern(Some(subj.into()), None, None, None)
557 .flatten()
558 {
559 neighbors.push(serde_json::json!({
560 "direction": "outgoing",
561 "predicate": q.predicate.to_string(),
562 "target": q.object.to_string()
563 }));
564 }
565 }
566 }
567
568 if direction == "incoming" || direction == "both" {
570 if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
571 for q in store
572 .store
573 .quads_for_pattern(None, None, Some(obj.into()), None)
574 .flatten()
575 {
576 neighbors.push(serde_json::json!({
577 "direction": "incoming",
578 "predicate": q.predicate.to_string(),
579 "source": q.subject.to_string()
580 }));
581 }
582 }
583 }
584
585 let json = serde_json::to_string_pretty(&neighbors).unwrap_or_default();
586 self.tool_result(id, &json, false)
587 }
588
589 async fn call_list_triples(
590 &self,
591 id: Option<serde_json::Value>,
592 args: &serde_json::Map<String, serde_json::Value>,
593 ) -> McpResponse {
594 let namespace = args
595 .get("namespace")
596 .and_then(|v| v.as_str())
597 .unwrap_or("default");
598 let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
599
600 let store = match self.engine.get_store(namespace) {
601 Ok(s) => s,
602 Err(e) => return self.tool_result(id, &e.to_string(), true),
603 };
604
605 let mut triples = Vec::new();
606 for q in store.store.iter().take(limit).flatten() {
607 triples.push(serde_json::json!({
608 "subject": q.subject.to_string(),
609 "predicate": q.predicate.to_string(),
610 "object": q.object.to_string()
611 }));
612 }
613
614 let json = serde_json::to_string_pretty(&triples).unwrap_or_default();
615 self.tool_result(id, &json, false)
616 }
617
618 async fn call_delete_namespace(
619 &self,
620 id: Option<serde_json::Value>,
621 args: &serde_json::Map<String, serde_json::Value>,
622 ) -> McpResponse {
623 let namespace = match args.get("namespace").and_then(|v| v.as_str()) {
624 Some(n) => n,
625 None => return self.error_response(id, -32602, "Missing 'namespace'"),
626 };
627
628 let req = Request::new(crate::server::proto::EmptyRequest {
629 namespace: namespace.to_string(),
630 });
631
632 match self.engine.delete_namespace_data(req).await {
633 Ok(resp) => {
634 let inner = resp.into_inner();
635 self.tool_result(id, &inner.message, !inner.success)
636 }
637 Err(e) => self.tool_result(id, &e.to_string(), true),
638 }
639 }
640
641 async fn call_ingest_url(
642 &self,
643 id: Option<serde_json::Value>,
644 args: &serde_json::Map<String, serde_json::Value>,
645 ) -> McpResponse {
646 let url = match args.get("url").and_then(|v| v.as_str()) {
647 Some(u) => u,
648 None => return self.error_response(id, -32602, "Missing 'url'"),
649 };
650 let namespace = args
651 .get("namespace")
652 .and_then(|v| v.as_str())
653 .unwrap_or("default");
654
655 let client = reqwest::Client::new();
657 let response = match client.get(url).send().await {
658 Ok(r) => r,
659 Err(e) => return self.tool_result(id, &format!("Failed to fetch URL: {}", e), true),
660 };
661
662 if !response.status().is_success() {
663 return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
664 }
665
666 let html = match response.text().await {
667 Ok(t) => t,
668 Err(e) => {
669 return self.tool_result(id, &format!("Failed to read response: {}", e), true)
670 }
671 };
672
673 let script_re = regex::Regex::new(r"(?s)<script.*?>.*?</script>").unwrap();
675 let style_re = regex::Regex::new(r"(?s)<style.*?>.*?</style>").unwrap();
676 let tag_re = regex::Regex::new(r"<[^>]*>").unwrap();
677
678 let no_script = script_re.replace_all(&html, " ");
679 let no_style = style_re.replace_all(&no_script, " ");
680 let text_content = tag_re.replace_all(&no_style, " ");
681
682 let text = text_content
683 .split_whitespace()
684 .collect::<Vec<_>>()
685 .join(" ");
686
687 let processor = crate::processor::TextProcessor::new();
689 let chunks = processor.chunk_text(&text, 1000, 150);
690
691 let store = match self.engine.get_store(namespace) {
693 Ok(s) => s,
694 Err(e) => return self.tool_result(id, &e.to_string(), true),
695 };
696
697 if let Some(ref vector_store) = store.vector_store {
698 let mut added_chunks = 0;
699 for (i, chunk) in chunks.iter().enumerate() {
700 let chunk_uri = format!("{}#chunk-{}", url, i);
701 match vector_store.add(&chunk_uri, chunk).await {
702 Ok(_) => added_chunks += 1,
703 Err(e) => {
704 eprintln!("Failed to add chunk {}: {}", i, e);
705 }
706 }
707 }
708 self.tool_result(
709 id,
710 &format!(
711 "Ingested URL: {} ({} chars, {} chunks)",
712 url,
713 text.len(),
714 added_chunks
715 ),
716 false,
717 )
718 } else {
719 self.tool_result(id, "Vector store not available", true)
720 }
721 }
722
723 async fn call_ingest_text(
724 &self,
725 id: Option<serde_json::Value>,
726 args: &serde_json::Map<String, serde_json::Value>,
727 ) -> McpResponse {
728 let uri = match args.get("uri").and_then(|v| v.as_str()) {
729 Some(u) => u,
730 None => return self.error_response(id, -32602, "Missing 'uri'"),
731 };
732 let content = match args.get("content").and_then(|v| v.as_str()) {
733 Some(c) => c,
734 None => return self.error_response(id, -32602, "Missing 'content'"),
735 };
736 let namespace = args
737 .get("namespace")
738 .and_then(|v| v.as_str())
739 .unwrap_or("default");
740
741 let processor = crate::processor::TextProcessor::new();
743 let chunks = processor.chunk_text(&content, 1000, 150);
744
745 let store = match self.engine.get_store(namespace) {
747 Ok(s) => s,
748 Err(e) => return self.tool_result(id, &e.to_string(), true),
749 };
750
751 if let Some(ref vector_store) = store.vector_store {
752 let mut added_chunks = 0;
753 for (i, chunk) in chunks.iter().enumerate() {
754 let chunk_uri = if chunks.len() > 1 {
755 format!("{}#chunk-{}", uri, i)
756 } else {
757 uri.to_string()
758 };
759 match vector_store.add(&chunk_uri, chunk).await {
760 Ok(_) => added_chunks += 1,
761 Err(e) => {
762 eprintln!("Failed to add chunk {}: {}", i, e);
763 }
764 }
765 }
766 self.tool_result(
767 id,
768 &format!(
769 "Ingested text: {} ({} chars, {} chunks)",
770 uri,
771 content.len(),
772 added_chunks
773 ),
774 false,
775 )
776 } else {
777 self.tool_result(id, "Vector store not available", true)
778 }
779 }
780
781 async fn call_compact_vectors(
782 &self,
783 id: Option<serde_json::Value>,
784 args: &serde_json::Map<String, serde_json::Value>,
785 ) -> McpResponse {
786 let namespace = args
787 .get("namespace")
788 .and_then(|v| v.as_str())
789 .unwrap_or("default");
790
791 let store = match self.engine.get_store(namespace) {
792 Ok(s) => s,
793 Err(e) => return self.tool_result(id, &e.to_string(), true),
794 };
795
796 if let Some(ref vector_store) = store.vector_store {
797 match vector_store.compact() {
798 Ok(removed) => self.tool_result(
799 id,
800 &format!("Compaction complete: {} stale entries removed", removed),
801 false,
802 ),
803 Err(e) => self.tool_result(id, &format!("Compaction error: {}", e), true),
804 }
805 } else {
806 self.tool_result(id, "Vector store not available", true)
807 }
808 }
809
810 async fn call_vector_stats(
811 &self,
812 id: Option<serde_json::Value>,
813 args: &serde_json::Map<String, serde_json::Value>,
814 ) -> McpResponse {
815 let namespace = args
816 .get("namespace")
817 .and_then(|v| v.as_str())
818 .unwrap_or("default");
819
820 let store = match self.engine.get_store(namespace) {
821 Ok(s) => s,
822 Err(e) => return self.tool_result(id, &e.to_string(), true),
823 };
824
825 if let Some(ref vector_store) = store.vector_store {
826 let (active, stale, total) = vector_store.stats();
827 let msg = format!(
828 "Vector store stats:\n Active: {}\n Stale: {}\n Total embeddings: {}",
829 active, stale, total
830 );
831 self.tool_result(id, &msg, false)
832 } else {
833 self.tool_result(id, "Vector store not available", true)
834 }
835 }
836
837 async fn call_disambiguate(
838 &self,
839 id: Option<serde_json::Value>,
840 args: &serde_json::Map<String, serde_json::Value>,
841 ) -> McpResponse {
842 let namespace = args
843 .get("namespace")
844 .and_then(|v| v.as_str())
845 .unwrap_or("default");
846 let threshold = args
847 .get("threshold")
848 .and_then(|v| v.as_f64())
849 .unwrap_or(0.8);
850
851 let store = match self.engine.get_store(namespace) {
852 Ok(s) => s,
853 Err(e) => return self.tool_result(id, &e.to_string(), true),
854 };
855
856 let uri_map = store.uri_to_id.read().unwrap();
858 let uris: Vec<String> = uri_map.keys().cloned().collect();
859 drop(uri_map);
860
861 let disambiguator = crate::disambiguation::EntityDisambiguator::new(threshold);
862 let suggestions = disambiguator.suggest_merges(&uris);
863
864 if suggestions.is_empty() {
865 self.tool_result(id, "No similar entities found above threshold", false)
866 } else {
867 let mut msg = format!("Found {} potential duplicates:\n", suggestions.len());
868 for (uri1, uri2, sim) in suggestions.iter().take(20) {
869 msg.push_str(&format!(" {:.2}%: {} <-> {}\n", sim * 100.0, uri1, uri2));
870 }
871 if suggestions.len() > 20 {
872 msg.push_str(&format!(" ... and {} more\n", suggestions.len() - 20));
873 }
874 self.tool_result(id, &msg, false)
875 }
876 }
877
878 async fn handle_legacy_ingest(&self, request: McpRequest) -> McpResponse {
880 let params = match request.params {
881 Some(p) => p,
882 None => return self.error_response(request.id, -32602, "Invalid params"),
883 };
884
885 if let (Some(sub), Some(pred), Some(obj)) = (
886 params.get("subject").and_then(|v| v.as_str()),
887 params.get("predicate").and_then(|v| v.as_str()),
888 params.get("object").and_then(|v| v.as_str()),
889 ) {
890 let namespace = params
891 .get("namespace")
892 .and_then(|v| v.as_str())
893 .unwrap_or("default");
894 let triple = Triple {
895 subject: sub.to_string(),
896 predicate: pred.to_string(),
897 object: obj.to_string(),
898 provenance: Some(Provenance {
899 source: "mcp".to_string(),
900 timestamp: "".to_string(),
901 method: "stdio".to_string(),
902 }),
903 embedding: vec![],
904 };
905
906 let req = Request::new(IngestRequest {
907 triples: vec![triple],
908 namespace: namespace.to_string(),
909 });
910
911 match self.engine.ingest_triples(req).await {
912 Ok(_) => McpResponse {
913 jsonrpc: "2.0".to_string(),
914 id: request.id,
915 result: Some(serde_json::to_value("Ingested").unwrap()),
916 error: None,
917 },
918 Err(e) => self.error_response(request.id, -32000, &e.to_string()),
919 }
920 } else {
921 self.error_response(request.id, -32602, "Invalid params")
922 }
923 }
924
925 async fn handle_legacy_ingest_file(&self, request: McpRequest) -> McpResponse {
926 let params = match request.params {
927 Some(p) => p,
928 None => {
929 return self.error_response(request.id, -32602, "Invalid params: 'path' required")
930 }
931 };
932
933 if let Some(path) = params.get("path").and_then(|v| v.as_str()) {
934 let namespace = params
935 .get("namespace")
936 .and_then(|v| v.as_str())
937 .unwrap_or("default");
938
939 let req = Request::new(IngestFileRequest {
940 file_path: path.to_string(),
941 namespace: namespace.to_string(),
942 });
943
944 match self.engine.ingest_file(req).await {
945 Ok(resp) => {
946 let inner = resp.into_inner();
947 McpResponse {
948 jsonrpc: "2.0".to_string(),
949 id: request.id,
950 result: Some(
951 serde_json::to_value(format!(
952 "Ingested {} triples from {}",
953 inner.edges_added, path
954 ))
955 .unwrap(),
956 ),
957 error: None,
958 }
959 }
960 Err(e) => self.error_response(request.id, -32000, &e.to_string()),
961 }
962 } else {
963 self.error_response(request.id, -32602, "Invalid params: 'path' required")
964 }
965 }
966
967 fn error_response(
968 &self,
969 id: Option<serde_json::Value>,
970 code: i32,
971 message: &str,
972 ) -> McpResponse {
973 McpResponse {
974 jsonrpc: "2.0".to_string(),
975 id,
976 result: None,
977 error: Some(McpError {
978 code,
979 message: message.to_string(),
980 data: None,
981 }),
982 }
983 }
984
985 fn tool_result(
986 &self,
987 id: Option<serde_json::Value>,
988 text: &str,
989 is_error: bool,
990 ) -> McpResponse {
991 let result = CallToolResult {
992 content: vec![Content {
993 content_type: "text".to_string(),
994 text: text.to_string(),
995 }],
996 is_error: if is_error { Some(true) } else { None },
997 };
998 McpResponse {
999 jsonrpc: "2.0".to_string(),
1000 id,
1001 result: Some(serde_json::to_value(result).unwrap()),
1002 error: None,
1003 }
1004 }
1005}