1use crate::mcp_types::{
2 CallToolResult, Content, DegreeResult, DisambiguationItem, DisambiguationResult,
3 IngestToolResult, ListToolsResult, McpError, McpRequest, McpResponse, NeighborItem,
4 NeighborsToolResult, ReasoningToolResult, SearchResultItem, SearchToolResult,
5 SimpleSuccessResult, StatsToolResult, Tool, TripleItem, TriplesToolResult,
6};
7use crate::server::proto::semantic_engine_server::SemanticEngine;
8use crate::server::proto::{
9 HybridSearchRequest, IngestFileRequest, IngestRequest, Provenance, ReasoningRequest,
10 ReasoningStrategy, SearchMode, SparqlRequest, Triple,
11};
12use crate::server::MySemanticEngine;
13use jsonschema::JSONSchema;
14use std::sync::Arc;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tonic::Request;
17
/// MCP (Model Context Protocol) server that reads line-delimited JSON-RPC messages from
/// stdin, writes responses to stdout, and forwards every operation to the wrapped engine.
pub struct McpStdioServer {
    /// Shared handle to the engine that executes ingestion, query and reasoning requests.
    engine: Arc<MySemanticEngine>,
}
21
22impl McpStdioServer {
23 pub fn new(engine: Arc<MySemanticEngine>) -> Self {
24 Self { engine }
25 }
26
27 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
28 let mut reader = BufReader::new(tokio::io::stdin());
29 let mut writer = tokio::io::stdout();
30
31 loop {
32 let mut line = String::new();
33 if reader.read_line(&mut line).await? == 0 {
34 break;
35 }
36
37 let trimmed = line.trim();
38 if trimmed.is_empty() {
39 continue;
40 }
41
42 if let Ok(request) = serde_json::from_str::<McpRequest>(trimmed) {
43 let response = self.handle_request(request).await;
44 let response_json = serde_json::to_string(&response)? + "\n";
45 writer.write_all(response_json.as_bytes()).await?;
46 writer.flush().await?;
47 }
48 }
49
50 Ok(())
51 }
52
53 fn create_request<T>(msg: T) -> Request<T> {
54 let mut req = Request::new(msg);
55
56 let token_opt = std::env::var("SYNAPSE_ADMIN_TOKEN")
58 .or_else(|_| std::env::var("SYNAPSE_MCP_TOKEN"))
59 .ok();
60
61 if let Some(token) = token_opt {
62 if let Ok(val) = format!("Bearer {}", token).parse() {
63 req.metadata_mut().insert("authorization", val);
64 }
65 }
66 req
67 }
68
69 fn get_tools() -> Vec<Tool> {
70 vec![
71 Tool {
72 name: "ingest_triples".to_string(),
73 description: Some(
74 "Ingest one or more RDF triples into the knowledge graph".to_string(),
75 ),
76 input_schema: serde_json::json!({
77 "type": "object",
78 "properties": {
79 "triples": {
80 "type": "array",
81 "items": {
82 "type": "object",
83 "properties": {
84 "subject": { "type": "string" },
85 "predicate": { "type": "string" },
86 "object": { "type": "string" }
87 },
88 "required": ["subject", "predicate", "object"]
89 }
90 },
91 "namespace": { "type": "string", "default": "default" }
92 },
93 "required": ["triples"]
94 }),
95 },
96 Tool {
97 name: "ingest_file".to_string(),
98 description: Some(
99 "Ingest a CSV or Markdown file into the knowledge graph".to_string(),
100 ),
101 input_schema: serde_json::json!({
102 "type": "object",
103 "properties": {
104 "path": { "type": "string", "description": "Path to the file" },
105 "namespace": { "type": "string", "default": "default" }
106 },
107 "required": ["path"]
108 }),
109 },
110 Tool {
111 name: "sparql_query".to_string(),
112 description: Some("Execute a SPARQL query against the knowledge graph".to_string()),
113 input_schema: serde_json::json!({
114 "type": "object",
115 "properties": {
116 "query": { "type": "string", "description": "SPARQL query string" },
117 "namespace": { "type": "string", "default": "default" }
118 },
119 "required": ["query"]
120 }),
121 },
122 Tool {
123 name: "hybrid_search".to_string(),
124 description: Some("Perform a hybrid vector + graph search".to_string()),
125 input_schema: serde_json::json!({
126 "type": "object",
127 "properties": {
128 "query": { "type": "string", "description": "Natural language query" },
129 "namespace": { "type": "string", "default": "default" },
130 "vector_k": { "type": "integer", "default": 10 },
131 "graph_depth": { "type": "integer", "default": 1 },
132 "limit": { "type": "integer", "default": 20 }
133 },
134 "required": ["query"]
135 }),
136 },
137 Tool {
138 name: "apply_reasoning".to_string(),
139 description: Some(
140 "Apply RDFS or OWL-RL reasoning to infer new triples".to_string(),
141 ),
142 input_schema: serde_json::json!({
143 "type": "object",
144 "properties": {
145 "namespace": { "type": "string", "default": "default" },
146 "strategy": { "type": "string", "enum": ["rdfs", "owlrl"], "default": "rdfs" },
147 "materialize": { "type": "boolean", "default": false }
148 }
149 }),
150 },
151 Tool {
152 name: "get_neighbors".to_string(),
153 description: Some(
154 "Get neighboring nodes connected to a given URI in the graph".to_string(),
155 ),
156 input_schema: serde_json::json!({
157 "type": "object",
158 "properties": {
159 "uri": { "type": "string", "description": "URI of the entity to find neighbors for" },
160 "namespace": { "type": "string", "default": "default" },
161 "direction": { "type": "string", "enum": ["outgoing", "incoming", "both"], "default": "outgoing" }
162 },
163 "required": ["uri"]
164 }),
165 },
166 Tool {
167 name: "list_triples".to_string(),
168 description: Some(
169 "List all triples in a namespace (useful for debugging/exploration)"
170 .to_string(),
171 ),
172 input_schema: serde_json::json!({
173 "type": "object",
174 "properties": {
175 "namespace": { "type": "string", "default": "default" },
176 "limit": { "type": "integer", "default": 100 }
177 }
178 }),
179 },
180 Tool {
181 name: "delete_namespace".to_string(),
182 description: Some("Delete all data in a namespace".to_string()),
183 input_schema: serde_json::json!({
184 "type": "object",
185 "properties": {
186 "namespace": { "type": "string", "description": "Namespace to delete" }
187 },
188 "required": ["namespace"]
189 }),
190 },
191 Tool {
192 name: "ingest_url".to_string(),
193 description: Some(
194 "Scrape a web page and add its content to the vector store for RAG retrieval"
195 .to_string(),
196 ),
197 input_schema: serde_json::json!({
198 "type": "object",
199 "properties": {
200 "url": { "type": "string", "description": "URL to scrape and ingest" },
201 "namespace": { "type": "string", "default": "default" }
202 },
203 "required": ["url"]
204 }),
205 },
206 Tool {
207 name: "ingest_text".to_string(),
208 description: Some(
209 "Add arbitrary text content to the vector store for RAG retrieval".to_string(),
210 ),
211 input_schema: serde_json::json!({
212 "type": "object",
213 "properties": {
214 "uri": { "type": "string", "description": "Custom URI identifier for this text" },
215 "content": { "type": "string", "description": "Text content to embed and store" },
216 "namespace": { "type": "string", "default": "default" }
217 },
218 "required": ["uri", "content"]
219 }),
220 },
221 Tool {
222 name: "compact_vectors".to_string(),
223 description: Some("Compact the vector index by removing stale entries".to_string()),
224 input_schema: serde_json::json!({
225 "type": "object",
226 "properties": {
227 "namespace": { "type": "string", "default": "default" }
228 }
229 }),
230 },
231 Tool {
232 name: "vector_stats".to_string(),
233 description: Some("Get vector store statistics (active, stale, total)".to_string()),
234 input_schema: serde_json::json!({
235 "type": "object",
236 "properties": {
237 "namespace": { "type": "string", "default": "default" }
238 }
239 }),
240 },
241 Tool {
242 name: "disambiguate".to_string(),
243 description: Some("Find similar entities that might be duplicates".to_string()),
244 input_schema: serde_json::json!({
245 "type": "object",
246 "properties": {
247 "namespace": { "type": "string", "default": "default" },
248 "threshold": { "type": "number", "default": 0.8, "description": "Similarity threshold 0.0-1.0" }
249 }
250 }),
251 },
252 Tool {
253 name: "get_node_degree".to_string(),
254 description: Some("Get the degree (number of connections) of a node".to_string()),
255 input_schema: serde_json::json!({
256 "type": "object",
257 "properties": {
258 "uri": { "type": "string" },
259 "namespace": { "type": "string", "default": "default" }
260 },
261 "required": ["uri"]
262 }),
263 },
264 ]
265 }
266
267 pub async fn handle_request(&self, request: McpRequest) -> McpResponse {
268 match request.method.as_str() {
269 "initialize" => {
270 McpResponse {
272 jsonrpc: "2.0".to_string(),
273 id: request.id,
274 result: Some(serde_json::json!({
275 "protocolVersion": "2024-11-05",
276 "capabilities": {
277 "tools": {}
278 },
279 "serverInfo": {
280 "name": "synapse",
281 "version": env!("CARGO_PKG_VERSION")
282 }
283 })),
284 error: None,
285 }
286 }
287 "notifications/initialized" | "initialized" => {
288 McpResponse {
290 jsonrpc: "2.0".to_string(),
291 id: request.id,
292 result: Some(serde_json::json!({})),
293 error: None,
294 }
295 }
296 "tools/list" => {
297 let result = ListToolsResult {
298 tools: Self::get_tools(),
299 };
300 McpResponse {
301 jsonrpc: "2.0".to_string(),
302 id: request.id,
303 result: Some(serde_json::to_value(result).unwrap()),
304 error: None,
305 }
306 }
307 "tools/call" => self.handle_tool_call(request).await,
308 "ingest" => self.handle_legacy_ingest(request).await,
310 "ingest_file" => self.handle_legacy_ingest_file(request).await,
311 _ => McpResponse {
312 jsonrpc: "2.0".to_string(),
313 id: request.id,
314 result: None,
315 error: Some(McpError {
316 code: -32601,
317 message: format!("Method not found: {}", request.method),
318 data: None,
319 }),
320 },
321 }
322 }
323
324 fn validate_arguments(tool_name: &str, arguments: &serde_json::Value) -> Result<(), String> {
325 let tools = Self::get_tools();
326 if let Some(tool) = tools.iter().find(|t| t.name == tool_name) {
327 if let Ok(schema) = JSONSchema::compile(&tool.input_schema) {
328 if let Err(errors) = schema.validate(arguments) {
329 let error_msg = errors.map(|e| e.to_string()).collect::<Vec<_>>().join(", ");
330 return Err(format!("Validation error: {}", error_msg));
331 }
332 } else {
333 return Err("Invalid tool schema definition".to_string());
334 }
335 }
336 Ok(())
337 }
338
339 async fn handle_tool_call(&self, request: McpRequest) -> McpResponse {
340 let params = match request.params {
341 Some(p) => p,
342 None => return self.error_response(request.id, -32602, "Missing params"),
343 };
344
345 let tool_name = match params.get("name").and_then(|v| v.as_str()) {
346 Some(n) => n,
347 None => return self.error_response(request.id, -32602, "Missing tool name"),
348 };
349
350 let arguments = params
351 .get("arguments")
352 .and_then(|v| v.as_object())
353 .cloned()
354 .unwrap_or_default();
355
356 let args_value = serde_json::Value::Object(arguments.clone());
357 if let Err(e) = Self::validate_arguments(tool_name, &args_value) {
358 return self.error_response(request.id, -32602, &e);
359 }
360
361 match tool_name {
362 "ingest_triples" => self.call_ingest_triples(request.id, &arguments).await,
363 "ingest_file" => self.call_ingest_file(request.id, &arguments).await,
364 "sparql_query" => self.call_sparql_query(request.id, &arguments).await,
365 "hybrid_search" => self.call_hybrid_search(request.id, &arguments).await,
366 "apply_reasoning" => self.call_apply_reasoning(request.id, &arguments).await,
367 "get_neighbors" => self.call_get_neighbors(request.id, &arguments).await,
368 "list_triples" => self.call_list_triples(request.id, &arguments).await,
369 "delete_namespace" => self.call_delete_namespace(request.id, &arguments).await,
370 "ingest_url" => self.call_ingest_url(request.id, &arguments).await,
371 "ingest_text" => self.call_ingest_text(request.id, &arguments).await,
372 "compact_vectors" => self.call_compact_vectors(request.id, &arguments).await,
373 "vector_stats" => self.call_vector_stats(request.id, &arguments).await,
374 "disambiguate" => self.call_disambiguate(request.id, &arguments).await,
375 "get_node_degree" => self.call_get_node_degree(request.id, &arguments).await,
376 _ => self.error_response(request.id, -32602, &format!("Unknown tool: {}", tool_name)),
377 }
378 }
379
380 async fn call_ingest_triples(
381 &self,
382 id: Option<serde_json::Value>,
383 args: &serde_json::Map<String, serde_json::Value>,
384 ) -> McpResponse {
385 let namespace = args
386 .get("namespace")
387 .and_then(|v| v.as_str())
388 .unwrap_or("default");
389 let triples_array = match args.get("triples").and_then(|v| v.as_array()) {
390 Some(t) => t,
391 None => return self.error_response(id, -32602, "Missing 'triples' array"),
392 };
393
394 let mut triples = Vec::new();
395 for t in triples_array {
396 if let (Some(s), Some(p), Some(o)) = (
397 t.get("subject").and_then(|v| v.as_str()),
398 t.get("predicate").and_then(|v| v.as_str()),
399 t.get("object").and_then(|v| v.as_str()),
400 ) {
401 triples.push(Triple {
402 subject: s.to_string(),
403 predicate: p.to_string(),
404 object: o.to_string(),
405 provenance: Some(Provenance {
406 source: "mcp".to_string(),
407 timestamp: "".to_string(),
408 method: "tools/call".to_string(),
409 }),
410 embedding: vec![],
411 });
412 }
413 }
414
415 let req = Self::create_request(IngestRequest {
416 triples,
417 namespace: namespace.to_string(),
418 });
419
420 match self.engine.ingest_triples(req).await {
421 Ok(resp) => {
422 let inner = resp.into_inner();
423 let result = IngestToolResult {
424 nodes_added: inner.nodes_added,
425 edges_added: inner.edges_added,
426 message: format!("Ingested {} triples", inner.edges_added),
427 };
428 self.serialize_result(id, result)
429 }
430 Err(e) => self.tool_result(id, &e.to_string(), true),
431 }
432 }
433
434 async fn call_ingest_file(
435 &self,
436 id: Option<serde_json::Value>,
437 args: &serde_json::Map<String, serde_json::Value>,
438 ) -> McpResponse {
439 let path = match args.get("path").and_then(|v| v.as_str()) {
440 Some(p) => p,
441 None => return self.error_response(id, -32602, "Missing 'path'"),
442 };
443 let namespace = args
444 .get("namespace")
445 .and_then(|v| v.as_str())
446 .unwrap_or("default");
447
448 let req = Self::create_request(IngestFileRequest {
449 file_path: path.to_string(),
450 namespace: namespace.to_string(),
451 });
452
453 match self.engine.ingest_file(req).await {
454 Ok(resp) => {
455 let inner = resp.into_inner();
456 let result = IngestToolResult {
457 nodes_added: inner.nodes_added,
458 edges_added: inner.edges_added,
459 message: format!("Ingested {} triples from {}", inner.edges_added, path),
460 };
461 self.serialize_result(id, result)
462 }
463 Err(e) => self.tool_result(id, &e.to_string(), true),
464 }
465 }
466
467 async fn call_sparql_query(
468 &self,
469 id: Option<serde_json::Value>,
470 args: &serde_json::Map<String, serde_json::Value>,
471 ) -> McpResponse {
472 let query = match args.get("query").and_then(|v| v.as_str()) {
473 Some(q) => q,
474 None => return self.error_response(id, -32602, "Missing 'query'"),
475 };
476 let namespace = args
477 .get("namespace")
478 .and_then(|v| v.as_str())
479 .unwrap_or("default");
480
481 let req = Self::create_request(SparqlRequest {
482 query: query.to_string(),
483 namespace: namespace.to_string(),
484 });
485
486 match self.engine.query_sparql(req).await {
487 Ok(resp) => self.tool_result(id, &resp.into_inner().results_json, false),
488 Err(e) => self.tool_result(id, &e.to_string(), true),
489 }
490 }
491
492 async fn call_hybrid_search(
493 &self,
494 id: Option<serde_json::Value>,
495 args: &serde_json::Map<String, serde_json::Value>,
496 ) -> McpResponse {
497 let query = match args.get("query").and_then(|v| v.as_str()) {
498 Some(q) => q,
499 None => return self.error_response(id, -32602, "Missing 'query'"),
500 };
501 let namespace = args
502 .get("namespace")
503 .and_then(|v| v.as_str())
504 .unwrap_or("default");
505 let vector_k = args.get("vector_k").and_then(|v| v.as_u64()).unwrap_or(10) as u32;
506 let graph_depth = args
507 .get("graph_depth")
508 .and_then(|v| v.as_u64())
509 .unwrap_or(1) as u32;
510 let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as u32;
511
512 let req = Self::create_request(HybridSearchRequest {
513 query: query.to_string(),
514 namespace: namespace.to_string(),
515 vector_k,
516 graph_depth,
517 mode: SearchMode::Hybrid as i32,
518 limit,
519 });
520
521 match self.engine.hybrid_search(req).await {
522 Ok(resp) => {
523 let results = resp.into_inner().results;
524 let items: Vec<SearchResultItem> = results
525 .into_iter()
526 .map(|r| SearchResultItem {
527 node_id: r.node_id,
528 score: r.score,
529 content: r.content,
530 uri: r.uri,
531 })
532 .collect();
533
534 let result = SearchToolResult { results: items };
535 self.serialize_result(id, result)
536 }
537 Err(e) => self.tool_result(id, &e.to_string(), true),
538 }
539 }
540
541 async fn call_apply_reasoning(
542 &self,
543 id: Option<serde_json::Value>,
544 args: &serde_json::Map<String, serde_json::Value>,
545 ) -> McpResponse {
546 let namespace = args
547 .get("namespace")
548 .and_then(|v| v.as_str())
549 .unwrap_or("default");
550 let strategy_str = args
551 .get("strategy")
552 .and_then(|v| v.as_str())
553 .unwrap_or("rdfs");
554 let materialize = args
555 .get("materialize")
556 .and_then(|v| v.as_bool())
557 .unwrap_or(false);
558
559 let strategy = match strategy_str.to_lowercase().as_str() {
560 "owlrl" | "owl-rl" => ReasoningStrategy::Owlrl as i32,
561 _ => ReasoningStrategy::Rdfs as i32,
562 };
563
564 let req = Self::create_request(ReasoningRequest {
565 namespace: namespace.to_string(),
566 strategy,
567 materialize,
568 });
569
570 match self.engine.apply_reasoning(req).await {
571 Ok(resp) => {
572 let inner = resp.into_inner();
573 let result = ReasoningToolResult {
574 success: inner.success,
575 triples_inferred: inner.triples_inferred,
576 message: inner.message,
577 };
578 self.serialize_result(id, result)
579 }
580 Err(e) => self.tool_result(id, &e.to_string(), true),
581 }
582 }
583
584 async fn call_get_neighbors(
585 &self,
586 id: Option<serde_json::Value>,
587 args: &serde_json::Map<String, serde_json::Value>,
588 ) -> McpResponse {
589 let uri = match args.get("uri").and_then(|v| v.as_str()) {
590 Some(u) => u,
591 None => return self.error_response(id, -32602, "Missing 'uri'"),
592 };
593 let namespace = args
594 .get("namespace")
595 .and_then(|v| v.as_str())
596 .unwrap_or("default");
597 let direction = args
598 .get("direction")
599 .and_then(|v| v.as_str())
600 .unwrap_or("outgoing");
601
602 let store = match self.engine.get_store(namespace) {
603 Ok(s) => s,
604 Err(e) => return self.tool_result(id, &e.to_string(), true),
605 };
606
607 let mut neighbors = Vec::new();
608
609 if direction == "outgoing" || direction == "both" {
611 if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
612 for q in store
613 .store
614 .quads_for_pattern(Some(subj.into()), None, None, None)
615 .flatten()
616 {
617 neighbors.push(NeighborItem {
618 direction: "outgoing".to_string(),
619 predicate: q.predicate.to_string(),
620 target: q.object.to_string(),
621 score: 1.0,
622 });
623 }
624 }
625 }
626
627 if direction == "incoming" || direction == "both" {
629 if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
630 for q in store
631 .store
632 .quads_for_pattern(None, None, Some(obj.into()), None)
633 .flatten()
634 {
635 neighbors.push(NeighborItem {
636 direction: "incoming".to_string(),
637 predicate: q.predicate.to_string(),
638 target: q.subject.to_string(),
639 score: 1.0,
640 });
641 }
642 }
643 }
644
645 let result = NeighborsToolResult { neighbors };
646 self.serialize_result(id, result)
647 }
648
649 async fn call_list_triples(
650 &self,
651 id: Option<serde_json::Value>,
652 args: &serde_json::Map<String, serde_json::Value>,
653 ) -> McpResponse {
654 let namespace = args
655 .get("namespace")
656 .and_then(|v| v.as_str())
657 .unwrap_or("default");
658 let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
659
660 let store = match self.engine.get_store(namespace) {
661 Ok(s) => s,
662 Err(e) => return self.tool_result(id, &e.to_string(), true),
663 };
664
665 let mut triples = Vec::new();
666 for q in store.store.iter().take(limit).flatten() {
667 triples.push(TripleItem {
668 subject: q.subject.to_string(),
669 predicate: q.predicate.to_string(),
670 object: q.object.to_string(),
671 });
672 }
673
674 let result = TriplesToolResult { triples };
675 self.serialize_result(id, result)
676 }
677
678 async fn call_delete_namespace(
679 &self,
680 id: Option<serde_json::Value>,
681 args: &serde_json::Map<String, serde_json::Value>,
682 ) -> McpResponse {
683 let namespace = match args.get("namespace").and_then(|v| v.as_str()) {
684 Some(n) => n,
685 None => return self.error_response(id, -32602, "Missing 'namespace'"),
686 };
687
688 let req = Self::create_request(crate::server::proto::EmptyRequest {
689 namespace: namespace.to_string(),
690 });
691
692 match self.engine.delete_namespace_data(req).await {
693 Ok(resp) => {
694 let inner = resp.into_inner();
695 let result = SimpleSuccessResult {
696 success: inner.success,
697 message: inner.message,
698 };
699 self.serialize_result(id, result)
700 }
701 Err(e) => self.tool_result(id, &e.to_string(), true),
702 }
703 }
704
705 async fn call_ingest_url(
706 &self,
707 id: Option<serde_json::Value>,
708 args: &serde_json::Map<String, serde_json::Value>,
709 ) -> McpResponse {
710 let url = match args.get("url").and_then(|v| v.as_str()) {
711 Some(u) => u,
712 None => return self.error_response(id, -32602, "Missing 'url'"),
713 };
714 let namespace = args
715 .get("namespace")
716 .and_then(|v| v.as_str())
717 .unwrap_or("default");
718
719 let client = reqwest::Client::new();
721 let response = match client.get(url).send().await {
722 Ok(r) => r,
723 Err(e) => return self.tool_result(id, &format!("Failed to fetch URL: {}", e), true),
724 };
725
726 if !response.status().is_success() {
727 return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
728 }
729
730 let html = match response.text().await {
731 Ok(t) => t,
732 Err(e) => {
733 return self.tool_result(id, &format!("Failed to read response: {}", e), true)
734 }
735 };
736
737 let script_re = regex::Regex::new(r"(?s)<script.*?>.*?</script>").unwrap();
739 let style_re = regex::Regex::new(r"(?s)<style.*?>.*?</style>").unwrap();
740 let tag_re = regex::Regex::new(r"<[^>]*>").unwrap();
741
742 let no_script = script_re.replace_all(&html, " ");
743 let no_style = style_re.replace_all(&no_script, " ");
744 let text_content = tag_re.replace_all(&no_style, " ");
745
746 let text = text_content
747 .split_whitespace()
748 .collect::<Vec<_>>()
749 .join(" ");
750
751 let processor = crate::processor::TextProcessor::new();
753 let chunks = processor.chunk_text(&text, 1000, 150);
754
755 let store = match self.engine.get_store(namespace) {
757 Ok(s) => s,
758 Err(e) => return self.tool_result(id, &e.to_string(), true),
759 };
760
761 if let Some(ref vector_store) = store.vector_store {
762 let mut added_chunks = 0;
763 for (i, chunk) in chunks.iter().enumerate() {
764 let chunk_uri = format!("{}#chunk-{}", url, i);
765 let metadata = serde_json::json!({
767 "uri": chunk_uri,
768 "source_url": url,
769 "type": "web_chunk"
770 });
771 match vector_store.add(&chunk_uri, chunk, metadata).await {
772 Ok(_) => added_chunks += 1,
773 Err(e) => {
774 eprintln!("Failed to add chunk {}: {}", i, e);
775 }
776 }
777 }
778 let result = IngestToolResult {
779 nodes_added: 0,
780 edges_added: 0, message: format!(
782 "Ingested URL: {} ({} chars, {} chunks)",
783 url,
784 text.len(),
785 added_chunks
786 ),
787 };
788 self.serialize_result(id, result)
789 } else {
790 self.tool_result(id, "Vector store not available", true)
791 }
792 }
793
794 async fn call_ingest_text(
795 &self,
796 id: Option<serde_json::Value>,
797 args: &serde_json::Map<String, serde_json::Value>,
798 ) -> McpResponse {
799 let uri = match args.get("uri").and_then(|v| v.as_str()) {
800 Some(u) => u,
801 None => return self.error_response(id, -32602, "Missing 'uri'"),
802 };
803 let content = match args.get("content").and_then(|v| v.as_str()) {
804 Some(c) => c,
805 None => return self.error_response(id, -32602, "Missing 'content'"),
806 };
807 let namespace = args
808 .get("namespace")
809 .and_then(|v| v.as_str())
810 .unwrap_or("default");
811
812 let processor = crate::processor::TextProcessor::new();
814 let chunks = processor.chunk_text(content, 1000, 150);
815
816 let store = match self.engine.get_store(namespace) {
818 Ok(s) => s,
819 Err(e) => return self.tool_result(id, &e.to_string(), true),
820 };
821
822 if let Some(ref vector_store) = store.vector_store {
823 let mut added_chunks = 0;
824 for (i, chunk) in chunks.iter().enumerate() {
825 let chunk_uri = if chunks.len() > 1 {
826 format!("{}#chunk-{}", uri, i)
827 } else {
828 uri.to_string()
829 };
830 let metadata = serde_json::json!({
831 "uri": uri, "chunk_uri": chunk_uri,
833 "type": "text_chunk"
834 });
835 match vector_store.add(&chunk_uri, chunk, metadata).await {
836 Ok(_) => added_chunks += 1,
837 Err(e) => {
838 eprintln!("Failed to add chunk {}: {}", i, e);
839 }
840 }
841 }
842 let result = IngestToolResult {
843 nodes_added: 0,
844 edges_added: 0,
845 message: format!(
846 "Ingested text: {} ({} chars, {} chunks)",
847 uri,
848 content.len(),
849 added_chunks
850 ),
851 };
852 self.serialize_result(id, result)
853 } else {
854 self.tool_result(id, "Vector store not available", true)
855 }
856 }
857
858 async fn call_compact_vectors(
859 &self,
860 id: Option<serde_json::Value>,
861 args: &serde_json::Map<String, serde_json::Value>,
862 ) -> McpResponse {
863 let namespace = args
864 .get("namespace")
865 .and_then(|v| v.as_str())
866 .unwrap_or("default");
867
868 let store = match self.engine.get_store(namespace) {
869 Ok(s) => s,
870 Err(e) => return self.tool_result(id, &e.to_string(), true),
871 };
872
873 if let Some(ref vector_store) = store.vector_store {
874 match vector_store.compact() {
875 Ok(removed) => {
876 let result = SimpleSuccessResult {
877 success: true,
878 message: format!("Compaction complete: {} stale entries removed", removed),
879 };
880 self.serialize_result(id, result)
881 }
882 Err(e) => self.tool_result(id, &format!("Compaction error: {}", e), true),
883 }
884 } else {
885 self.tool_result(id, "Vector store not available", true)
886 }
887 }
888
889 async fn call_vector_stats(
890 &self,
891 id: Option<serde_json::Value>,
892 args: &serde_json::Map<String, serde_json::Value>,
893 ) -> McpResponse {
894 let namespace = args
895 .get("namespace")
896 .and_then(|v| v.as_str())
897 .unwrap_or("default");
898
899 let store = match self.engine.get_store(namespace) {
900 Ok(s) => s,
901 Err(e) => return self.tool_result(id, &e.to_string(), true),
902 };
903
904 if let Some(ref vector_store) = store.vector_store {
905 let (active, stale, total) = vector_store.stats();
906 let result = StatsToolResult {
907 active_vectors: active,
908 stale_vectors: stale,
909 total_embeddings: total,
910 };
911 self.serialize_result(id, result)
912 } else {
913 self.tool_result(id, "Vector store not available", true)
914 }
915 }
916
917 async fn call_disambiguate(
918 &self,
919 id: Option<serde_json::Value>,
920 args: &serde_json::Map<String, serde_json::Value>,
921 ) -> McpResponse {
922 let namespace = args
923 .get("namespace")
924 .and_then(|v| v.as_str())
925 .unwrap_or("default");
926 let threshold = args
927 .get("threshold")
928 .and_then(|v| v.as_f64())
929 .unwrap_or(0.8);
930
931 let store = match self.engine.get_store(namespace) {
932 Ok(s) => s,
933 Err(e) => return self.tool_result(id, &e.to_string(), true),
934 };
935
936 let uri_map = store.uri_to_id.read().unwrap();
938 let uris: Vec<String> = uri_map.keys().cloned().collect();
939 drop(uri_map);
940
941 let disambiguator = crate::disambiguation::EntityDisambiguator::new(threshold);
942 let suggestions = disambiguator.suggest_merges(&uris);
943
944 let items: Vec<DisambiguationItem> = suggestions
945 .into_iter()
946 .map(|(u1, u2, s)| DisambiguationItem {
947 uri1: u1,
948 uri2: u2,
949 similarity: s,
950 })
951 .collect();
952
953 let message = if items.is_empty() {
954 "No similar entities found above threshold".to_string()
955 } else {
956 format!("Found {} potential duplicates", items.len())
957 };
958
959 let result = DisambiguationResult {
960 suggestions: items,
961 message,
962 };
963 self.serialize_result(id, result)
964 }
965
966 async fn handle_legacy_ingest(&self, request: McpRequest) -> McpResponse {
968 let params = match request.params {
969 Some(p) => p,
970 None => return self.error_response(request.id, -32602, "Invalid params"),
971 };
972
973 if let (Some(sub), Some(pred), Some(obj)) = (
974 params.get("subject").and_then(|v| v.as_str()),
975 params.get("predicate").and_then(|v| v.as_str()),
976 params.get("object").and_then(|v| v.as_str()),
977 ) {
978 let namespace = params
979 .get("namespace")
980 .and_then(|v| v.as_str())
981 .unwrap_or("default");
982 let triple = Triple {
983 subject: sub.to_string(),
984 predicate: pred.to_string(),
985 object: obj.to_string(),
986 provenance: Some(Provenance {
987 source: "mcp".to_string(),
988 timestamp: "".to_string(),
989 method: "stdio".to_string(),
990 }),
991 embedding: vec![],
992 };
993
994 let req = Self::create_request(IngestRequest {
995 triples: vec![triple],
996 namespace: namespace.to_string(),
997 });
998
999 match self.engine.ingest_triples(req).await {
1000 Ok(_) => McpResponse {
1001 jsonrpc: "2.0".to_string(),
1002 id: request.id,
1003 result: Some(serde_json::to_value("Ingested").unwrap()),
1004 error: None,
1005 },
1006 Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1007 }
1008 } else {
1009 self.error_response(request.id, -32602, "Invalid params")
1010 }
1011 }
1012
1013 async fn handle_legacy_ingest_file(&self, request: McpRequest) -> McpResponse {
1014 let params = match request.params {
1015 Some(p) => p,
1016 None => {
1017 return self.error_response(request.id, -32602, "Invalid params: 'path' required")
1018 }
1019 };
1020
1021 if let Some(path) = params.get("path").and_then(|v| v.as_str()) {
1022 let namespace = params
1023 .get("namespace")
1024 .and_then(|v| v.as_str())
1025 .unwrap_or("default");
1026
1027 let req = Self::create_request(IngestFileRequest {
1028 file_path: path.to_string(),
1029 namespace: namespace.to_string(),
1030 });
1031
1032 match self.engine.ingest_file(req).await {
1033 Ok(resp) => {
1034 let inner = resp.into_inner();
1035 McpResponse {
1036 jsonrpc: "2.0".to_string(),
1037 id: request.id,
1038 result: Some(
1039 serde_json::to_value(format!(
1040 "Ingested {} triples from {}",
1041 inner.edges_added, path
1042 ))
1043 .unwrap(),
1044 ),
1045 error: None,
1046 }
1047 }
1048 Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1049 }
1050 } else {
1051 self.error_response(request.id, -32602, "Invalid params: 'path' required")
1052 }
1053 }
1054
1055 fn serialize_result<T: serde::Serialize>(
1056 &self,
1057 id: Option<serde_json::Value>,
1058 result: T,
1059 ) -> McpResponse {
1060 match serde_json::to_string_pretty(&result) {
1061 Ok(json) => self.tool_result(id, &json, false),
1062 Err(e) => self.tool_result(id, &format!("Serialization error: {}", e), true),
1063 }
1064 }
1065
1066 async fn call_get_node_degree(
1067 &self,
1068 id: Option<serde_json::Value>,
1069 args: &serde_json::Map<String, serde_json::Value>,
1070 ) -> McpResponse {
1071 let uri = match args.get("uri").and_then(|v| v.as_str()) {
1072 Some(u) => u,
1073 None => return self.error_response(id, -32602, "Missing 'uri'"),
1074 };
1075 let namespace = args
1076 .get("namespace")
1077 .and_then(|v| v.as_str())
1078 .unwrap_or("default");
1079
1080 let store = match self.engine.get_store(namespace) {
1081 Ok(s) => s,
1082 Err(e) => return self.tool_result(id, &e.to_string(), true),
1083 };
1084
1085 let degree = store.get_degree(uri);
1086
1087 let result = DegreeResult {
1088 uri: uri.to_string(),
1089 degree,
1090 };
1091
1092 self.serialize_result(id, result)
1093 }
1094
1095 fn error_response(
1096 &self,
1097 id: Option<serde_json::Value>,
1098 code: i32,
1099 message: &str,
1100 ) -> McpResponse {
1101 McpResponse {
1102 jsonrpc: "2.0".to_string(),
1103 id,
1104 result: None,
1105 error: Some(McpError {
1106 code,
1107 message: message.to_string(),
1108 data: None,
1109 }),
1110 }
1111 }
1112
1113 fn tool_result(
1114 &self,
1115 id: Option<serde_json::Value>,
1116 text: &str,
1117 is_error: bool,
1118 ) -> McpResponse {
1119 let result = CallToolResult {
1120 content: vec![Content {
1121 content_type: "text".to_string(),
1122 text: text.to_string(),
1123 }],
1124 is_error: if is_error { Some(true) } else { None },
1125 };
1126 McpResponse {
1127 jsonrpc: "2.0".to_string(),
1128 id,
1129 result: Some(serde_json::to_value(result).unwrap()),
1130 error: None,
1131 }
1132 }
1133}