1use crate::server::MySemanticEngine;
2use crate::server::proto::semantic_engine_server::SemanticEngine;
3use crate::server::proto::{
4 IngestRequest, IngestFileRequest, Triple, Provenance,
5 SparqlRequest, HybridSearchRequest, ReasoningRequest,
6 SearchMode, ReasoningStrategy,
7};
8use crate::mcp_types::{McpRequest, McpResponse, McpError, Tool, ListToolsResult, CallToolResult, Content};
9use std::sync::Arc;
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
11use tonic::Request;
12
13pub struct McpStdioServer {
14 engine: Arc<MySemanticEngine>,
15}
16
17impl McpStdioServer {
18 pub fn new(engine: Arc<MySemanticEngine>) -> Self {
19 Self { engine }
20 }
21
22 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
23 let mut reader = BufReader::new(tokio::io::stdin());
24 let mut writer = tokio::io::stdout();
25
26 loop {
27 let mut line = String::new();
28 if reader.read_line(&mut line).await? == 0 {
29 break;
30 }
31
32 let trimmed = line.trim();
33 if trimmed.is_empty() {
34 continue;
35 }
36
37 if let Ok(request) = serde_json::from_str::<McpRequest>(trimmed) {
38 let response = self.handle_request(request).await;
39 let response_json = serde_json::to_string(&response)? + "\n";
40 writer.write_all(response_json.as_bytes()).await?;
41 writer.flush().await?;
42 }
43 }
44
45 Ok(())
46 }
47
48 fn get_tools() -> Vec<Tool> {
49 vec![
50 Tool {
51 name: "ingest_triples".to_string(),
52 description: Some("Ingest one or more RDF triples into the knowledge graph".to_string()),
53 input_schema: serde_json::json!({
54 "type": "object",
55 "properties": {
56 "triples": {
57 "type": "array",
58 "items": {
59 "type": "object",
60 "properties": {
61 "subject": { "type": "string" },
62 "predicate": { "type": "string" },
63 "object": { "type": "string" }
64 },
65 "required": ["subject", "predicate", "object"]
66 }
67 },
68 "namespace": { "type": "string", "default": "default" }
69 },
70 "required": ["triples"]
71 }),
72 },
73 Tool {
74 name: "ingest_file".to_string(),
75 description: Some("Ingest a CSV or Markdown file into the knowledge graph".to_string()),
76 input_schema: serde_json::json!({
77 "type": "object",
78 "properties": {
79 "path": { "type": "string", "description": "Path to the file" },
80 "namespace": { "type": "string", "default": "default" }
81 },
82 "required": ["path"]
83 }),
84 },
85 Tool {
86 name: "sparql_query".to_string(),
87 description: Some("Execute a SPARQL query against the knowledge graph".to_string()),
88 input_schema: serde_json::json!({
89 "type": "object",
90 "properties": {
91 "query": { "type": "string", "description": "SPARQL query string" },
92 "namespace": { "type": "string", "default": "default" }
93 },
94 "required": ["query"]
95 }),
96 },
97 Tool {
98 name: "hybrid_search".to_string(),
99 description: Some("Perform a hybrid vector + graph search".to_string()),
100 input_schema: serde_json::json!({
101 "type": "object",
102 "properties": {
103 "query": { "type": "string", "description": "Natural language query" },
104 "namespace": { "type": "string", "default": "default" },
105 "vector_k": { "type": "integer", "default": 10 },
106 "graph_depth": { "type": "integer", "default": 1 },
107 "limit": { "type": "integer", "default": 20 }
108 },
109 "required": ["query"]
110 }),
111 },
112 Tool {
113 name: "apply_reasoning".to_string(),
114 description: Some("Apply RDFS or OWL-RL reasoning to infer new triples".to_string()),
115 input_schema: serde_json::json!({
116 "type": "object",
117 "properties": {
118 "namespace": { "type": "string", "default": "default" },
119 "strategy": { "type": "string", "enum": ["rdfs", "owlrl"], "default": "rdfs" },
120 "materialize": { "type": "boolean", "default": false }
121 }
122 }),
123 },
124 Tool {
125 name: "get_neighbors".to_string(),
126 description: Some("Get neighboring nodes connected to a given URI in the graph".to_string()),
127 input_schema: serde_json::json!({
128 "type": "object",
129 "properties": {
130 "uri": { "type": "string", "description": "URI of the entity to find neighbors for" },
131 "namespace": { "type": "string", "default": "default" },
132 "direction": { "type": "string", "enum": ["outgoing", "incoming", "both"], "default": "outgoing" }
133 },
134 "required": ["uri"]
135 }),
136 },
137 Tool {
138 name: "list_triples".to_string(),
139 description: Some("List all triples in a namespace (useful for debugging/exploration)".to_string()),
140 input_schema: serde_json::json!({
141 "type": "object",
142 "properties": {
143 "namespace": { "type": "string", "default": "default" },
144 "limit": { "type": "integer", "default": 100 }
145 }
146 }),
147 },
148 Tool {
149 name: "delete_namespace".to_string(),
150 description: Some("Delete all data in a namespace".to_string()),
151 input_schema: serde_json::json!({
152 "type": "object",
153 "properties": {
154 "namespace": { "type": "string", "description": "Namespace to delete" }
155 },
156 "required": ["namespace"]
157 }),
158 },
159 Tool {
160 name: "ingest_url".to_string(),
161 description: Some("Scrape a web page and add its content to the vector store for RAG retrieval".to_string()),
162 input_schema: serde_json::json!({
163 "type": "object",
164 "properties": {
165 "url": { "type": "string", "description": "URL to scrape and ingest" },
166 "namespace": { "type": "string", "default": "default" }
167 },
168 "required": ["url"]
169 }),
170 },
171 Tool {
172 name: "ingest_text".to_string(),
173 description: Some("Add arbitrary text content to the vector store for RAG retrieval".to_string()),
174 input_schema: serde_json::json!({
175 "type": "object",
176 "properties": {
177 "uri": { "type": "string", "description": "Custom URI identifier for this text" },
178 "content": { "type": "string", "description": "Text content to embed and store" },
179 "namespace": { "type": "string", "default": "default" }
180 },
181 "required": ["uri", "content"]
182 }),
183 },
184 Tool {
185 name: "compact_vectors".to_string(),
186 description: Some("Compact the vector index by removing stale entries".to_string()),
187 input_schema: serde_json::json!({
188 "type": "object",
189 "properties": {
190 "namespace": { "type": "string", "default": "default" }
191 }
192 }),
193 },
194 Tool {
195 name: "vector_stats".to_string(),
196 description: Some("Get vector store statistics (active, stale, total)".to_string()),
197 input_schema: serde_json::json!({
198 "type": "object",
199 "properties": {
200 "namespace": { "type": "string", "default": "default" }
201 }
202 }),
203 },
204 Tool {
205 name: "disambiguate".to_string(),
206 description: Some("Find similar entities that might be duplicates".to_string()),
207 input_schema: serde_json::json!({
208 "type": "object",
209 "properties": {
210 "namespace": { "type": "string", "default": "default" },
211 "threshold": { "type": "number", "default": 0.8, "description": "Similarity threshold 0.0-1.0" }
212 }
213 }),
214 },
215 ]
216 }
217
218 async fn handle_request(&self, request: McpRequest) -> McpResponse {
219 match request.method.as_str() {
220 "initialize" => {
221 McpResponse {
223 jsonrpc: "2.0".to_string(),
224 id: request.id,
225 result: Some(serde_json::json!({
226 "protocolVersion": "2024-11-05",
227 "capabilities": {
228 "tools": {}
229 },
230 "serverInfo": {
231 "name": "synapse",
232 "version": "0.2.0"
233 }
234 })),
235 error: None,
236 }
237 }
238 "notifications/initialized" | "initialized" => {
239 McpResponse {
241 jsonrpc: "2.0".to_string(),
242 id: request.id,
243 result: Some(serde_json::json!({})),
244 error: None,
245 }
246 }
247 "tools/list" => {
248 let result = ListToolsResult { tools: Self::get_tools() };
249 McpResponse {
250 jsonrpc: "2.0".to_string(),
251 id: request.id,
252 result: Some(serde_json::to_value(result).unwrap()),
253 error: None,
254 }
255 }
256 "tools/call" => {
257 self.handle_tool_call(request).await
258 }
259 "ingest" => self.handle_legacy_ingest(request).await,
261 "ingest_file" => self.handle_legacy_ingest_file(request).await,
262 _ => McpResponse {
263 jsonrpc: "2.0".to_string(),
264 id: request.id,
265 result: None,
266 error: Some(McpError {
267 code: -32601,
268 message: format!("Method not found: {}", request.method),
269 data: None,
270 }),
271 },
272 }
273 }
274
275 async fn handle_tool_call(&self, request: McpRequest) -> McpResponse {
276 let params = match request.params {
277 Some(p) => p,
278 None => return self.error_response(request.id, -32602, "Missing params"),
279 };
280
281 let tool_name = match params.get("name").and_then(|v| v.as_str()) {
282 Some(n) => n,
283 None => return self.error_response(request.id, -32602, "Missing tool name"),
284 };
285
286 let arguments = params.get("arguments")
287 .and_then(|v| v.as_object())
288 .cloned()
289 .unwrap_or_default();
290
291 match tool_name {
292 "ingest_triples" => self.call_ingest_triples(request.id, &arguments).await,
293 "ingest_file" => self.call_ingest_file(request.id, &arguments).await,
294 "sparql_query" => self.call_sparql_query(request.id, &arguments).await,
295 "hybrid_search" => self.call_hybrid_search(request.id, &arguments).await,
296 "apply_reasoning" => self.call_apply_reasoning(request.id, &arguments).await,
297 "get_neighbors" => self.call_get_neighbors(request.id, &arguments).await,
298 "list_triples" => self.call_list_triples(request.id, &arguments).await,
299 "delete_namespace" => self.call_delete_namespace(request.id, &arguments).await,
300 "ingest_url" => self.call_ingest_url(request.id, &arguments).await,
301 "ingest_text" => self.call_ingest_text(request.id, &arguments).await,
302 "compact_vectors" => self.call_compact_vectors(request.id, &arguments).await,
303 "vector_stats" => self.call_vector_stats(request.id, &arguments).await,
304 "disambiguate" => self.call_disambiguate(request.id, &arguments).await,
305 _ => self.error_response(request.id, -32602, &format!("Unknown tool: {}", tool_name)),
306 }
307 }
308
309 async fn call_ingest_triples(
310 &self,
311 id: Option<serde_json::Value>,
312 args: &serde_json::Map<String, serde_json::Value>,
313 ) -> McpResponse {
314 let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
315 let triples_array = match args.get("triples").and_then(|v| v.as_array()) {
316 Some(t) => t,
317 None => return self.error_response(id, -32602, "Missing 'triples' array"),
318 };
319
320 let mut triples = Vec::new();
321 for t in triples_array {
322 if let (Some(s), Some(p), Some(o)) = (
323 t.get("subject").and_then(|v| v.as_str()),
324 t.get("predicate").and_then(|v| v.as_str()),
325 t.get("object").and_then(|v| v.as_str()),
326 ) {
327 triples.push(Triple {
328 subject: s.to_string(),
329 predicate: p.to_string(),
330 object: o.to_string(),
331 provenance: Some(Provenance {
332 source: "mcp".to_string(),
333 timestamp: "".to_string(),
334 method: "tools/call".to_string(),
335 }),
336 embedding: vec![],
337 });
338 }
339 }
340
341 let req = Request::new(IngestRequest {
342 triples,
343 namespace: namespace.to_string(),
344 });
345
346 match self.engine.ingest_triples(req).await {
347 Ok(resp) => {
348 let inner = resp.into_inner();
349 self.tool_result(id, &format!("Ingested {} triples", inner.edges_added), false)
350 }
351 Err(e) => self.tool_result(id, &e.to_string(), true),
352 }
353 }
354
355 async fn call_ingest_file(
356 &self,
357 id: Option<serde_json::Value>,
358 args: &serde_json::Map<String, serde_json::Value>,
359 ) -> McpResponse {
360 let path = match args.get("path").and_then(|v| v.as_str()) {
361 Some(p) => p,
362 None => return self.error_response(id, -32602, "Missing 'path'"),
363 };
364 let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
365
366 let req = Request::new(IngestFileRequest {
367 file_path: path.to_string(),
368 namespace: namespace.to_string(),
369 });
370
371 match self.engine.ingest_file(req).await {
372 Ok(resp) => {
373 let inner = resp.into_inner();
374 self.tool_result(id, &format!("Ingested {} triples from {}", inner.edges_added, path), false)
375 }
376 Err(e) => self.tool_result(id, &e.to_string(), true),
377 }
378 }
379
380 async fn call_sparql_query(
381 &self,
382 id: Option<serde_json::Value>,
383 args: &serde_json::Map<String, serde_json::Value>,
384 ) -> McpResponse {
385 let query = match args.get("query").and_then(|v| v.as_str()) {
386 Some(q) => q,
387 None => return self.error_response(id, -32602, "Missing 'query'"),
388 };
389 let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
390
391 let req = Request::new(SparqlRequest {
392 query: query.to_string(),
393 namespace: namespace.to_string(),
394 });
395
396 match self.engine.query_sparql(req).await {
397 Ok(resp) => {
398 self.tool_result(id, &resp.into_inner().results_json, false)
399 }
400 Err(e) => self.tool_result(id, &e.to_string(), true),
401 }
402 }
403
404 async fn call_hybrid_search(
405 &self,
406 id: Option<serde_json::Value>,
407 args: &serde_json::Map<String, serde_json::Value>,
408 ) -> McpResponse {
409 let query = match args.get("query").and_then(|v| v.as_str()) {
410 Some(q) => q,
411 None => return self.error_response(id, -32602, "Missing 'query'"),
412 };
413 let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
414 let vector_k = args.get("vector_k").and_then(|v| v.as_u64()).unwrap_or(10) as u32;
415 let graph_depth = args.get("graph_depth").and_then(|v| v.as_u64()).unwrap_or(1) as u32;
416 let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as u32;
417
418 let req = Request::new(HybridSearchRequest {
419 query: query.to_string(),
420 namespace: namespace.to_string(),
421 vector_k,
422 graph_depth,
423 mode: SearchMode::Hybrid as i32,
424 limit,
425 });
426
427 match self.engine.hybrid_search(req).await {
428 Ok(resp) => {
429 let results = resp.into_inner().results;
430 let json_results: Vec<serde_json::Value> = results.iter().map(|r| {
432 serde_json::json!({
433 "node_id": r.node_id,
434 "score": r.score,
435 "content": r.content,
436 "uri": r.uri
437 })
438 }).collect();
439 let json = serde_json::to_string_pretty(&json_results).unwrap_or_default();
440 self.tool_result(id, &json, false)
441 }
442 Err(e) => self.tool_result(id, &e.to_string(), true),
443 }
444 }
445
446 async fn call_apply_reasoning(
447 &self,
448 id: Option<serde_json::Value>,
449 args: &serde_json::Map<String, serde_json::Value>,
450 ) -> McpResponse {
451 let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
452 let strategy_str = args.get("strategy").and_then(|v| v.as_str()).unwrap_or("rdfs");
453 let materialize = args.get("materialize").and_then(|v| v.as_bool()).unwrap_or(false);
454
455 let strategy = match strategy_str.to_lowercase().as_str() {
456 "owlrl" | "owl-rl" => ReasoningStrategy::Owlrl as i32,
457 _ => ReasoningStrategy::Rdfs as i32,
458 };
459
460 let req = Request::new(ReasoningRequest {
461 namespace: namespace.to_string(),
462 strategy,
463 materialize,
464 });
465
466 match self.engine.apply_reasoning(req).await {
467 Ok(resp) => {
468 let inner = resp.into_inner();
469 self.tool_result(id, &inner.message, !inner.success)
470 }
471 Err(e) => self.tool_result(id, &e.to_string(), true),
472 }
473 }
474
475 async fn call_get_neighbors(
476 &self,
477 id: Option<serde_json::Value>,
478 args: &serde_json::Map<String, serde_json::Value>,
479 ) -> McpResponse {
480 let uri = match args.get("uri").and_then(|v| v.as_str()) {
481 Some(u) => u,
482 None => return self.error_response(id, -32602, "Missing 'uri'"),
483 };
484 let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
485 let direction = args.get("direction").and_then(|v| v.as_str()).unwrap_or("outgoing");
486
487 let store = match self.engine.get_store(namespace) {
488 Ok(s) => s,
489 Err(e) => return self.tool_result(id, &e.to_string(), true),
490 };
491
492 let mut neighbors = Vec::new();
493
494 if direction == "outgoing" || direction == "both" {
496 if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
497 for quad in store.store.quads_for_pattern(Some(subj.into()), None, None, None) {
498 if let Ok(q) = quad {
499 neighbors.push(serde_json::json!({
500 "direction": "outgoing",
501 "predicate": q.predicate.to_string(),
502 "target": q.object.to_string()
503 }));
504 }
505 }
506 }
507 }
508
509 if direction == "incoming" || direction == "both" {
511 if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
512 for quad in store.store.quads_for_pattern(None, None, Some(obj.into()), None) {
513 if let Ok(q) = quad {
514 neighbors.push(serde_json::json!({
515 "direction": "incoming",
516 "predicate": q.predicate.to_string(),
517 "source": q.subject.to_string()
518 }));
519 }
520 }
521 }
522 }
523
524 let json = serde_json::to_string_pretty(&neighbors).unwrap_or_default();
525 self.tool_result(id, &json, false)
526 }
527
528 async fn call_list_triples(
529 &self,
530 id: Option<serde_json::Value>,
531 args: &serde_json::Map<String, serde_json::Value>,
532 ) -> McpResponse {
533 let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
534 let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
535
536 let store = match self.engine.get_store(namespace) {
537 Ok(s) => s,
538 Err(e) => return self.tool_result(id, &e.to_string(), true),
539 };
540
541 let mut triples = Vec::new();
542 for quad in store.store.iter().take(limit) {
543 if let Ok(q) = quad {
544 triples.push(serde_json::json!({
545 "subject": q.subject.to_string(),
546 "predicate": q.predicate.to_string(),
547 "object": q.object.to_string()
548 }));
549 }
550 }
551
552 let json = serde_json::to_string_pretty(&triples).unwrap_or_default();
553 self.tool_result(id, &json, false)
554 }
555
556 async fn call_delete_namespace(
557 &self,
558 id: Option<serde_json::Value>,
559 args: &serde_json::Map<String, serde_json::Value>,
560 ) -> McpResponse {
561 let namespace = match args.get("namespace").and_then(|v| v.as_str()) {
562 Some(n) => n,
563 None => return self.error_response(id, -32602, "Missing 'namespace'"),
564 };
565
566 let req = Request::new(crate::server::proto::EmptyRequest {
567 namespace: namespace.to_string(),
568 });
569
570 match self.engine.delete_namespace_data(req).await {
571 Ok(resp) => {
572 let inner = resp.into_inner();
573 self.tool_result(id, &inner.message, !inner.success)
574 }
575 Err(e) => self.tool_result(id, &e.to_string(), true),
576 }
577 }
578
579 async fn call_ingest_url(
580 &self,
581 id: Option<serde_json::Value>,
582 args: &serde_json::Map<String, serde_json::Value>,
583 ) -> McpResponse {
584 let url = match args.get("url").and_then(|v| v.as_str()) {
585 Some(u) => u,
586 None => return self.error_response(id, -32602, "Missing 'url'"),
587 };
588 let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
589
590 let client = reqwest::Client::new();
592 let response = match client.get(url).send().await {
593 Ok(r) => r,
594 Err(e) => return self.tool_result(id, &format!("Failed to fetch URL: {}", e), true),
595 };
596
597 if !response.status().is_success() {
598 return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
599 }
600
601 let html = match response.text().await {
602 Ok(t) => t,
603 Err(e) => return self.tool_result(id, &format!("Failed to read response: {}", e), true),
604 };
605
606 let text = html
608 .split('<')
609 .filter_map(|s| s.split('>').nth(1))
610 .collect::<Vec<_>>()
611 .join(" ")
612 .split_whitespace()
613 .collect::<Vec<_>>()
614 .join(" ");
615
616 let processor = crate::processor::TextProcessor::new();
618 let chunks = processor.chunk_text(&text, 1000);
619
620 let store = match self.engine.get_store(namespace) {
622 Ok(s) => s,
623 Err(e) => return self.tool_result(id, &e.to_string(), true),
624 };
625
626 if let Some(ref vector_store) = store.vector_store {
627 let mut added_chunks = 0;
628 for (i, chunk) in chunks.iter().enumerate() {
629 let chunk_uri = format!("{}#chunk-{}", url, i);
630 match vector_store.add(&chunk_uri, chunk).await {
631 Ok(_) => added_chunks += 1,
632 Err(e) => {
633 eprintln!("Failed to add chunk {}: {}", i, e);
634 }
635 }
636 }
637 self.tool_result(id, &format!("Ingested URL: {} ({} chars, {} chunks)", url, text.len(), added_chunks), false)
638 } else {
639 self.tool_result(id, "Vector store not available", true)
640 }
641 }
642
643
644 async fn call_ingest_text(
645 &self,
646 id: Option<serde_json::Value>,
647 args: &serde_json::Map<String, serde_json::Value>,
648 ) -> McpResponse {
649 let uri = match args.get("uri").and_then(|v| v.as_str()) {
650 Some(u) => u,
651 None => return self.error_response(id, -32602, "Missing 'uri'"),
652 };
653 let content = match args.get("content").and_then(|v| v.as_str()) {
654 Some(c) => c,
655 None => return self.error_response(id, -32602, "Missing 'content'"),
656 };
657 let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
658
659 let processor = crate::processor::TextProcessor::new();
661 let chunks = processor.chunk_text(&content, 1000);
662
663 let store = match self.engine.get_store(namespace) {
665 Ok(s) => s,
666 Err(e) => return self.tool_result(id, &e.to_string(), true),
667 };
668
669 if let Some(ref vector_store) = store.vector_store {
670 let mut added_chunks = 0;
671 for (i, chunk) in chunks.iter().enumerate() {
672 let chunk_uri = if chunks.len() > 1 { format!("{}#chunk-{}", uri, i) } else { uri.to_string() };
673 match vector_store.add(&chunk_uri, chunk).await {
674 Ok(_) => added_chunks += 1,
675 Err(e) => {
676 eprintln!("Failed to add chunk {}: {}", i, e);
677 }
678 }
679 }
680 self.tool_result(id, &format!("Ingested text: {} ({} chars, {} chunks)", uri, content.len(), added_chunks), false)
681 } else {
682 self.tool_result(id, "Vector store not available", true)
683 }
684 }
685
686 async fn call_compact_vectors(
687 &self,
688 id: Option<serde_json::Value>,
689 args: &serde_json::Map<String, serde_json::Value>,
690 ) -> McpResponse {
691 let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
692
693 let store = match self.engine.get_store(namespace) {
694 Ok(s) => s,
695 Err(e) => return self.tool_result(id, &e.to_string(), true),
696 };
697
698 if let Some(ref vector_store) = store.vector_store {
699 match vector_store.compact() {
700 Ok(removed) => self.tool_result(id, &format!("Compaction complete: {} stale entries removed", removed), false),
701 Err(e) => self.tool_result(id, &format!("Compaction error: {}", e), true),
702 }
703 } else {
704 self.tool_result(id, "Vector store not available", true)
705 }
706 }
707
708 async fn call_vector_stats(
709 &self,
710 id: Option<serde_json::Value>,
711 args: &serde_json::Map<String, serde_json::Value>,
712 ) -> McpResponse {
713 let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
714
715 let store = match self.engine.get_store(namespace) {
716 Ok(s) => s,
717 Err(e) => return self.tool_result(id, &e.to_string(), true),
718 };
719
720 if let Some(ref vector_store) = store.vector_store {
721 let (active, stale, total) = vector_store.stats();
722 let msg = format!(
723 "Vector store stats:\n Active: {}\n Stale: {}\n Total embeddings: {}",
724 active, stale, total
725 );
726 self.tool_result(id, &msg, false)
727 } else {
728 self.tool_result(id, "Vector store not available", true)
729 }
730 }
731
732 async fn call_disambiguate(
733 &self,
734 id: Option<serde_json::Value>,
735 args: &serde_json::Map<String, serde_json::Value>,
736 ) -> McpResponse {
737 let namespace = args.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
738 let threshold = args.get("threshold").and_then(|v| v.as_f64()).unwrap_or(0.8);
739
740 let store = match self.engine.get_store(namespace) {
741 Ok(s) => s,
742 Err(e) => return self.tool_result(id, &e.to_string(), true),
743 };
744
745 let uri_map = store.uri_to_id.read().unwrap();
747 let uris: Vec<String> = uri_map.keys().cloned().collect();
748 drop(uri_map);
749
750 let disambiguator = crate::disambiguation::EntityDisambiguator::new(threshold);
751 let suggestions = disambiguator.suggest_merges(&uris);
752
753 if suggestions.is_empty() {
754 self.tool_result(id, "No similar entities found above threshold", false)
755 } else {
756 let mut msg = format!("Found {} potential duplicates:\n", suggestions.len());
757 for (uri1, uri2, sim) in suggestions.iter().take(20) {
758 msg.push_str(&format!(" {:.2}%: {} <-> {}\n", sim * 100.0, uri1, uri2));
759 }
760 if suggestions.len() > 20 {
761 msg.push_str(&format!(" ... and {} more\n", suggestions.len() - 20));
762 }
763 self.tool_result(id, &msg, false)
764 }
765 }
766
767 async fn handle_legacy_ingest(&self, request: McpRequest) -> McpResponse {
769 let params = match request.params {
770 Some(p) => p,
771 None => return self.error_response(request.id, -32602, "Invalid params"),
772 };
773
774 if let (Some(sub), Some(pred), Some(obj)) = (
775 params.get("subject").and_then(|v| v.as_str()),
776 params.get("predicate").and_then(|v| v.as_str()),
777 params.get("object").and_then(|v| v.as_str()),
778 ) {
779 let namespace = params.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
780 let triple = Triple {
781 subject: sub.to_string(),
782 predicate: pred.to_string(),
783 object: obj.to_string(),
784 provenance: Some(Provenance {
785 source: "mcp".to_string(),
786 timestamp: "".to_string(),
787 method: "stdio".to_string(),
788 }),
789 embedding: vec![],
790 };
791
792 let req = Request::new(IngestRequest {
793 triples: vec![triple],
794 namespace: namespace.to_string(),
795 });
796
797 match self.engine.ingest_triples(req).await {
798 Ok(_) => McpResponse {
799 jsonrpc: "2.0".to_string(),
800 id: request.id,
801 result: Some(serde_json::to_value("Ingested").unwrap()),
802 error: None,
803 },
804 Err(e) => self.error_response(request.id, -32000, &e.to_string()),
805 }
806 } else {
807 self.error_response(request.id, -32602, "Invalid params")
808 }
809 }
810
811 async fn handle_legacy_ingest_file(&self, request: McpRequest) -> McpResponse {
812 let params = match request.params {
813 Some(p) => p,
814 None => return self.error_response(request.id, -32602, "Invalid params: 'path' required"),
815 };
816
817 if let Some(path) = params.get("path").and_then(|v| v.as_str()) {
818 let namespace = params.get("namespace").and_then(|v| v.as_str()).unwrap_or("default");
819
820 let req = Request::new(IngestFileRequest {
821 file_path: path.to_string(),
822 namespace: namespace.to_string(),
823 });
824
825 match self.engine.ingest_file(req).await {
826 Ok(resp) => {
827 let inner = resp.into_inner();
828 McpResponse {
829 jsonrpc: "2.0".to_string(),
830 id: request.id,
831 result: Some(serde_json::to_value(format!(
832 "Ingested {} triples from {}",
833 inner.edges_added, path
834 )).unwrap()),
835 error: None,
836 }
837 }
838 Err(e) => self.error_response(request.id, -32000, &e.to_string()),
839 }
840 } else {
841 self.error_response(request.id, -32602, "Invalid params: 'path' required")
842 }
843 }
844
845 fn error_response(&self, id: Option<serde_json::Value>, code: i32, message: &str) -> McpResponse {
846 McpResponse {
847 jsonrpc: "2.0".to_string(),
848 id,
849 result: None,
850 error: Some(McpError {
851 code,
852 message: message.to_string(),
853 data: None,
854 }),
855 }
856 }
857
858 fn tool_result(&self, id: Option<serde_json::Value>, text: &str, is_error: bool) -> McpResponse {
859 let result = CallToolResult {
860 content: vec![Content {
861 content_type: "text".to_string(),
862 text: text.to_string(),
863 }],
864 is_error: if is_error { Some(true) } else { None },
865 };
866 McpResponse {
867 jsonrpc: "2.0".to_string(),
868 id,
869 result: Some(serde_json::to_value(result).unwrap()),
870 error: None,
871 }
872 }
873}