1use crate::mcp_types::{
2 CallToolResult, Content, DegreeResult, DisambiguationItem, DisambiguationResult,
3 IngestToolResult, ListToolsResult, McpError, McpRequest, McpResponse, NeighborItem,
4 NeighborsToolResult, ReasoningToolResult, SearchResultItem, SearchToolResult,
5 SimpleSuccessResult, StatsToolResult, Tool, TripleItem, TriplesToolResult,
6};
7use crate::server::proto::semantic_engine_server::SemanticEngine;
8use crate::server::proto::{
9 HybridSearchRequest, IngestFileRequest, IngestRequest, Provenance, ReasoningRequest,
10 ReasoningStrategy, SearchMode, SparqlRequest, Triple,
11};
12use crate::server::MySemanticEngine;
13use jsonschema::JSONSchema;
14use std::sync::Arc;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tonic::Request;
17
/// MCP server that reads JSON-RPC requests line by line from stdin, writes
/// responses to stdout, and forwards tool calls to a shared
/// [`MySemanticEngine`].
pub struct McpStdioServer {
    /// Engine handle used by every request handler.
    engine: Arc<MySemanticEngine>,
}
21
22impl McpStdioServer {
23 pub fn new(engine: Arc<MySemanticEngine>) -> Self {
24 Self { engine }
25 }
26
27 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
28 let mut reader = BufReader::new(tokio::io::stdin());
29 let mut writer = tokio::io::stdout();
30
31 loop {
32 let mut line = String::new();
33 if reader.read_line(&mut line).await? == 0 {
34 break;
35 }
36
37 let trimmed = line.trim();
38 if trimmed.is_empty() {
39 continue;
40 }
41
42 if let Ok(request) = serde_json::from_str::<McpRequest>(trimmed) {
43 let response = self.handle_request(request).await;
44 let response_json = serde_json::to_string(&response)? + "\n";
45 writer.write_all(response_json.as_bytes()).await?;
46 writer.flush().await?;
47 }
48 }
49
50 Ok(())
51 }
52
53 fn get_tools() -> Vec<Tool> {
54 vec![
55 Tool {
56 name: "ingest_triples".to_string(),
57 description: Some(
58 "Ingest one or more RDF triples into the knowledge graph".to_string(),
59 ),
60 input_schema: serde_json::json!({
61 "type": "object",
62 "properties": {
63 "triples": {
64 "type": "array",
65 "items": {
66 "type": "object",
67 "properties": {
68 "subject": { "type": "string" },
69 "predicate": { "type": "string" },
70 "object": { "type": "string" }
71 },
72 "required": ["subject", "predicate", "object"]
73 }
74 },
75 "namespace": { "type": "string", "default": "default" }
76 },
77 "required": ["triples"]
78 }),
79 },
80 Tool {
81 name: "ingest_file".to_string(),
82 description: Some(
83 "Ingest a CSV or Markdown file into the knowledge graph".to_string(),
84 ),
85 input_schema: serde_json::json!({
86 "type": "object",
87 "properties": {
88 "path": { "type": "string", "description": "Path to the file" },
89 "namespace": { "type": "string", "default": "default" }
90 },
91 "required": ["path"]
92 }),
93 },
94 Tool {
95 name: "sparql_query".to_string(),
96 description: Some("Execute a SPARQL query against the knowledge graph".to_string()),
97 input_schema: serde_json::json!({
98 "type": "object",
99 "properties": {
100 "query": { "type": "string", "description": "SPARQL query string" },
101 "namespace": { "type": "string", "default": "default" }
102 },
103 "required": ["query"]
104 }),
105 },
106 Tool {
107 name: "hybrid_search".to_string(),
108 description: Some("Perform a hybrid vector + graph search".to_string()),
109 input_schema: serde_json::json!({
110 "type": "object",
111 "properties": {
112 "query": { "type": "string", "description": "Natural language query" },
113 "namespace": { "type": "string", "default": "default" },
114 "vector_k": { "type": "integer", "default": 10 },
115 "graph_depth": { "type": "integer", "default": 1 },
116 "limit": { "type": "integer", "default": 20 }
117 },
118 "required": ["query"]
119 }),
120 },
121 Tool {
122 name: "apply_reasoning".to_string(),
123 description: Some(
124 "Apply RDFS or OWL-RL reasoning to infer new triples".to_string(),
125 ),
126 input_schema: serde_json::json!({
127 "type": "object",
128 "properties": {
129 "namespace": { "type": "string", "default": "default" },
130 "strategy": { "type": "string", "enum": ["rdfs", "owlrl"], "default": "rdfs" },
131 "materialize": { "type": "boolean", "default": false }
132 }
133 }),
134 },
135 Tool {
136 name: "get_neighbors".to_string(),
137 description: Some(
138 "Get neighboring nodes connected to a given URI in the graph".to_string(),
139 ),
140 input_schema: serde_json::json!({
141 "type": "object",
142 "properties": {
143 "uri": { "type": "string", "description": "URI of the entity to find neighbors for" },
144 "namespace": { "type": "string", "default": "default" },
145 "direction": { "type": "string", "enum": ["outgoing", "incoming", "both"], "default": "outgoing" }
146 },
147 "required": ["uri"]
148 }),
149 },
150 Tool {
151 name: "list_triples".to_string(),
152 description: Some(
153 "List all triples in a namespace (useful for debugging/exploration)"
154 .to_string(),
155 ),
156 input_schema: serde_json::json!({
157 "type": "object",
158 "properties": {
159 "namespace": { "type": "string", "default": "default" },
160 "limit": { "type": "integer", "default": 100 }
161 }
162 }),
163 },
164 Tool {
165 name: "delete_namespace".to_string(),
166 description: Some("Delete all data in a namespace".to_string()),
167 input_schema: serde_json::json!({
168 "type": "object",
169 "properties": {
170 "namespace": { "type": "string", "description": "Namespace to delete" }
171 },
172 "required": ["namespace"]
173 }),
174 },
175 Tool {
176 name: "ingest_url".to_string(),
177 description: Some(
178 "Scrape a web page and add its content to the vector store for RAG retrieval"
179 .to_string(),
180 ),
181 input_schema: serde_json::json!({
182 "type": "object",
183 "properties": {
184 "url": { "type": "string", "description": "URL to scrape and ingest" },
185 "namespace": { "type": "string", "default": "default" }
186 },
187 "required": ["url"]
188 }),
189 },
190 Tool {
191 name: "ingest_text".to_string(),
192 description: Some(
193 "Add arbitrary text content to the vector store for RAG retrieval".to_string(),
194 ),
195 input_schema: serde_json::json!({
196 "type": "object",
197 "properties": {
198 "uri": { "type": "string", "description": "Custom URI identifier for this text" },
199 "content": { "type": "string", "description": "Text content to embed and store" },
200 "namespace": { "type": "string", "default": "default" }
201 },
202 "required": ["uri", "content"]
203 }),
204 },
205 Tool {
206 name: "compact_vectors".to_string(),
207 description: Some("Compact the vector index by removing stale entries".to_string()),
208 input_schema: serde_json::json!({
209 "type": "object",
210 "properties": {
211 "namespace": { "type": "string", "default": "default" }
212 }
213 }),
214 },
215 Tool {
216 name: "vector_stats".to_string(),
217 description: Some("Get vector store statistics (active, stale, total)".to_string()),
218 input_schema: serde_json::json!({
219 "type": "object",
220 "properties": {
221 "namespace": { "type": "string", "default": "default" }
222 }
223 }),
224 },
225 Tool {
226 name: "disambiguate".to_string(),
227 description: Some("Find similar entities that might be duplicates".to_string()),
228 input_schema: serde_json::json!({
229 "type": "object",
230 "properties": {
231 "namespace": { "type": "string", "default": "default" },
232 "threshold": { "type": "number", "default": 0.8, "description": "Similarity threshold 0.0-1.0" }
233 }
234 }),
235 },
236 Tool {
237 name: "get_node_degree".to_string(),
238 description: Some("Get the degree (number of connections) of a node".to_string()),
239 input_schema: serde_json::json!({
240 "type": "object",
241 "properties": {
242 "uri": { "type": "string" },
243 "namespace": { "type": "string", "default": "default" }
244 },
245 "required": ["uri"]
246 }),
247 },
248 ]
249 }
250
251 pub async fn handle_request(&self, request: McpRequest) -> McpResponse {
252 match request.method.as_str() {
253 "initialize" => {
254 McpResponse {
256 jsonrpc: "2.0".to_string(),
257 id: request.id,
258 result: Some(serde_json::json!({
259 "protocolVersion": "2024-11-05",
260 "capabilities": {
261 "tools": {}
262 },
263 "serverInfo": {
264 "name": "synapse",
265 "version": env!("CARGO_PKG_VERSION")
266 }
267 })),
268 error: None,
269 }
270 }
271 "notifications/initialized" | "initialized" => {
272 McpResponse {
274 jsonrpc: "2.0".to_string(),
275 id: request.id,
276 result: Some(serde_json::json!({})),
277 error: None,
278 }
279 }
280 "tools/list" => {
281 let result = ListToolsResult {
282 tools: Self::get_tools(),
283 };
284 McpResponse {
285 jsonrpc: "2.0".to_string(),
286 id: request.id,
287 result: Some(serde_json::to_value(result).unwrap()),
288 error: None,
289 }
290 }
291 "tools/call" => self.handle_tool_call(request).await,
292 "ingest" => self.handle_legacy_ingest(request).await,
294 "ingest_file" => self.handle_legacy_ingest_file(request).await,
295 _ => McpResponse {
296 jsonrpc: "2.0".to_string(),
297 id: request.id,
298 result: None,
299 error: Some(McpError {
300 code: -32601,
301 message: format!("Method not found: {}", request.method),
302 data: None,
303 }),
304 },
305 }
306 }
307
308 fn validate_arguments(tool_name: &str, arguments: &serde_json::Value) -> Result<(), String> {
309 let tools = Self::get_tools();
310 if let Some(tool) = tools.iter().find(|t| t.name == tool_name) {
311 if let Ok(schema) = JSONSchema::compile(&tool.input_schema) {
312 if let Err(errors) = schema.validate(arguments) {
313 let error_msg = errors
314 .map(|e| e.to_string())
315 .collect::<Vec<_>>()
316 .join(", ");
317 return Err(format!("Validation error: {}", error_msg));
318 }
319 } else {
320 return Err("Invalid tool schema definition".to_string());
321 }
322 }
323 Ok(())
324 }
325
326 async fn handle_tool_call(&self, request: McpRequest) -> McpResponse {
327 let params = match request.params {
328 Some(p) => p,
329 None => return self.error_response(request.id, -32602, "Missing params"),
330 };
331
332 let tool_name = match params.get("name").and_then(|v| v.as_str()) {
333 Some(n) => n,
334 None => return self.error_response(request.id, -32602, "Missing tool name"),
335 };
336
337 let arguments = params
338 .get("arguments")
339 .and_then(|v| v.as_object())
340 .cloned()
341 .unwrap_or_default();
342
343 let args_value = serde_json::Value::Object(arguments.clone());
344 if let Err(e) = Self::validate_arguments(tool_name, &args_value) {
345 return self.error_response(request.id, -32602, &e);
346 }
347
348 match tool_name {
349 "ingest_triples" => self.call_ingest_triples(request.id, &arguments).await,
350 "ingest_file" => self.call_ingest_file(request.id, &arguments).await,
351 "sparql_query" => self.call_sparql_query(request.id, &arguments).await,
352 "hybrid_search" => self.call_hybrid_search(request.id, &arguments).await,
353 "apply_reasoning" => self.call_apply_reasoning(request.id, &arguments).await,
354 "get_neighbors" => self.call_get_neighbors(request.id, &arguments).await,
355 "list_triples" => self.call_list_triples(request.id, &arguments).await,
356 "delete_namespace" => self.call_delete_namespace(request.id, &arguments).await,
357 "ingest_url" => self.call_ingest_url(request.id, &arguments).await,
358 "ingest_text" => self.call_ingest_text(request.id, &arguments).await,
359 "compact_vectors" => self.call_compact_vectors(request.id, &arguments).await,
360 "vector_stats" => self.call_vector_stats(request.id, &arguments).await,
361 "disambiguate" => self.call_disambiguate(request.id, &arguments).await,
362 "get_node_degree" => self.call_get_node_degree(request.id, &arguments).await,
363 _ => self.error_response(request.id, -32602, &format!("Unknown tool: {}", tool_name)),
364 }
365 }
366
367 async fn call_ingest_triples(
368 &self,
369 id: Option<serde_json::Value>,
370 args: &serde_json::Map<String, serde_json::Value>,
371 ) -> McpResponse {
372 let namespace = args
373 .get("namespace")
374 .and_then(|v| v.as_str())
375 .unwrap_or("default");
376 let triples_array = match args.get("triples").and_then(|v| v.as_array()) {
377 Some(t) => t,
378 None => return self.error_response(id, -32602, "Missing 'triples' array"),
379 };
380
381 let mut triples = Vec::new();
382 for t in triples_array {
383 if let (Some(s), Some(p), Some(o)) = (
384 t.get("subject").and_then(|v| v.as_str()),
385 t.get("predicate").and_then(|v| v.as_str()),
386 t.get("object").and_then(|v| v.as_str()),
387 ) {
388 triples.push(Triple {
389 subject: s.to_string(),
390 predicate: p.to_string(),
391 object: o.to_string(),
392 provenance: Some(Provenance {
393 source: "mcp".to_string(),
394 timestamp: "".to_string(),
395 method: "tools/call".to_string(),
396 }),
397 embedding: vec![],
398 });
399 }
400 }
401
402 let req = Request::new(IngestRequest {
403 triples,
404 namespace: namespace.to_string(),
405 });
406
407 match self.engine.ingest_triples(req).await {
408 Ok(resp) => {
409 let inner = resp.into_inner();
410 let result = IngestToolResult {
411 nodes_added: inner.nodes_added,
412 edges_added: inner.edges_added,
413 message: format!("Ingested {} triples", inner.edges_added),
414 };
415 self.serialize_result(id, result)
416 }
417 Err(e) => self.tool_result(id, &e.to_string(), true),
418 }
419 }
420
421 async fn call_ingest_file(
422 &self,
423 id: Option<serde_json::Value>,
424 args: &serde_json::Map<String, serde_json::Value>,
425 ) -> McpResponse {
426 let path = match args.get("path").and_then(|v| v.as_str()) {
427 Some(p) => p,
428 None => return self.error_response(id, -32602, "Missing 'path'"),
429 };
430 let namespace = args
431 .get("namespace")
432 .and_then(|v| v.as_str())
433 .unwrap_or("default");
434
435 let req = Request::new(IngestFileRequest {
436 file_path: path.to_string(),
437 namespace: namespace.to_string(),
438 });
439
440 match self.engine.ingest_file(req).await {
441 Ok(resp) => {
442 let inner = resp.into_inner();
443 let result = IngestToolResult {
444 nodes_added: inner.nodes_added,
445 edges_added: inner.edges_added,
446 message: format!("Ingested {} triples from {}", inner.edges_added, path),
447 };
448 self.serialize_result(id, result)
449 }
450 Err(e) => self.tool_result(id, &e.to_string(), true),
451 }
452 }
453
454 async fn call_sparql_query(
455 &self,
456 id: Option<serde_json::Value>,
457 args: &serde_json::Map<String, serde_json::Value>,
458 ) -> McpResponse {
459 let query = match args.get("query").and_then(|v| v.as_str()) {
460 Some(q) => q,
461 None => return self.error_response(id, -32602, "Missing 'query'"),
462 };
463 let namespace = args
464 .get("namespace")
465 .and_then(|v| v.as_str())
466 .unwrap_or("default");
467
468 let req = Request::new(SparqlRequest {
469 query: query.to_string(),
470 namespace: namespace.to_string(),
471 });
472
473 match self.engine.query_sparql(req).await {
474 Ok(resp) => self.tool_result(id, &resp.into_inner().results_json, false),
475 Err(e) => self.tool_result(id, &e.to_string(), true),
476 }
477 }
478
479 async fn call_hybrid_search(
480 &self,
481 id: Option<serde_json::Value>,
482 args: &serde_json::Map<String, serde_json::Value>,
483 ) -> McpResponse {
484 let query = match args.get("query").and_then(|v| v.as_str()) {
485 Some(q) => q,
486 None => return self.error_response(id, -32602, "Missing 'query'"),
487 };
488 let namespace = args
489 .get("namespace")
490 .and_then(|v| v.as_str())
491 .unwrap_or("default");
492 let vector_k = args.get("vector_k").and_then(|v| v.as_u64()).unwrap_or(10) as u32;
493 let graph_depth = args
494 .get("graph_depth")
495 .and_then(|v| v.as_u64())
496 .unwrap_or(1) as u32;
497 let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as u32;
498
499 let req = Request::new(HybridSearchRequest {
500 query: query.to_string(),
501 namespace: namespace.to_string(),
502 vector_k,
503 graph_depth,
504 mode: SearchMode::Hybrid as i32,
505 limit,
506 });
507
508 match self.engine.hybrid_search(req).await {
509 Ok(resp) => {
510 let results = resp.into_inner().results;
511 let items: Vec<SearchResultItem> = results
512 .into_iter()
513 .map(|r| SearchResultItem {
514 node_id: r.node_id,
515 score: r.score,
516 content: r.content,
517 uri: r.uri,
518 })
519 .collect();
520
521 let result = SearchToolResult { results: items };
522 self.serialize_result(id, result)
523 }
524 Err(e) => self.tool_result(id, &e.to_string(), true),
525 }
526 }
527
528 async fn call_apply_reasoning(
529 &self,
530 id: Option<serde_json::Value>,
531 args: &serde_json::Map<String, serde_json::Value>,
532 ) -> McpResponse {
533 let namespace = args
534 .get("namespace")
535 .and_then(|v| v.as_str())
536 .unwrap_or("default");
537 let strategy_str = args
538 .get("strategy")
539 .and_then(|v| v.as_str())
540 .unwrap_or("rdfs");
541 let materialize = args
542 .get("materialize")
543 .and_then(|v| v.as_bool())
544 .unwrap_or(false);
545
546 let strategy = match strategy_str.to_lowercase().as_str() {
547 "owlrl" | "owl-rl" => ReasoningStrategy::Owlrl as i32,
548 _ => ReasoningStrategy::Rdfs as i32,
549 };
550
551 let req = Request::new(ReasoningRequest {
552 namespace: namespace.to_string(),
553 strategy,
554 materialize,
555 });
556
557 match self.engine.apply_reasoning(req).await {
558 Ok(resp) => {
559 let inner = resp.into_inner();
560 let result = ReasoningToolResult {
561 success: inner.success,
562 triples_inferred: inner.triples_inferred,
563 message: inner.message,
564 };
565 self.serialize_result(id, result)
566 }
567 Err(e) => self.tool_result(id, &e.to_string(), true),
568 }
569 }
570
571 async fn call_get_neighbors(
572 &self,
573 id: Option<serde_json::Value>,
574 args: &serde_json::Map<String, serde_json::Value>,
575 ) -> McpResponse {
576 let uri = match args.get("uri").and_then(|v| v.as_str()) {
577 Some(u) => u,
578 None => return self.error_response(id, -32602, "Missing 'uri'"),
579 };
580 let namespace = args
581 .get("namespace")
582 .and_then(|v| v.as_str())
583 .unwrap_or("default");
584 let direction = args
585 .get("direction")
586 .and_then(|v| v.as_str())
587 .unwrap_or("outgoing");
588
589 let store = match self.engine.get_store(namespace) {
590 Ok(s) => s,
591 Err(e) => return self.tool_result(id, &e.to_string(), true),
592 };
593
594 let mut neighbors = Vec::new();
595
596 if direction == "outgoing" || direction == "both" {
598 if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
599 for q in store
600 .store
601 .quads_for_pattern(Some(subj.into()), None, None, None)
602 .flatten()
603 {
604 neighbors.push(NeighborItem {
605 direction: "outgoing".to_string(),
606 predicate: q.predicate.to_string(),
607 target: q.object.to_string(),
608 score: 1.0,
609 });
610 }
611 }
612 }
613
614 if direction == "incoming" || direction == "both" {
616 if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
617 for q in store
618 .store
619 .quads_for_pattern(None, None, Some(obj.into()), None)
620 .flatten()
621 {
622 neighbors.push(NeighborItem {
623 direction: "incoming".to_string(),
624 predicate: q.predicate.to_string(),
625 target: q.subject.to_string(),
626 score: 1.0,
627 });
628 }
629 }
630 }
631
632 let result = NeighborsToolResult { neighbors };
633 self.serialize_result(id, result)
634 }
635
636 async fn call_list_triples(
637 &self,
638 id: Option<serde_json::Value>,
639 args: &serde_json::Map<String, serde_json::Value>,
640 ) -> McpResponse {
641 let namespace = args
642 .get("namespace")
643 .and_then(|v| v.as_str())
644 .unwrap_or("default");
645 let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
646
647 let store = match self.engine.get_store(namespace) {
648 Ok(s) => s,
649 Err(e) => return self.tool_result(id, &e.to_string(), true),
650 };
651
652 let mut triples = Vec::new();
653 for q in store.store.iter().take(limit).flatten() {
654 triples.push(TripleItem {
655 subject: q.subject.to_string(),
656 predicate: q.predicate.to_string(),
657 object: q.object.to_string(),
658 });
659 }
660
661 let result = TriplesToolResult { triples };
662 self.serialize_result(id, result)
663 }
664
665 async fn call_delete_namespace(
666 &self,
667 id: Option<serde_json::Value>,
668 args: &serde_json::Map<String, serde_json::Value>,
669 ) -> McpResponse {
670 let namespace = match args.get("namespace").and_then(|v| v.as_str()) {
671 Some(n) => n,
672 None => return self.error_response(id, -32602, "Missing 'namespace'"),
673 };
674
675 let req = Request::new(crate::server::proto::EmptyRequest {
676 namespace: namespace.to_string(),
677 });
678
679 match self.engine.delete_namespace_data(req).await {
680 Ok(resp) => {
681 let inner = resp.into_inner();
682 let result = SimpleSuccessResult {
683 success: inner.success,
684 message: inner.message,
685 };
686 self.serialize_result(id, result)
687 }
688 Err(e) => self.tool_result(id, &e.to_string(), true),
689 }
690 }
691
692 async fn call_ingest_url(
693 &self,
694 id: Option<serde_json::Value>,
695 args: &serde_json::Map<String, serde_json::Value>,
696 ) -> McpResponse {
697 let url = match args.get("url").and_then(|v| v.as_str()) {
698 Some(u) => u,
699 None => return self.error_response(id, -32602, "Missing 'url'"),
700 };
701 let namespace = args
702 .get("namespace")
703 .and_then(|v| v.as_str())
704 .unwrap_or("default");
705
706 let client = reqwest::Client::new();
708 let response = match client.get(url).send().await {
709 Ok(r) => r,
710 Err(e) => return self.tool_result(id, &format!("Failed to fetch URL: {}", e), true),
711 };
712
713 if !response.status().is_success() {
714 return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
715 }
716
717 let html = match response.text().await {
718 Ok(t) => t,
719 Err(e) => {
720 return self.tool_result(id, &format!("Failed to read response: {}", e), true)
721 }
722 };
723
724 let script_re = regex::Regex::new(r"(?s)<script.*?>.*?</script>").unwrap();
726 let style_re = regex::Regex::new(r"(?s)<style.*?>.*?</style>").unwrap();
727 let tag_re = regex::Regex::new(r"<[^>]*>").unwrap();
728
729 let no_script = script_re.replace_all(&html, " ");
730 let no_style = style_re.replace_all(&no_script, " ");
731 let text_content = tag_re.replace_all(&no_style, " ");
732
733 let text = text_content
734 .split_whitespace()
735 .collect::<Vec<_>>()
736 .join(" ");
737
738 let processor = crate::processor::TextProcessor::new();
740 let chunks = processor.chunk_text(&text, 1000, 150);
741
742 let store = match self.engine.get_store(namespace) {
744 Ok(s) => s,
745 Err(e) => return self.tool_result(id, &e.to_string(), true),
746 };
747
748 if let Some(ref vector_store) = store.vector_store {
749 let mut added_chunks = 0;
750 for (i, chunk) in chunks.iter().enumerate() {
751 let chunk_uri = format!("{}#chunk-{}", url, i);
752 let metadata = serde_json::json!({
754 "uri": chunk_uri,
755 "source_url": url,
756 "type": "web_chunk"
757 });
758 match vector_store.add(&chunk_uri, chunk, metadata).await {
759 Ok(_) => added_chunks += 1,
760 Err(e) => {
761 eprintln!("Failed to add chunk {}: {}", i, e);
762 }
763 }
764 }
765 let result = IngestToolResult {
766 nodes_added: 0,
767 edges_added: 0, message: format!(
769 "Ingested URL: {} ({} chars, {} chunks)",
770 url,
771 text.len(),
772 added_chunks
773 ),
774 };
775 self.serialize_result(id, result)
776 } else {
777 self.tool_result(id, "Vector store not available", true)
778 }
779 }
780
781 async fn call_ingest_text(
782 &self,
783 id: Option<serde_json::Value>,
784 args: &serde_json::Map<String, serde_json::Value>,
785 ) -> McpResponse {
786 let uri = match args.get("uri").and_then(|v| v.as_str()) {
787 Some(u) => u,
788 None => return self.error_response(id, -32602, "Missing 'uri'"),
789 };
790 let content = match args.get("content").and_then(|v| v.as_str()) {
791 Some(c) => c,
792 None => return self.error_response(id, -32602, "Missing 'content'"),
793 };
794 let namespace = args
795 .get("namespace")
796 .and_then(|v| v.as_str())
797 .unwrap_or("default");
798
799 let processor = crate::processor::TextProcessor::new();
801 let chunks = processor.chunk_text(&content, 1000, 150);
802
803 let store = match self.engine.get_store(namespace) {
805 Ok(s) => s,
806 Err(e) => return self.tool_result(id, &e.to_string(), true),
807 };
808
809 if let Some(ref vector_store) = store.vector_store {
810 let mut added_chunks = 0;
811 for (i, chunk) in chunks.iter().enumerate() {
812 let chunk_uri = if chunks.len() > 1 {
813 format!("{}#chunk-{}", uri, i)
814 } else {
815 uri.to_string()
816 };
817 let metadata = serde_json::json!({
818 "uri": uri, "chunk_uri": chunk_uri,
820 "type": "text_chunk"
821 });
822 match vector_store.add(&chunk_uri, chunk, metadata).await {
823 Ok(_) => added_chunks += 1,
824 Err(e) => {
825 eprintln!("Failed to add chunk {}: {}", i, e);
826 }
827 }
828 }
829 let result = IngestToolResult {
830 nodes_added: 0,
831 edges_added: 0,
832 message: format!(
833 "Ingested text: {} ({} chars, {} chunks)",
834 uri,
835 content.len(),
836 added_chunks
837 ),
838 };
839 self.serialize_result(id, result)
840 } else {
841 self.tool_result(id, "Vector store not available", true)
842 }
843 }
844
845 async fn call_compact_vectors(
846 &self,
847 id: Option<serde_json::Value>,
848 args: &serde_json::Map<String, serde_json::Value>,
849 ) -> McpResponse {
850 let namespace = args
851 .get("namespace")
852 .and_then(|v| v.as_str())
853 .unwrap_or("default");
854
855 let store = match self.engine.get_store(namespace) {
856 Ok(s) => s,
857 Err(e) => return self.tool_result(id, &e.to_string(), true),
858 };
859
860 if let Some(ref vector_store) = store.vector_store {
861 match vector_store.compact() {
862 Ok(removed) => {
863 let result = SimpleSuccessResult {
864 success: true,
865 message: format!("Compaction complete: {} stale entries removed", removed),
866 };
867 self.serialize_result(id, result)
868 }
869 Err(e) => self.tool_result(id, &format!("Compaction error: {}", e), true),
870 }
871 } else {
872 self.tool_result(id, "Vector store not available", true)
873 }
874 }
875
876 async fn call_vector_stats(
877 &self,
878 id: Option<serde_json::Value>,
879 args: &serde_json::Map<String, serde_json::Value>,
880 ) -> McpResponse {
881 let namespace = args
882 .get("namespace")
883 .and_then(|v| v.as_str())
884 .unwrap_or("default");
885
886 let store = match self.engine.get_store(namespace) {
887 Ok(s) => s,
888 Err(e) => return self.tool_result(id, &e.to_string(), true),
889 };
890
891 if let Some(ref vector_store) = store.vector_store {
892 let (active, stale, total) = vector_store.stats();
893 let result = StatsToolResult {
894 active_vectors: active,
895 stale_vectors: stale,
896 total_embeddings: total,
897 };
898 self.serialize_result(id, result)
899 } else {
900 self.tool_result(id, "Vector store not available", true)
901 }
902 }
903
904 async fn call_disambiguate(
905 &self,
906 id: Option<serde_json::Value>,
907 args: &serde_json::Map<String, serde_json::Value>,
908 ) -> McpResponse {
909 let namespace = args
910 .get("namespace")
911 .and_then(|v| v.as_str())
912 .unwrap_or("default");
913 let threshold = args
914 .get("threshold")
915 .and_then(|v| v.as_f64())
916 .unwrap_or(0.8);
917
918 let store = match self.engine.get_store(namespace) {
919 Ok(s) => s,
920 Err(e) => return self.tool_result(id, &e.to_string(), true),
921 };
922
923 let uri_map = store.uri_to_id.read().unwrap();
925 let uris: Vec<String> = uri_map.keys().cloned().collect();
926 drop(uri_map);
927
928 let disambiguator = crate::disambiguation::EntityDisambiguator::new(threshold);
929 let suggestions = disambiguator.suggest_merges(&uris);
930
931 let items: Vec<DisambiguationItem> = suggestions
932 .into_iter()
933 .map(|(u1, u2, s)| DisambiguationItem {
934 uri1: u1,
935 uri2: u2,
936 similarity: s,
937 })
938 .collect();
939
940 let message = if items.is_empty() {
941 "No similar entities found above threshold".to_string()
942 } else {
943 format!("Found {} potential duplicates", items.len())
944 };
945
946 let result = DisambiguationResult {
947 suggestions: items,
948 message,
949 };
950 self.serialize_result(id, result)
951 }
952
953 async fn handle_legacy_ingest(&self, request: McpRequest) -> McpResponse {
955 let params = match request.params {
956 Some(p) => p,
957 None => return self.error_response(request.id, -32602, "Invalid params"),
958 };
959
960 if let (Some(sub), Some(pred), Some(obj)) = (
961 params.get("subject").and_then(|v| v.as_str()),
962 params.get("predicate").and_then(|v| v.as_str()),
963 params.get("object").and_then(|v| v.as_str()),
964 ) {
965 let namespace = params
966 .get("namespace")
967 .and_then(|v| v.as_str())
968 .unwrap_or("default");
969 let triple = Triple {
970 subject: sub.to_string(),
971 predicate: pred.to_string(),
972 object: obj.to_string(),
973 provenance: Some(Provenance {
974 source: "mcp".to_string(),
975 timestamp: "".to_string(),
976 method: "stdio".to_string(),
977 }),
978 embedding: vec![],
979 };
980
981 let req = Request::new(IngestRequest {
982 triples: vec![triple],
983 namespace: namespace.to_string(),
984 });
985
986 match self.engine.ingest_triples(req).await {
987 Ok(_) => McpResponse {
988 jsonrpc: "2.0".to_string(),
989 id: request.id,
990 result: Some(serde_json::to_value("Ingested").unwrap()),
991 error: None,
992 },
993 Err(e) => self.error_response(request.id, -32000, &e.to_string()),
994 }
995 } else {
996 self.error_response(request.id, -32602, "Invalid params")
997 }
998 }
999
1000 async fn handle_legacy_ingest_file(&self, request: McpRequest) -> McpResponse {
1001 let params = match request.params {
1002 Some(p) => p,
1003 None => {
1004 return self.error_response(request.id, -32602, "Invalid params: 'path' required")
1005 }
1006 };
1007
1008 if let Some(path) = params.get("path").and_then(|v| v.as_str()) {
1009 let namespace = params
1010 .get("namespace")
1011 .and_then(|v| v.as_str())
1012 .unwrap_or("default");
1013
1014 let req = Request::new(IngestFileRequest {
1015 file_path: path.to_string(),
1016 namespace: namespace.to_string(),
1017 });
1018
1019 match self.engine.ingest_file(req).await {
1020 Ok(resp) => {
1021 let inner = resp.into_inner();
1022 McpResponse {
1023 jsonrpc: "2.0".to_string(),
1024 id: request.id,
1025 result: Some(
1026 serde_json::to_value(format!(
1027 "Ingested {} triples from {}",
1028 inner.edges_added, path
1029 ))
1030 .unwrap(),
1031 ),
1032 error: None,
1033 }
1034 }
1035 Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1036 }
1037 } else {
1038 self.error_response(request.id, -32602, "Invalid params: 'path' required")
1039 }
1040 }
1041
1042 fn serialize_result<T: serde::Serialize>(
1043 &self,
1044 id: Option<serde_json::Value>,
1045 result: T,
1046 ) -> McpResponse {
1047 match serde_json::to_string_pretty(&result) {
1048 Ok(json) => self.tool_result(id, &json, false),
1049 Err(e) => self.tool_result(id, &format!("Serialization error: {}", e), true),
1050 }
1051 }
1052
1053 async fn call_get_node_degree(
1054 &self,
1055 id: Option<serde_json::Value>,
1056 args: &serde_json::Map<String, serde_json::Value>,
1057 ) -> McpResponse {
1058 let uri = match args.get("uri").and_then(|v| v.as_str()) {
1059 Some(u) => u,
1060 None => return self.error_response(id, -32602, "Missing 'uri'"),
1061 };
1062 let namespace = args
1063 .get("namespace")
1064 .and_then(|v| v.as_str())
1065 .unwrap_or("default");
1066
1067 let store = match self.engine.get_store(namespace) {
1068 Ok(s) => s,
1069 Err(e) => return self.tool_result(id, &e.to_string(), true),
1070 };
1071
1072 let degree = store.get_degree(uri);
1073
1074 let result = DegreeResult {
1075 uri: uri.to_string(),
1076 degree,
1077 };
1078
1079 self.serialize_result(id, result)
1080 }
1081
1082 fn error_response(
1083 &self,
1084 id: Option<serde_json::Value>,
1085 code: i32,
1086 message: &str,
1087 ) -> McpResponse {
1088 McpResponse {
1089 jsonrpc: "2.0".to_string(),
1090 id,
1091 result: None,
1092 error: Some(McpError {
1093 code,
1094 message: message.to_string(),
1095 data: None,
1096 }),
1097 }
1098 }
1099
1100 fn tool_result(
1101 &self,
1102 id: Option<serde_json::Value>,
1103 text: &str,
1104 is_error: bool,
1105 ) -> McpResponse {
1106 let result = CallToolResult {
1107 content: vec![Content {
1108 content_type: "text".to_string(),
1109 text: text.to_string(),
1110 }],
1111 is_error: if is_error { Some(true) } else { None },
1112 };
1113 McpResponse {
1114 jsonrpc: "2.0".to_string(),
1115 id,
1116 result: Some(serde_json::to_value(result).unwrap()),
1117 error: None,
1118 }
1119 }
1120}