1use crate::mcp_types::{
2 CallToolResult, Content, DegreeResult, DisambiguationItem, DisambiguationResult,
3 IngestToolResult, ListToolsResult, McpError, McpRequest, McpResponse, NeighborItem,
4 NeighborsToolResult, ReasoningToolResult, ScenarioItem, ScenarioListResult, SearchResultItem,
5 SearchToolResult, SimpleSuccessResult, StatsToolResult, Tool, TripleItem, TriplesToolResult,
6};
7use crate::server::proto::semantic_engine_server::SemanticEngine;
8use crate::server::proto::{
9 HybridSearchRequest, IngestFileRequest, IngestRequest, Provenance, ReasoningRequest,
10 ReasoningStrategy, SearchMode, SparqlRequest, Triple,
11};
12use crate::server::MySemanticEngine;
13use jsonschema::JSONSchema;
14use std::sync::Arc;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tonic::Request;
17
/// MCP (Model Context Protocol) server that reads newline-delimited JSON-RPC
/// requests from stdin and writes responses to stdout.
///
/// Tool calls are served by the shared [`MySemanticEngine`], either through its
/// RPC-style handlers or by accessing its per-namespace stores directly.
pub struct McpStdioServer {
    /// Shared engine backing every tool call; `Arc` so the same engine can be
    /// shared with other front-ends.
    engine: Arc<MySemanticEngine>,
}
21
22impl McpStdioServer {
23 pub fn new(engine: Arc<MySemanticEngine>) -> Self {
24 Self { engine }
25 }
26
27 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
28 let mut reader = BufReader::new(tokio::io::stdin());
29 let mut writer = tokio::io::stdout();
30
31 loop {
32 let mut line = String::new();
33 if reader.read_line(&mut line).await? == 0 {
34 break;
35 }
36
37 let trimmed = line.trim();
38 if trimmed.is_empty() {
39 continue;
40 }
41
42 if let Ok(request) = serde_json::from_str::<McpRequest>(trimmed) {
43 let is_notification = request.id.is_none();
44 let response = self.handle_request(request).await;
45
46 if !is_notification {
48 let response_json = serde_json::to_string(&response)? + "\n";
49 writer.write_all(response_json.as_bytes()).await?;
50 writer.flush().await?;
51 }
52 } else {
53 eprintln!("MCP PROTOCOL ERROR: Failed to parse line: {}", trimmed);
55 }
56 }
57
58 Ok(())
59 }
60
61 fn create_request<T>(msg: T) -> Request<T> {
62 let mut req = Request::new(msg);
63
64 let token_opt = std::env::var("SYNAPSE_ADMIN_TOKEN")
66 .or_else(|_| std::env::var("SYNAPSE_MCP_TOKEN"))
67 .ok();
68
69 if let Some(token) = token_opt {
70 if let Ok(val) = format!("Bearer {}", token).parse() {
71 req.metadata_mut().insert("authorization", val);
72 }
73 }
74 req
75 }
76
77 fn get_tools() -> Vec<Tool> {
78 vec![
79 Tool {
80 name: "ingest_triples".to_string(),
81 description: Some(
82 "Ingest one or more RDF triples into the knowledge graph".to_string(),
83 ),
84 input_schema: serde_json::json!({
85 "type": "object",
86 "properties": {
87 "triples": {
88 "type": "array",
89 "items": {
90 "type": "object",
91 "properties": {
92 "subject": { "type": "string" },
93 "predicate": { "type": "string" },
94 "object": { "type": "string" }
95 },
96 "required": ["subject", "predicate", "object"]
97 }
98 },
99 "namespace": { "type": "string", "default": "default" }
100 },
101 "required": ["triples"]
102 }),
103 },
104 Tool {
105 name: "ingest_file".to_string(),
106 description: Some(
107 "Ingest a CSV or Markdown file into the knowledge graph".to_string(),
108 ),
109 input_schema: serde_json::json!({
110 "type": "object",
111 "properties": {
112 "path": { "type": "string", "description": "Path to the file" },
113 "namespace": { "type": "string", "default": "default" }
114 },
115 "required": ["path"]
116 }),
117 },
118 Tool {
119 name: "sparql_query".to_string(),
120 description: Some("Execute a SPARQL query against the knowledge graph".to_string()),
121 input_schema: serde_json::json!({
122 "type": "object",
123 "properties": {
124 "query": { "type": "string", "description": "SPARQL query string" },
125 "namespace": { "type": "string", "default": "default" }
126 },
127 "required": ["query"]
128 }),
129 },
130 Tool {
131 name: "hybrid_search".to_string(),
132 description: Some("Perform a hybrid vector + graph search".to_string()),
133 input_schema: serde_json::json!({
134 "type": "object",
135 "properties": {
136 "query": { "type": "string", "description": "Natural language query" },
137 "namespace": { "type": "string", "default": "default" },
138 "vector_k": { "type": "integer", "default": 10 },
139 "graph_depth": { "type": "integer", "default": 1 },
140 "limit": { "type": "integer", "default": 20 }
141 },
142 "required": ["query"]
143 }),
144 },
145 Tool {
146 name: "apply_reasoning".to_string(),
147 description: Some(
148 "Apply RDFS or OWL-RL reasoning to infer new triples".to_string(),
149 ),
150 input_schema: serde_json::json!({
151 "type": "object",
152 "properties": {
153 "namespace": { "type": "string", "default": "default" },
154 "strategy": { "type": "string", "enum": ["rdfs", "owlrl"], "default": "rdfs" },
155 "materialize": { "type": "boolean", "default": false }
156 }
157 }),
158 },
159 Tool {
160 name: "get_neighbors".to_string(),
161 description: Some(
162 "Get neighboring nodes connected to a given URI in the graph".to_string(),
163 ),
164 input_schema: serde_json::json!({
165 "type": "object",
166 "properties": {
167 "uri": { "type": "string", "description": "URI of the entity to find neighbors for" },
168 "namespace": { "type": "string", "default": "default" },
169 "direction": { "type": "string", "enum": ["outgoing", "incoming", "both"], "default": "outgoing" }
170 },
171 "required": ["uri"]
172 }),
173 },
174 Tool {
175 name: "list_triples".to_string(),
176 description: Some(
177 "List all triples in a namespace (useful for debugging/exploration)"
178 .to_string(),
179 ),
180 input_schema: serde_json::json!({
181 "type": "object",
182 "properties": {
183 "namespace": { "type": "string", "default": "default" },
184 "limit": { "type": "integer", "default": 100 }
185 }
186 }),
187 },
188 Tool {
189 name: "delete_namespace".to_string(),
190 description: Some("Delete all data in a namespace".to_string()),
191 input_schema: serde_json::json!({
192 "type": "object",
193 "properties": {
194 "namespace": { "type": "string", "description": "Namespace to delete" }
195 },
196 "required": ["namespace"]
197 }),
198 },
199 Tool {
200 name: "ingest_url".to_string(),
201 description: Some(
202 "Scrape a web page and add its content to the vector store for RAG retrieval"
203 .to_string(),
204 ),
205 input_schema: serde_json::json!({
206 "type": "object",
207 "properties": {
208 "url": { "type": "string", "description": "URL to scrape and ingest" },
209 "namespace": { "type": "string", "default": "default" }
210 },
211 "required": ["url"]
212 }),
213 },
214 Tool {
215 name: "ingest_text".to_string(),
216 description: Some(
217 "Add arbitrary text content to the vector store for RAG retrieval".to_string(),
218 ),
219 input_schema: serde_json::json!({
220 "type": "object",
221 "properties": {
222 "uri": { "type": "string", "description": "Custom URI identifier for this text" },
223 "content": { "type": "string", "description": "Text content to embed and store" },
224 "namespace": { "type": "string", "default": "default" }
225 },
226 "required": ["uri", "content"]
227 }),
228 },
229 Tool {
230 name: "compact_vectors".to_string(),
231 description: Some("Compact the vector index by removing stale entries".to_string()),
232 input_schema: serde_json::json!({
233 "type": "object",
234 "properties": {
235 "namespace": { "type": "string", "default": "default" }
236 }
237 }),
238 },
239 Tool {
240 name: "vector_stats".to_string(),
241 description: Some("Get vector store statistics (active, stale, total)".to_string()),
242 input_schema: serde_json::json!({
243 "type": "object",
244 "properties": {
245 "namespace": { "type": "string", "default": "default" }
246 }
247 }),
248 },
249 Tool {
250 name: "disambiguate".to_string(),
251 description: Some("Find similar entities that might be duplicates".to_string()),
252 input_schema: serde_json::json!({
253 "type": "object",
254 "properties": {
255 "namespace": { "type": "string", "default": "default" },
256 "threshold": { "type": "number", "default": 0.8, "description": "Similarity threshold 0.0-1.0" }
257 }
258 }),
259 },
260 Tool {
261 name: "get_node_degree".to_string(),
262 description: Some("Get the degree (number of connections) of a node".to_string()),
263 input_schema: serde_json::json!({
264 "type": "object",
265 "properties": {
266 "uri": { "type": "string" },
267 "namespace": { "type": "string", "default": "default" }
268 },
269 "required": ["uri"]
270 }),
271 },
272 Tool {
273 name: "install_ontology".to_string(),
274 description: Some("Download and install an ontology from a URL".to_string()),
275 input_schema: serde_json::json!({
276 "type": "object",
277 "properties": {
278 "url": { "type": "string", "description": "URL of the ontology file (.owl, .ttl)" },
279 "name": { "type": "string", "description": "Name for the local file (e.g. 'legal.owl')" },
280 "namespace": { "type": "string", "default": "default" }
281 },
282 "required": ["url", "name"]
283 }),
284 },
285 Tool {
286 name: "list_scenarios".to_string(),
287 description: Some("List available scenarios in the marketplace".to_string()),
288 input_schema: serde_json::json!({
289 "type": "object",
290 "properties": {}
291 }),
292 },
293 Tool {
294 name: "install_scenario".to_string(),
295 description: Some(
296 "Install a scenario (ontologies, data, docs) from the marketplace".to_string(),
297 ),
298 input_schema: serde_json::json!({
299 "type": "object",
300 "properties": {
301 "name": { "type": "string", "description": "Name of the scenario to install" },
302 "namespace": { "type": "string", "default": "default" }
303 },
304 "required": ["name"]
305 }),
306 },
307 ]
308 }
309
310 pub async fn handle_request(&self, request: McpRequest) -> McpResponse {
311 match request.method.as_str() {
312 "initialize" => {
313 McpResponse {
315 jsonrpc: "2.0".to_string(),
316 id: request.id,
317 result: Some(serde_json::json!({
318 "protocolVersion": "2024-11-05",
319 "capabilities": {
320 "tools": {}
321 },
322 "serverInfo": {
323 "name": "synapse",
324 "version": env!("CARGO_PKG_VERSION")
325 }
326 })),
327 error: None,
328 }
329 }
330 "notifications/initialized" | "initialized" => {
331 McpResponse {
333 jsonrpc: "2.0".to_string(),
334 id: request.id,
335 result: Some(serde_json::json!({})),
336 error: None,
337 }
338 }
339 "tools/list" => {
340 let result = ListToolsResult {
341 tools: Self::get_tools(),
342 };
343 McpResponse {
344 jsonrpc: "2.0".to_string(),
345 id: request.id,
346 result: Some(serde_json::to_value(result).unwrap()),
347 error: None,
348 }
349 }
350 "tools/call" => self.handle_tool_call(request).await,
351 "ingest" => self.handle_legacy_ingest(request).await,
353 "ingest_file" => self.handle_legacy_ingest_file(request).await,
354 _ => McpResponse {
355 jsonrpc: "2.0".to_string(),
356 id: request.id,
357 result: None,
358 error: Some(McpError {
359 code: -32601,
360 message: format!("Method not found: {}", request.method),
361 data: None,
362 }),
363 },
364 }
365 }
366
367 fn validate_arguments(tool_name: &str, arguments: &serde_json::Value) -> Result<(), String> {
368 let tools = Self::get_tools();
369 if let Some(tool) = tools.iter().find(|t| t.name == tool_name) {
370 if let Ok(schema) = JSONSchema::compile(&tool.input_schema) {
371 if let Err(errors) = schema.validate(arguments) {
372 let error_msg = errors.map(|e| e.to_string()).collect::<Vec<_>>().join(", ");
373 return Err(format!("Validation error: {}", error_msg));
374 }
375 } else {
376 return Err("Invalid tool schema definition".to_string());
377 }
378 }
379 Ok(())
380 }
381
382 async fn handle_tool_call(&self, request: McpRequest) -> McpResponse {
383 let params = match request.params {
384 Some(serde_json::Value::Object(map)) => map,
385 Some(_) => return self.error_response(request.id, -32602, "Params must be an object"),
386 None => return self.error_response(request.id, -32602, "Missing params"),
387 };
388
389 let tool_name = match params.get("name").and_then(|v| v.as_str()) {
390 Some(n) => n,
391 None => return self.error_response(request.id, -32602, "Missing tool name"),
392 };
393
394 let arguments = params
395 .get("arguments")
396 .and_then(|v| v.as_object())
397 .cloned()
398 .unwrap_or_default();
399
400 let args_value = serde_json::Value::Object(arguments.clone());
401 if let Err(e) = Self::validate_arguments(tool_name, &args_value) {
402 return self.error_response(request.id, -32602, &e);
403 }
404
405 match tool_name {
406 "ingest_triples" => self.call_ingest_triples(request.id, &arguments).await,
407 "ingest_file" => self.call_ingest_file(request.id, &arguments).await,
408 "sparql_query" => self.call_sparql_query(request.id, &arguments).await,
409 "hybrid_search" => self.call_hybrid_search(request.id, &arguments).await,
410 "apply_reasoning" => self.call_apply_reasoning(request.id, &arguments).await,
411 "get_neighbors" => self.call_get_neighbors(request.id, &arguments).await,
412 "list_triples" => self.call_list_triples(request.id, &arguments).await,
413 "delete_namespace" => self.call_delete_namespace(request.id, &arguments).await,
414 "ingest_url" => self.call_ingest_url(request.id, &arguments).await,
415 "ingest_text" => self.call_ingest_text(request.id, &arguments).await,
416 "compact_vectors" => self.call_compact_vectors(request.id, &arguments).await,
417 "vector_stats" => self.call_vector_stats(request.id, &arguments).await,
418 "disambiguate" => self.call_disambiguate(request.id, &arguments).await,
419 "get_node_degree" => self.call_get_node_degree(request.id, &arguments).await,
420 "install_ontology" => self.call_install_ontology(request.id, &arguments).await,
421 "list_scenarios" => self.call_list_scenarios(request.id).await,
422 "install_scenario" => self.call_install_scenario(request.id, &arguments).await,
423 _ => self.error_response(request.id, -32602, &format!("Unknown tool: {}", tool_name)),
424 }
425 }
426
427 async fn call_list_scenarios(&self, id: Option<serde_json::Value>) -> McpResponse {
428 match self.engine.scenario_manager.list_scenarios().await {
429 Ok(registry) => {
430 let items: Vec<ScenarioItem> = registry
431 .into_iter()
432 .map(|e| ScenarioItem {
433 name: e.name,
434 description: e.description,
435 version: e.version,
436 })
437 .collect();
438 self.serialize_result(id, ScenarioListResult { scenarios: items })
439 }
440 Err(e) => self.tool_result(id, &e.to_string(), true),
441 }
442 }
443
444 async fn call_install_scenario(
445 &self,
446 id: Option<serde_json::Value>,
447 args: &serde_json::Map<String, serde_json::Value>,
448 ) -> McpResponse {
449 let name = match args.get("name").and_then(|v| v.as_str()) {
450 Some(n) => n,
451 None => return self.error_response(id, -32602, "Missing 'name'"),
452 };
453 let namespace = args
454 .get("namespace")
455 .and_then(|v| v.as_str())
456 .unwrap_or("default");
457
458 match self.engine.install_scenario(name, namespace).await {
459 Ok(msg) => {
460 let result = SimpleSuccessResult {
461 success: true,
462 message: msg,
463 };
464 self.serialize_result(id, result)
465 }
466 Err(e) => self.tool_result(id, &e, true),
467 }
468 }
469
470 async fn call_install_ontology(
471 &self,
472 id: Option<serde_json::Value>,
473 args: &serde_json::Map<String, serde_json::Value>,
474 ) -> McpResponse {
475 let url = match args.get("url").and_then(|v| v.as_str()) {
476 Some(u) => u,
477 None => return self.error_response(id, -32602, "Missing 'url'"),
478 };
479 let name = match args.get("name").and_then(|v| v.as_str()) {
480 Some(n) => n,
481 None => return self.error_response(id, -32602, "Missing 'name'"),
482 };
483 let namespace = args
484 .get("namespace")
485 .and_then(|v| v.as_str())
486 .unwrap_or("default");
487
488 let response = match reqwest::get(url).await {
490 Ok(r) => r,
491 Err(e) => {
492 return self.tool_result(id, &format!("Failed to download ontology: {}", e), true)
493 }
494 };
495
496 if !response.status().is_success() {
497 return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
498 }
499
500 let content = match response.text().await {
501 Ok(t) => t,
502 Err(e) => {
503 return self.tool_result(id, &format!("Failed to read response: {}", e), true)
504 }
505 };
506
507 let ontology_dir = std::path::Path::new("ontology");
509 if !ontology_dir.exists() {
510 if let Err(e) = std::fs::create_dir(ontology_dir) {
511 return self.tool_result(
512 id,
513 &format!("Failed to create ontology dir: {}", e),
514 true,
515 );
516 }
517 }
518
519 let file_name = std::path::Path::new(name)
521 .file_name()
522 .and_then(|n| n.to_str())
523 .ok_or_else(|| "Invalid filename".to_string());
524
525 let clean_name = match file_name {
526 Ok(n) => n,
527 Err(e) => return self.tool_result(id, &format!("Security error: {}", e), true),
528 };
529
530 if clean_name != name {
531 return self.tool_result(
532 id,
533 "Security error: Filename contains path components",
534 true,
535 );
536 }
537
538 let path = ontology_dir.join(clean_name);
539 if let Err(e) = std::fs::write(&path, content) {
540 return self.tool_result(id, &format!("Failed to save ontology file: {}", e), true);
541 }
542
543 let store = match self.engine.get_store(namespace) {
545 Ok(s) => s,
546 Err(e) => return self.tool_result(id, &e.to_string(), true),
547 };
548
549 match crate::ingest::ontology::OntologyLoader::load_file(&store, &path).await {
550 Ok(count) => {
551 let result = SimpleSuccessResult {
552 success: true,
553 message: format!("Installed ontology '{}' and loaded {} triples", name, count),
554 };
555 self.serialize_result(id, result)
556 }
557 Err(e) => self.tool_result(id, &format!("Failed to load ontology: {}", e), true),
558 }
559 }
560
561 async fn call_ingest_triples(
562 &self,
563 id: Option<serde_json::Value>,
564 args: &serde_json::Map<String, serde_json::Value>,
565 ) -> McpResponse {
566 let namespace = args
567 .get("namespace")
568 .and_then(|v| v.as_str())
569 .unwrap_or("default");
570 let triples_array = match args.get("triples").and_then(|v| v.as_array()) {
571 Some(t) => t,
572 None => return self.error_response(id, -32602, "Missing 'triples' array"),
573 };
574
575 let mut triples = Vec::new();
576 for t in triples_array {
577 if let (Some(s), Some(p), Some(o)) = (
578 t.get("subject").and_then(|v| v.as_str()),
579 t.get("predicate").and_then(|v| v.as_str()),
580 t.get("object").and_then(|v| v.as_str()),
581 ) {
582 triples.push(Triple {
583 subject: s.to_string(),
584 predicate: p.to_string(),
585 object: o.to_string(),
586 provenance: Some(Provenance {
587 source: "mcp".to_string(),
588 timestamp: "".to_string(),
589 method: "tools/call".to_string(),
590 }),
591 embedding: vec![],
592 });
593 }
594 }
595
596 let req = Self::create_request(IngestRequest {
597 triples,
598 namespace: namespace.to_string(),
599 });
600
601 match self.engine.ingest_triples(req).await {
602 Ok(resp) => {
603 let inner = resp.into_inner();
604 let result = IngestToolResult {
605 nodes_added: inner.nodes_added,
606 edges_added: inner.edges_added,
607 message: format!("Ingested {} triples", inner.edges_added),
608 };
609 self.serialize_result(id, result)
610 }
611 Err(e) => self.tool_result(id, &e.to_string(), true),
612 }
613 }
614
615 async fn call_ingest_file(
616 &self,
617 id: Option<serde_json::Value>,
618 args: &serde_json::Map<String, serde_json::Value>,
619 ) -> McpResponse {
620 let path = match args.get("path").and_then(|v| v.as_str()) {
621 Some(p) => p,
622 None => return self.error_response(id, -32602, "Missing 'path'"),
623 };
624 let namespace = args
625 .get("namespace")
626 .and_then(|v| v.as_str())
627 .unwrap_or("default");
628
629 let req = Self::create_request(IngestFileRequest {
630 file_path: path.to_string(),
631 namespace: namespace.to_string(),
632 });
633
634 match self.engine.ingest_file(req).await {
635 Ok(resp) => {
636 let inner = resp.into_inner();
637 let result = IngestToolResult {
638 nodes_added: inner.nodes_added,
639 edges_added: inner.edges_added,
640 message: format!("Ingested {} triples from {}", inner.edges_added, path),
641 };
642 self.serialize_result(id, result)
643 }
644 Err(e) => self.tool_result(id, &e.to_string(), true),
645 }
646 }
647
648 async fn call_sparql_query(
649 &self,
650 id: Option<serde_json::Value>,
651 args: &serde_json::Map<String, serde_json::Value>,
652 ) -> McpResponse {
653 let query = match args.get("query").and_then(|v| v.as_str()) {
654 Some(q) => q,
655 None => return self.error_response(id, -32602, "Missing 'query'"),
656 };
657 let namespace = args
658 .get("namespace")
659 .and_then(|v| v.as_str())
660 .unwrap_or("default");
661
662 let req = Self::create_request(SparqlRequest {
663 query: query.to_string(),
664 namespace: namespace.to_string(),
665 });
666
667 match self.engine.query_sparql(req).await {
668 Ok(resp) => self.tool_result(id, &resp.into_inner().results_json, false),
669 Err(e) => self.tool_result(id, &e.to_string(), true),
670 }
671 }
672
673 async fn call_hybrid_search(
674 &self,
675 id: Option<serde_json::Value>,
676 args: &serde_json::Map<String, serde_json::Value>,
677 ) -> McpResponse {
678 let query = match args.get("query").and_then(|v| v.as_str()) {
679 Some(q) => q,
680 None => return self.error_response(id, -32602, "Missing 'query'"),
681 };
682 let namespace = args
683 .get("namespace")
684 .and_then(|v| v.as_str())
685 .unwrap_or("default");
686 let vector_k = args.get("vector_k").and_then(|v| v.as_u64()).unwrap_or(10) as u32;
687 let graph_depth = args
688 .get("graph_depth")
689 .and_then(|v| v.as_u64())
690 .unwrap_or(1) as u32;
691 let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as u32;
692
693 let req = Self::create_request(HybridSearchRequest {
694 query: query.to_string(),
695 namespace: namespace.to_string(),
696 vector_k,
697 graph_depth,
698 mode: SearchMode::Hybrid as i32,
699 limit,
700 });
701
702 match self.engine.hybrid_search(req).await {
703 Ok(resp) => {
704 let results = resp.into_inner().results;
705 let items: Vec<SearchResultItem> = results
706 .into_iter()
707 .map(|r| SearchResultItem {
708 node_id: r.node_id,
709 score: r.score,
710 content: r.content,
711 uri: r.uri,
712 })
713 .collect();
714
715 let result = SearchToolResult { results: items };
716 self.serialize_result(id, result)
717 }
718 Err(e) => self.tool_result(id, &e.to_string(), true),
719 }
720 }
721
722 async fn call_apply_reasoning(
723 &self,
724 id: Option<serde_json::Value>,
725 args: &serde_json::Map<String, serde_json::Value>,
726 ) -> McpResponse {
727 let namespace = args
728 .get("namespace")
729 .and_then(|v| v.as_str())
730 .unwrap_or("default");
731 let strategy_str = args
732 .get("strategy")
733 .and_then(|v| v.as_str())
734 .unwrap_or("rdfs");
735 let materialize = args
736 .get("materialize")
737 .and_then(|v| v.as_bool())
738 .unwrap_or(false);
739
740 let strategy = match strategy_str.to_lowercase().as_str() {
741 "owlrl" | "owl-rl" => ReasoningStrategy::Owlrl as i32,
742 _ => ReasoningStrategy::Rdfs as i32,
743 };
744
745 let req = Self::create_request(ReasoningRequest {
746 namespace: namespace.to_string(),
747 strategy,
748 materialize,
749 });
750
751 match self.engine.apply_reasoning(req).await {
752 Ok(resp) => {
753 let inner = resp.into_inner();
754 let result = ReasoningToolResult {
755 success: inner.success,
756 triples_inferred: inner.triples_inferred,
757 message: inner.message,
758 };
759 self.serialize_result(id, result)
760 }
761 Err(e) => self.tool_result(id, &e.to_string(), true),
762 }
763 }
764
765 async fn call_get_neighbors(
766 &self,
767 id: Option<serde_json::Value>,
768 args: &serde_json::Map<String, serde_json::Value>,
769 ) -> McpResponse {
770 let uri = match args.get("uri").and_then(|v| v.as_str()) {
771 Some(u) => u,
772 None => return self.error_response(id, -32602, "Missing 'uri'"),
773 };
774 let namespace = args
775 .get("namespace")
776 .and_then(|v| v.as_str())
777 .unwrap_or("default");
778 let direction = args
779 .get("direction")
780 .and_then(|v| v.as_str())
781 .unwrap_or("outgoing");
782
783 let store = match self.engine.get_store(namespace) {
784 Ok(s) => s,
785 Err(e) => return self.tool_result(id, &e.to_string(), true),
786 };
787
788 let mut neighbors = Vec::new();
789
790 if direction == "outgoing" || direction == "both" {
792 if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
793 for q in store
794 .store
795 .quads_for_pattern(Some(subj.into()), None, None, None)
796 .flatten()
797 {
798 neighbors.push(NeighborItem {
799 direction: "outgoing".to_string(),
800 predicate: q.predicate.to_string(),
801 target: q.object.to_string(),
802 score: 1.0,
803 });
804 }
805 }
806 }
807
808 if direction == "incoming" || direction == "both" {
810 if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
811 for q in store
812 .store
813 .quads_for_pattern(None, None, Some(obj.into()), None)
814 .flatten()
815 {
816 neighbors.push(NeighborItem {
817 direction: "incoming".to_string(),
818 predicate: q.predicate.to_string(),
819 target: q.subject.to_string(),
820 score: 1.0,
821 });
822 }
823 }
824 }
825
826 let result = NeighborsToolResult { neighbors };
827 self.serialize_result(id, result)
828 }
829
830 async fn call_list_triples(
831 &self,
832 id: Option<serde_json::Value>,
833 args: &serde_json::Map<String, serde_json::Value>,
834 ) -> McpResponse {
835 let namespace = args
836 .get("namespace")
837 .and_then(|v| v.as_str())
838 .unwrap_or("default");
839 let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
840
841 let store = match self.engine.get_store(namespace) {
842 Ok(s) => s,
843 Err(e) => return self.tool_result(id, &e.to_string(), true),
844 };
845
846 let mut triples = Vec::new();
847 for q in store.store.iter().take(limit).flatten() {
848 triples.push(TripleItem {
849 subject: q.subject.to_string(),
850 predicate: q.predicate.to_string(),
851 object: q.object.to_string(),
852 });
853 }
854
855 let result = TriplesToolResult { triples };
856 self.serialize_result(id, result)
857 }
858
859 async fn call_delete_namespace(
860 &self,
861 id: Option<serde_json::Value>,
862 args: &serde_json::Map<String, serde_json::Value>,
863 ) -> McpResponse {
864 let namespace = match args.get("namespace").and_then(|v| v.as_str()) {
865 Some(n) => n,
866 None => return self.error_response(id, -32602, "Missing 'namespace'"),
867 };
868
869 let req = Self::create_request(crate::server::proto::EmptyRequest {
870 namespace: namespace.to_string(),
871 });
872
873 match self.engine.delete_namespace_data(req).await {
874 Ok(resp) => {
875 let inner = resp.into_inner();
876 let result = SimpleSuccessResult {
877 success: inner.success,
878 message: inner.message,
879 };
880 self.serialize_result(id, result)
881 }
882 Err(e) => self.tool_result(id, &e.to_string(), true),
883 }
884 }
885
886 async fn call_ingest_url(
887 &self,
888 id: Option<serde_json::Value>,
889 args: &serde_json::Map<String, serde_json::Value>,
890 ) -> McpResponse {
891 let url = match args.get("url").and_then(|v| v.as_str()) {
892 Some(u) => u,
893 None => return self.error_response(id, -32602, "Missing 'url'"),
894 };
895 let namespace = args
896 .get("namespace")
897 .and_then(|v| v.as_str())
898 .unwrap_or("default");
899
900 let client = reqwest::Client::new();
902 let response = match client.get(url).send().await {
903 Ok(r) => r,
904 Err(e) => return self.tool_result(id, &format!("Failed to fetch URL: {}", e), true),
905 };
906
907 if !response.status().is_success() {
908 return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
909 }
910
911 let html = match response.text().await {
912 Ok(t) => t,
913 Err(e) => {
914 return self.tool_result(id, &format!("Failed to read response: {}", e), true)
915 }
916 };
917
918 let script_re = regex::Regex::new(r"(?s)<script.*?>.*?</script>").unwrap();
920 let style_re = regex::Regex::new(r"(?s)<style.*?>.*?</style>").unwrap();
921 let tag_re = regex::Regex::new(r"<[^>]*>").unwrap();
922
923 let no_script = script_re.replace_all(&html, " ");
924 let no_style = style_re.replace_all(&no_script, " ");
925 let text_content = tag_re.replace_all(&no_style, " ");
926
927 let text = text_content
928 .split_whitespace()
929 .collect::<Vec<_>>()
930 .join(" ");
931
932 let processor = crate::processor::TextProcessor::new();
934 let chunks = processor.chunk_text(&text, 1000, 150);
935
936 let store = match self.engine.get_store(namespace) {
938 Ok(s) => s,
939 Err(e) => return self.tool_result(id, &e.to_string(), true),
940 };
941
942 if let Some(ref vector_store) = store.vector_store {
943 let mut added_chunks = 0;
944 for (i, chunk) in chunks.iter().enumerate() {
945 let chunk_uri = format!("{}#chunk-{}", url, i);
946 let metadata = serde_json::json!({
948 "uri": chunk_uri,
949 "source_url": url,
950 "type": "web_chunk"
951 });
952 match vector_store.add(&chunk_uri, chunk, metadata).await {
953 Ok(_) => added_chunks += 1,
954 Err(e) => {
955 eprintln!("Failed to add chunk {}: {}", i, e);
956 }
957 }
958 }
959 let result = IngestToolResult {
960 nodes_added: 0,
961 edges_added: 0, message: format!(
963 "Ingested URL: {} ({} chars, {} chunks)",
964 url,
965 text.len(),
966 added_chunks
967 ),
968 };
969 self.serialize_result(id, result)
970 } else {
971 self.tool_result(id, "Vector store not available", true)
972 }
973 }
974
975 async fn call_ingest_text(
976 &self,
977 id: Option<serde_json::Value>,
978 args: &serde_json::Map<String, serde_json::Value>,
979 ) -> McpResponse {
980 let uri = match args.get("uri").and_then(|v| v.as_str()) {
981 Some(u) => u,
982 None => return self.error_response(id, -32602, "Missing 'uri'"),
983 };
984 let content = match args.get("content").and_then(|v| v.as_str()) {
985 Some(c) => c,
986 None => return self.error_response(id, -32602, "Missing 'content'"),
987 };
988 let namespace = args
989 .get("namespace")
990 .and_then(|v| v.as_str())
991 .unwrap_or("default");
992
993 let processor = crate::processor::TextProcessor::new();
995 let chunks = processor.chunk_text(content, 1000, 150);
996
997 let store = match self.engine.get_store(namespace) {
999 Ok(s) => s,
1000 Err(e) => return self.tool_result(id, &e.to_string(), true),
1001 };
1002
1003 if let Some(ref vector_store) = store.vector_store {
1004 let mut added_chunks = 0;
1005 for (i, chunk) in chunks.iter().enumerate() {
1006 let chunk_uri = if chunks.len() > 1 {
1007 format!("{}#chunk-{}", uri, i)
1008 } else {
1009 uri.to_string()
1010 };
1011 let metadata = serde_json::json!({
1012 "uri": uri, "chunk_uri": chunk_uri,
1014 "type": "text_chunk"
1015 });
1016 match vector_store.add(&chunk_uri, chunk, metadata).await {
1017 Ok(_) => added_chunks += 1,
1018 Err(e) => {
1019 eprintln!("Failed to add chunk {}: {}", i, e);
1020 }
1021 }
1022 }
1023 let result = IngestToolResult {
1024 nodes_added: 0,
1025 edges_added: 0,
1026 message: format!(
1027 "Ingested text: {} ({} chars, {} chunks)",
1028 uri,
1029 content.len(),
1030 added_chunks
1031 ),
1032 };
1033 self.serialize_result(id, result)
1034 } else {
1035 self.tool_result(id, "Vector store not available", true)
1036 }
1037 }
1038
1039 async fn call_compact_vectors(
1040 &self,
1041 id: Option<serde_json::Value>,
1042 args: &serde_json::Map<String, serde_json::Value>,
1043 ) -> McpResponse {
1044 let namespace = args
1045 .get("namespace")
1046 .and_then(|v| v.as_str())
1047 .unwrap_or("default");
1048
1049 let store = match self.engine.get_store(namespace) {
1050 Ok(s) => s,
1051 Err(e) => return self.tool_result(id, &e.to_string(), true),
1052 };
1053
1054 if let Some(ref vector_store) = store.vector_store {
1055 match vector_store.compact() {
1056 Ok(removed) => {
1057 let result = SimpleSuccessResult {
1058 success: true,
1059 message: format!("Compaction complete: {} stale entries removed", removed),
1060 };
1061 self.serialize_result(id, result)
1062 }
1063 Err(e) => self.tool_result(id, &format!("Compaction error: {}", e), true),
1064 }
1065 } else {
1066 self.tool_result(id, "Vector store not available", true)
1067 }
1068 }
1069
1070 async fn call_vector_stats(
1071 &self,
1072 id: Option<serde_json::Value>,
1073 args: &serde_json::Map<String, serde_json::Value>,
1074 ) -> McpResponse {
1075 let namespace = args
1076 .get("namespace")
1077 .and_then(|v| v.as_str())
1078 .unwrap_or("default");
1079
1080 let store = match self.engine.get_store(namespace) {
1081 Ok(s) => s,
1082 Err(e) => return self.tool_result(id, &e.to_string(), true),
1083 };
1084
1085 if let Some(ref vector_store) = store.vector_store {
1086 let (active, stale, total) = vector_store.stats();
1087 let result = StatsToolResult {
1088 active_vectors: active,
1089 stale_vectors: stale,
1090 total_embeddings: total,
1091 };
1092 self.serialize_result(id, result)
1093 } else {
1094 self.tool_result(id, "Vector store not available", true)
1095 }
1096 }
1097
1098 async fn call_disambiguate(
1099 &self,
1100 id: Option<serde_json::Value>,
1101 args: &serde_json::Map<String, serde_json::Value>,
1102 ) -> McpResponse {
1103 let namespace = args
1104 .get("namespace")
1105 .and_then(|v| v.as_str())
1106 .unwrap_or("default");
1107 let threshold = args
1108 .get("threshold")
1109 .and_then(|v| v.as_f64())
1110 .unwrap_or(0.8);
1111
1112 let store = match self.engine.get_store(namespace) {
1113 Ok(s) => s,
1114 Err(e) => return self.tool_result(id, &e.to_string(), true),
1115 };
1116
1117 let uri_map = store.uri_to_id.read().unwrap();
1119 let uris: Vec<String> = uri_map.keys().cloned().collect();
1120 drop(uri_map);
1121
1122 let disambiguator = crate::disambiguation::EntityDisambiguator::new(threshold);
1123 let suggestions = disambiguator.suggest_merges(&uris);
1124
1125 let items: Vec<DisambiguationItem> = suggestions
1126 .into_iter()
1127 .map(|(u1, u2, s)| DisambiguationItem {
1128 uri1: u1,
1129 uri2: u2,
1130 similarity: s,
1131 })
1132 .collect();
1133
1134 let message = if items.is_empty() {
1135 "No similar entities found above threshold".to_string()
1136 } else {
1137 format!("Found {} potential duplicates", items.len())
1138 };
1139
1140 let result = DisambiguationResult {
1141 suggestions: items,
1142 message,
1143 };
1144 self.serialize_result(id, result)
1145 }
1146
1147 async fn handle_legacy_ingest(&self, request: McpRequest) -> McpResponse {
1149 let params = match request.params {
1150 Some(p) => p,
1151 None => return self.error_response(request.id, -32602, "Invalid params"),
1152 };
1153
1154 if let (Some(sub), Some(pred), Some(obj)) = (
1155 params.get("subject").and_then(|v| v.as_str()),
1156 params.get("predicate").and_then(|v| v.as_str()),
1157 params.get("object").and_then(|v| v.as_str()),
1158 ) {
1159 let namespace = params
1160 .get("namespace")
1161 .and_then(|v| v.as_str())
1162 .unwrap_or("default");
1163 let triple = Triple {
1164 subject: sub.to_string(),
1165 predicate: pred.to_string(),
1166 object: obj.to_string(),
1167 provenance: Some(Provenance {
1168 source: "mcp".to_string(),
1169 timestamp: "".to_string(),
1170 method: "stdio".to_string(),
1171 }),
1172 embedding: vec![],
1173 };
1174
1175 let req = Self::create_request(IngestRequest {
1176 triples: vec![triple],
1177 namespace: namespace.to_string(),
1178 });
1179
1180 match self.engine.ingest_triples(req).await {
1181 Ok(_) => McpResponse {
1182 jsonrpc: "2.0".to_string(),
1183 id: request.id,
1184 result: Some(serde_json::to_value("Ingested").unwrap()),
1185 error: None,
1186 },
1187 Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1188 }
1189 } else {
1190 self.error_response(request.id, -32602, "Invalid params")
1191 }
1192 }
1193
1194 async fn handle_legacy_ingest_file(&self, request: McpRequest) -> McpResponse {
1195 let params = match request.params {
1196 Some(p) => p,
1197 None => {
1198 return self.error_response(request.id, -32602, "Invalid params: 'path' required")
1199 }
1200 };
1201
1202 if let Some(path) = params.get("path").and_then(|v| v.as_str()) {
1203 let namespace = params
1204 .get("namespace")
1205 .and_then(|v| v.as_str())
1206 .unwrap_or("default");
1207
1208 let req = Self::create_request(IngestFileRequest {
1209 file_path: path.to_string(),
1210 namespace: namespace.to_string(),
1211 });
1212
1213 match self.engine.ingest_file(req).await {
1214 Ok(resp) => {
1215 let inner = resp.into_inner();
1216 McpResponse {
1217 jsonrpc: "2.0".to_string(),
1218 id: request.id,
1219 result: Some(
1220 serde_json::to_value(format!(
1221 "Ingested {} triples from {}",
1222 inner.edges_added, path
1223 ))
1224 .unwrap(),
1225 ),
1226 error: None,
1227 }
1228 }
1229 Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1230 }
1231 } else {
1232 self.error_response(request.id, -32602, "Invalid params: 'path' required")
1233 }
1234 }
1235
1236 fn serialize_result<T: serde::Serialize>(
1237 &self,
1238 id: Option<serde_json::Value>,
1239 result: T,
1240 ) -> McpResponse {
1241 match serde_json::to_string_pretty(&result) {
1242 Ok(json) => self.tool_result(id, &json, false),
1243 Err(e) => self.tool_result(id, &format!("Serialization error: {}", e), true),
1244 }
1245 }
1246
1247 async fn call_get_node_degree(
1248 &self,
1249 id: Option<serde_json::Value>,
1250 args: &serde_json::Map<String, serde_json::Value>,
1251 ) -> McpResponse {
1252 let uri = match args.get("uri").and_then(|v| v.as_str()) {
1253 Some(u) => u,
1254 None => return self.error_response(id, -32602, "Missing 'uri'"),
1255 };
1256 let namespace = args
1257 .get("namespace")
1258 .and_then(|v| v.as_str())
1259 .unwrap_or("default");
1260
1261 let store = match self.engine.get_store(namespace) {
1262 Ok(s) => s,
1263 Err(e) => return self.tool_result(id, &e.to_string(), true),
1264 };
1265
1266 let degree = store.get_degree(uri);
1267
1268 let result = DegreeResult {
1269 uri: uri.to_string(),
1270 degree,
1271 };
1272
1273 self.serialize_result(id, result)
1274 }
1275
1276 fn error_response(
1277 &self,
1278 id: Option<serde_json::Value>,
1279 code: i32,
1280 message: &str,
1281 ) -> McpResponse {
1282 McpResponse {
1283 jsonrpc: "2.0".to_string(),
1284 id,
1285 result: None,
1286 error: Some(McpError {
1287 code,
1288 message: message.to_string(),
1289 data: None,
1290 }),
1291 }
1292 }
1293
1294 fn tool_result(
1295 &self,
1296 id: Option<serde_json::Value>,
1297 text: &str,
1298 is_error: bool,
1299 ) -> McpResponse {
1300 let result = CallToolResult {
1301 content: vec![Content {
1302 content_type: "text".to_string(),
1303 text: text.to_string(),
1304 }],
1305 is_error: if is_error { Some(true) } else { None },
1306 };
1307 McpResponse {
1308 jsonrpc: "2.0".to_string(),
1309 id,
1310 result: Some(serde_json::to_value(result).unwrap()),
1311 error: None,
1312 }
1313 }
1314}