1use crate::mcp_types::{
2 CallToolResult, Content, DegreeResult, DisambiguationItem, DisambiguationResult,
3 IngestToolResult, ListToolsResult, McpError, McpRequest, McpResponse, NeighborItem,
4 NeighborsToolResult, ReasoningToolResult, ScenarioItem, ScenarioListResult, SearchResultItem,
5 SearchToolResult, SimpleSuccessResult, StatsToolResult, Tool, TripleItem, TriplesToolResult,
6};
7use crate::server::proto::semantic_engine_server::SemanticEngine;
8use crate::server::proto::{
9 HybridSearchRequest, IngestFileRequest, IngestRequest, Provenance, ReasoningRequest,
10 ReasoningStrategy, SearchMode, SparqlRequest, Triple,
11};
12use crate::server::MySemanticEngine;
13use jsonschema::JSONSchema;
14use std::sync::Arc;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tonic::Request;
17
/// MCP server that exchanges line-delimited JSON messages over stdin/stdout
/// and forwards each request to a shared [`MySemanticEngine`].
pub struct McpStdioServer {
    /// Engine that executes every request; held behind an `Arc` so it can be shared.
    engine: Arc<MySemanticEngine>,
}
21
22impl McpStdioServer {
23 pub fn new(engine: Arc<MySemanticEngine>) -> Self {
24 Self { engine }
25 }
26
27 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
28 let mut reader = BufReader::new(tokio::io::stdin());
29 let mut writer = tokio::io::stdout();
30
31 loop {
32 let mut line = String::new();
33 if reader.read_line(&mut line).await? == 0 {
34 break;
35 }
36
37 let trimmed = line.trim();
38 if trimmed.is_empty() {
39 continue;
40 }
41
42 if let Ok(request) = serde_json::from_str::<McpRequest>(trimmed) {
43 let is_notification = request.id.is_none();
44 let response = self.handle_request(request).await;
45
46 if !is_notification {
48 let response_json = serde_json::to_string(&response)? + "\n";
49 writer.write_all(response_json.as_bytes()).await?;
50 writer.flush().await?;
51 }
52 } else {
53 eprintln!("MCP PROTOCOL ERROR: Failed to parse line: {}", trimmed);
55 }
56 }
57
58 self.engine.shutdown().await;
59 Ok(())
60 }
61
62 fn create_request<T>(msg: T) -> Request<T> {
63 let mut req = Request::new(msg);
64
65 let token_opt = std::env::var("SYNAPSE_ADMIN_TOKEN")
67 .or_else(|_| std::env::var("SYNAPSE_MCP_TOKEN"))
68 .ok();
69
70 if let Some(token) = token_opt {
71 if let Ok(val) = format!("Bearer {}", token).parse() {
72 req.metadata_mut().insert("authorization", val);
73 }
74 }
75 req
76 }
77
78 fn get_tools() -> Vec<Tool> {
79 vec![
80 Tool {
81 name: "ingest_triples".to_string(),
82 description: Some(
83 "Ingest one or more RDF triples into the knowledge graph".to_string(),
84 ),
85 input_schema: serde_json::json!({
86 "type": "object",
87 "properties": {
88 "triples": {
89 "type": "array",
90 "items": {
91 "type": "object",
92 "properties": {
93 "subject": { "type": "string" },
94 "predicate": { "type": "string" },
95 "object": { "type": "string" }
96 },
97 "required": ["subject", "predicate", "object"]
98 }
99 },
100 "namespace": { "type": "string", "default": "default" }
101 },
102 "required": ["triples"]
103 }),
104 },
105 Tool {
106 name: "ingest_file".to_string(),
107 description: Some(
108 "Ingest a CSV or Markdown file into the knowledge graph".to_string(),
109 ),
110 input_schema: serde_json::json!({
111 "type": "object",
112 "properties": {
113 "path": { "type": "string", "description": "Path to the file" },
114 "namespace": { "type": "string", "default": "default" }
115 },
116 "required": ["path"]
117 }),
118 },
119 Tool {
120 name: "sparql_query".to_string(),
121 description: Some("Execute a SPARQL query against the knowledge graph".to_string()),
122 input_schema: serde_json::json!({
123 "type": "object",
124 "properties": {
125 "query": { "type": "string", "description": "SPARQL query string" },
126 "namespace": { "type": "string", "default": "default" }
127 },
128 "required": ["query"]
129 }),
130 },
131 Tool {
132 name: "hybrid_search".to_string(),
133 description: Some("Perform a hybrid vector + graph search".to_string()),
134 input_schema: serde_json::json!({
135 "type": "object",
136 "properties": {
137 "query": { "type": "string", "description": "Natural language query" },
138 "namespace": { "type": "string", "default": "default" },
139 "vector_k": { "type": "integer", "default": 10 },
140 "graph_depth": { "type": "integer", "default": 1 },
141 "limit": { "type": "integer", "default": 20 }
142 },
143 "required": ["query"]
144 }),
145 },
146 Tool {
147 name: "apply_reasoning".to_string(),
148 description: Some(
149 "Apply RDFS or OWL-RL reasoning to infer new triples".to_string(),
150 ),
151 input_schema: serde_json::json!({
152 "type": "object",
153 "properties": {
154 "namespace": { "type": "string", "default": "default" },
155 "strategy": { "type": "string", "enum": ["rdfs", "owlrl"], "default": "rdfs" },
156 "materialize": { "type": "boolean", "default": false }
157 }
158 }),
159 },
160 Tool {
161 name: "get_neighbors".to_string(),
162 description: Some(
163 "Get neighboring nodes connected to a given URI in the graph".to_string(),
164 ),
165 input_schema: serde_json::json!({
166 "type": "object",
167 "properties": {
168 "uri": { "type": "string", "description": "URI of the entity to find neighbors for" },
169 "namespace": { "type": "string", "default": "default" },
170 "direction": { "type": "string", "enum": ["outgoing", "incoming", "both"], "default": "outgoing" }
171 },
172 "required": ["uri"]
173 }),
174 },
175 Tool {
176 name: "list_triples".to_string(),
177 description: Some(
178 "List all triples in a namespace (useful for debugging/exploration)"
179 .to_string(),
180 ),
181 input_schema: serde_json::json!({
182 "type": "object",
183 "properties": {
184 "namespace": { "type": "string", "default": "default" },
185 "limit": { "type": "integer", "default": 100 }
186 }
187 }),
188 },
189 Tool {
190 name: "delete_namespace".to_string(),
191 description: Some("Delete all data in a namespace".to_string()),
192 input_schema: serde_json::json!({
193 "type": "object",
194 "properties": {
195 "namespace": { "type": "string", "description": "Namespace to delete" }
196 },
197 "required": ["namespace"]
198 }),
199 },
200 Tool {
201 name: "ingest_url".to_string(),
202 description: Some(
203 "Scrape a web page and add its content to the vector store for RAG retrieval"
204 .to_string(),
205 ),
206 input_schema: serde_json::json!({
207 "type": "object",
208 "properties": {
209 "url": { "type": "string", "description": "URL to scrape and ingest" },
210 "namespace": { "type": "string", "default": "default" }
211 },
212 "required": ["url"]
213 }),
214 },
215 Tool {
216 name: "ingest_text".to_string(),
217 description: Some(
218 "Add arbitrary text content to the vector store for RAG retrieval".to_string(),
219 ),
220 input_schema: serde_json::json!({
221 "type": "object",
222 "properties": {
223 "uri": { "type": "string", "description": "Custom URI identifier for this text" },
224 "content": { "type": "string", "description": "Text content to embed and store" },
225 "namespace": { "type": "string", "default": "default" }
226 },
227 "required": ["uri", "content"]
228 }),
229 },
230 Tool {
231 name: "compact_vectors".to_string(),
232 description: Some("Compact the vector index by removing stale entries".to_string()),
233 input_schema: serde_json::json!({
234 "type": "object",
235 "properties": {
236 "namespace": { "type": "string", "default": "default" }
237 }
238 }),
239 },
240 Tool {
241 name: "vector_stats".to_string(),
242 description: Some("Get vector store statistics (active, stale, total)".to_string()),
243 input_schema: serde_json::json!({
244 "type": "object",
245 "properties": {
246 "namespace": { "type": "string", "default": "default" }
247 }
248 }),
249 },
250 Tool {
251 name: "disambiguate".to_string(),
252 description: Some("Find similar entities that might be duplicates".to_string()),
253 input_schema: serde_json::json!({
254 "type": "object",
255 "properties": {
256 "namespace": { "type": "string", "default": "default" },
257 "threshold": { "type": "number", "default": 0.8, "description": "Similarity threshold 0.0-1.0" }
258 }
259 }),
260 },
261 Tool {
262 name: "get_node_degree".to_string(),
263 description: Some("Get the degree (number of connections) of a node".to_string()),
264 input_schema: serde_json::json!({
265 "type": "object",
266 "properties": {
267 "uri": { "type": "string" },
268 "namespace": { "type": "string", "default": "default" }
269 },
270 "required": ["uri"]
271 }),
272 },
273 Tool {
274 name: "install_ontology".to_string(),
275 description: Some("Download and install an ontology from a URL".to_string()),
276 input_schema: serde_json::json!({
277 "type": "object",
278 "properties": {
279 "url": { "type": "string", "description": "URL of the ontology file (.owl, .ttl)" },
280 "name": { "type": "string", "description": "Name for the local file (e.g. 'legal.owl')" },
281 "namespace": { "type": "string", "default": "default" }
282 },
283 "required": ["url", "name"]
284 }),
285 },
286 Tool {
287 name: "list_scenarios".to_string(),
288 description: Some("List available scenarios in the marketplace".to_string()),
289 input_schema: serde_json::json!({
290 "type": "object",
291 "properties": {}
292 }),
293 },
294 Tool {
295 name: "install_scenario".to_string(),
296 description: Some(
297 "Install a scenario (ontologies, data, docs) from the marketplace".to_string(),
298 ),
299 input_schema: serde_json::json!({
300 "type": "object",
301 "properties": {
302 "name": { "type": "string", "description": "Name of the scenario to install" },
303 "namespace": { "type": "string", "default": "default" }
304 },
305 "required": ["name"]
306 }),
307 },
308 ]
309 }
310
311 pub async fn handle_request(&self, request: McpRequest) -> McpResponse {
312 match request.method.as_str() {
313 "initialize" => {
314 McpResponse {
316 jsonrpc: "2.0".to_string(),
317 id: request.id,
318 result: Some(serde_json::json!({
319 "protocolVersion": "2024-11-05",
320 "capabilities": {
321 "tools": {}
322 },
323 "serverInfo": {
324 "name": "synapse",
325 "version": env!("CARGO_PKG_VERSION")
326 }
327 })),
328 error: None,
329 }
330 }
331 "notifications/initialized" | "initialized" => {
332 McpResponse {
334 jsonrpc: "2.0".to_string(),
335 id: request.id,
336 result: Some(serde_json::json!({})),
337 error: None,
338 }
339 }
340 "tools/list" => {
341 let result = ListToolsResult {
342 tools: Self::get_tools(),
343 };
344 McpResponse {
345 jsonrpc: "2.0".to_string(),
346 id: request.id,
347 result: Some(serde_json::to_value(result).unwrap()),
348 error: None,
349 }
350 }
351 "tools/call" => self.handle_tool_call(request).await,
352 "ingest" => self.handle_legacy_ingest(request).await,
354 "ingest_file" => self.handle_legacy_ingest_file(request).await,
355 _ => McpResponse {
356 jsonrpc: "2.0".to_string(),
357 id: request.id,
358 result: None,
359 error: Some(McpError {
360 code: -32601,
361 message: format!("Method not found: {}", request.method),
362 data: None,
363 }),
364 },
365 }
366 }
367
368 fn validate_arguments(tool_name: &str, arguments: &serde_json::Value) -> Result<(), String> {
369 let tools = Self::get_tools();
370 if let Some(tool) = tools.iter().find(|t| t.name == tool_name) {
371 if let Ok(schema) = JSONSchema::compile(&tool.input_schema) {
372 if let Err(errors) = schema.validate(arguments) {
373 let error_msg = errors.map(|e| e.to_string()).collect::<Vec<_>>().join(", ");
374 return Err(format!("Validation error: {}", error_msg));
375 }
376 } else {
377 return Err("Invalid tool schema definition".to_string());
378 }
379 }
380 Ok(())
381 }
382
383 async fn handle_tool_call(&self, request: McpRequest) -> McpResponse {
384 let params = match request.params {
385 Some(serde_json::Value::Object(map)) => map,
386 Some(_) => return self.error_response(request.id, -32602, "Params must be an object"),
387 None => return self.error_response(request.id, -32602, "Missing params"),
388 };
389
390 let tool_name = match params.get("name").and_then(|v| v.as_str()) {
391 Some(n) => n,
392 None => return self.error_response(request.id, -32602, "Missing tool name"),
393 };
394
395 let arguments = params
396 .get("arguments")
397 .and_then(|v| v.as_object())
398 .cloned()
399 .unwrap_or_default();
400
401 let args_value = serde_json::Value::Object(arguments.clone());
402 if let Err(e) = Self::validate_arguments(tool_name, &args_value) {
403 return self.error_response(request.id, -32602, &e);
404 }
405
406 match tool_name {
407 "ingest_triples" => self.call_ingest_triples(request.id, &arguments).await,
408 "ingest_file" => self.call_ingest_file(request.id, &arguments).await,
409 "sparql_query" => self.call_sparql_query(request.id, &arguments).await,
410 "hybrid_search" => self.call_hybrid_search(request.id, &arguments).await,
411 "apply_reasoning" => self.call_apply_reasoning(request.id, &arguments).await,
412 "get_neighbors" => self.call_get_neighbors(request.id, &arguments).await,
413 "list_triples" => self.call_list_triples(request.id, &arguments).await,
414 "delete_namespace" => self.call_delete_namespace(request.id, &arguments).await,
415 "ingest_url" => self.call_ingest_url(request.id, &arguments).await,
416 "ingest_text" => self.call_ingest_text(request.id, &arguments).await,
417 "compact_vectors" => self.call_compact_vectors(request.id, &arguments).await,
418 "vector_stats" => self.call_vector_stats(request.id, &arguments).await,
419 "disambiguate" => self.call_disambiguate(request.id, &arguments).await,
420 "get_node_degree" => self.call_get_node_degree(request.id, &arguments).await,
421 "install_ontology" => self.call_install_ontology(request.id, &arguments).await,
422 "list_scenarios" => self.call_list_scenarios(request.id).await,
423 "install_scenario" => self.call_install_scenario(request.id, &arguments).await,
424 _ => self.error_response(request.id, -32602, &format!("Unknown tool: {}", tool_name)),
425 }
426 }
427
428 async fn call_list_scenarios(&self, id: Option<serde_json::Value>) -> McpResponse {
429 match self.engine.scenario_manager.list_scenarios().await {
430 Ok(registry) => {
431 let items: Vec<ScenarioItem> = registry
432 .into_iter()
433 .map(|e| ScenarioItem {
434 name: e.name,
435 description: e.description,
436 version: e.version,
437 })
438 .collect();
439 self.serialize_result(id, ScenarioListResult { scenarios: items })
440 }
441 Err(e) => self.tool_result(id, &e.to_string(), true),
442 }
443 }
444
445 async fn call_install_scenario(
446 &self,
447 id: Option<serde_json::Value>,
448 args: &serde_json::Map<String, serde_json::Value>,
449 ) -> McpResponse {
450 let name = match args.get("name").and_then(|v| v.as_str()) {
451 Some(n) => n,
452 None => return self.error_response(id, -32602, "Missing 'name'"),
453 };
454 let namespace = args
455 .get("namespace")
456 .and_then(|v| v.as_str())
457 .unwrap_or("default");
458
459 match self.engine.install_scenario(name, namespace).await {
460 Ok(msg) => {
461 let result = SimpleSuccessResult {
462 success: true,
463 message: msg,
464 };
465 self.serialize_result(id, result)
466 }
467 Err(e) => self.tool_result(id, &e, true),
468 }
469 }
470
471 async fn call_install_ontology(
472 &self,
473 id: Option<serde_json::Value>,
474 args: &serde_json::Map<String, serde_json::Value>,
475 ) -> McpResponse {
476 let url = match args.get("url").and_then(|v| v.as_str()) {
477 Some(u) => u,
478 None => return self.error_response(id, -32602, "Missing 'url'"),
479 };
480 let name = match args.get("name").and_then(|v| v.as_str()) {
481 Some(n) => n,
482 None => return self.error_response(id, -32602, "Missing 'name'"),
483 };
484 let namespace = args
485 .get("namespace")
486 .and_then(|v| v.as_str())
487 .unwrap_or("default");
488
489 let response = match reqwest::get(url).await {
491 Ok(r) => r,
492 Err(e) => {
493 return self.tool_result(id, &format!("Failed to download ontology: {}", e), true)
494 }
495 };
496
497 if !response.status().is_success() {
498 return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
499 }
500
501 let content = match response.text().await {
502 Ok(t) => t,
503 Err(e) => {
504 return self.tool_result(id, &format!("Failed to read response: {}", e), true)
505 }
506 };
507
508 let ontology_dir = std::path::Path::new("ontology");
510 if !ontology_dir.exists() {
511 if let Err(e) = std::fs::create_dir(ontology_dir) {
512 return self.tool_result(
513 id,
514 &format!("Failed to create ontology dir: {}", e),
515 true,
516 );
517 }
518 }
519
520 let file_name = std::path::Path::new(name)
522 .file_name()
523 .and_then(|n| n.to_str())
524 .ok_or_else(|| "Invalid filename".to_string());
525
526 let clean_name = match file_name {
527 Ok(n) => n,
528 Err(e) => return self.tool_result(id, &format!("Security error: {}", e), true),
529 };
530
531 if clean_name != name {
532 return self.tool_result(
533 id,
534 "Security error: Filename contains path components",
535 true,
536 );
537 }
538
539 let path = ontology_dir.join(clean_name);
540 if let Err(e) = std::fs::write(&path, content) {
541 return self.tool_result(id, &format!("Failed to save ontology file: {}", e), true);
542 }
543
544 let store = match self.engine.get_store(namespace) {
546 Ok(s) => s,
547 Err(e) => return self.tool_result(id, &e.to_string(), true),
548 };
549
550 match crate::ingest::ontology::OntologyLoader::load_file(&store, &path).await {
551 Ok(count) => {
552 let result = SimpleSuccessResult {
553 success: true,
554 message: format!("Installed ontology '{}' and loaded {} triples", name, count),
555 };
556 self.serialize_result(id, result)
557 }
558 Err(e) => self.tool_result(id, &format!("Failed to load ontology: {}", e), true),
559 }
560 }
561
562 async fn call_ingest_triples(
563 &self,
564 id: Option<serde_json::Value>,
565 args: &serde_json::Map<String, serde_json::Value>,
566 ) -> McpResponse {
567 let namespace = args
568 .get("namespace")
569 .and_then(|v| v.as_str())
570 .unwrap_or("default");
571 let triples_array = match args.get("triples").and_then(|v| v.as_array()) {
572 Some(t) => t,
573 None => return self.error_response(id, -32602, "Missing 'triples' array"),
574 };
575
576 let mut triples = Vec::new();
577 for t in triples_array {
578 if let (Some(s), Some(p), Some(o)) = (
579 t.get("subject").and_then(|v| v.as_str()),
580 t.get("predicate").and_then(|v| v.as_str()),
581 t.get("object").and_then(|v| v.as_str()),
582 ) {
583 triples.push(Triple {
584 subject: s.to_string(),
585 predicate: p.to_string(),
586 object: o.to_string(),
587 provenance: Some(Provenance {
588 source: "mcp".to_string(),
589 timestamp: "".to_string(),
590 method: "tools/call".to_string(),
591 }),
592 embedding: vec![],
593 });
594 }
595 }
596
597 let req = Self::create_request(IngestRequest {
598 triples,
599 namespace: namespace.to_string(),
600 });
601
602 match self.engine.ingest_triples(req).await {
603 Ok(resp) => {
604 let inner = resp.into_inner();
605 let result = IngestToolResult {
606 nodes_added: inner.nodes_added,
607 edges_added: inner.edges_added,
608 message: format!("Ingested {} triples", inner.edges_added),
609 };
610 self.serialize_result(id, result)
611 }
612 Err(e) => self.tool_result(id, &e.to_string(), true),
613 }
614 }
615
616 async fn call_ingest_file(
617 &self,
618 id: Option<serde_json::Value>,
619 args: &serde_json::Map<String, serde_json::Value>,
620 ) -> McpResponse {
621 let path = match args.get("path").and_then(|v| v.as_str()) {
622 Some(p) => p,
623 None => return self.error_response(id, -32602, "Missing 'path'"),
624 };
625 let namespace = args
626 .get("namespace")
627 .and_then(|v| v.as_str())
628 .unwrap_or("default");
629
630 let req = Self::create_request(IngestFileRequest {
631 file_path: path.to_string(),
632 namespace: namespace.to_string(),
633 });
634
635 match self.engine.ingest_file(req).await {
636 Ok(resp) => {
637 let inner = resp.into_inner();
638 let result = IngestToolResult {
639 nodes_added: inner.nodes_added,
640 edges_added: inner.edges_added,
641 message: format!("Ingested {} triples from {}", inner.edges_added, path),
642 };
643 self.serialize_result(id, result)
644 }
645 Err(e) => self.tool_result(id, &e.to_string(), true),
646 }
647 }
648
649 async fn call_sparql_query(
650 &self,
651 id: Option<serde_json::Value>,
652 args: &serde_json::Map<String, serde_json::Value>,
653 ) -> McpResponse {
654 let query = match args.get("query").and_then(|v| v.as_str()) {
655 Some(q) => q,
656 None => return self.error_response(id, -32602, "Missing 'query'"),
657 };
658 let namespace = args
659 .get("namespace")
660 .and_then(|v| v.as_str())
661 .unwrap_or("default");
662
663 let req = Self::create_request(SparqlRequest {
664 query: query.to_string(),
665 namespace: namespace.to_string(),
666 });
667
668 match self.engine.query_sparql(req).await {
669 Ok(resp) => self.tool_result(id, &resp.into_inner().results_json, false),
670 Err(e) => self.tool_result(id, &e.to_string(), true),
671 }
672 }
673
674 async fn call_hybrid_search(
675 &self,
676 id: Option<serde_json::Value>,
677 args: &serde_json::Map<String, serde_json::Value>,
678 ) -> McpResponse {
679 let query = match args.get("query").and_then(|v| v.as_str()) {
680 Some(q) => q,
681 None => return self.error_response(id, -32602, "Missing 'query'"),
682 };
683 let namespace = args
684 .get("namespace")
685 .and_then(|v| v.as_str())
686 .unwrap_or("default");
687 let vector_k = args.get("vector_k").and_then(|v| v.as_u64()).unwrap_or(10) as u32;
688 let graph_depth = args
689 .get("graph_depth")
690 .and_then(|v| v.as_u64())
691 .unwrap_or(1) as u32;
692 let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as u32;
693
694 let req = Self::create_request(HybridSearchRequest {
695 query: query.to_string(),
696 namespace: namespace.to_string(),
697 vector_k,
698 graph_depth,
699 mode: SearchMode::Hybrid as i32,
700 limit,
701 });
702
703 match self.engine.hybrid_search(req).await {
704 Ok(resp) => {
705 let results = resp.into_inner().results;
706 let items: Vec<SearchResultItem> = results
707 .into_iter()
708 .map(|r| SearchResultItem {
709 node_id: r.node_id,
710 score: r.score,
711 content: r.content,
712 uri: r.uri,
713 })
714 .collect();
715
716 let result = SearchToolResult { results: items };
717 self.serialize_result(id, result)
718 }
719 Err(e) => self.tool_result(id, &e.to_string(), true),
720 }
721 }
722
723 async fn call_apply_reasoning(
724 &self,
725 id: Option<serde_json::Value>,
726 args: &serde_json::Map<String, serde_json::Value>,
727 ) -> McpResponse {
728 let namespace = args
729 .get("namespace")
730 .and_then(|v| v.as_str())
731 .unwrap_or("default");
732 let strategy_str = args
733 .get("strategy")
734 .and_then(|v| v.as_str())
735 .unwrap_or("rdfs");
736 let materialize = args
737 .get("materialize")
738 .and_then(|v| v.as_bool())
739 .unwrap_or(false);
740
741 let strategy = match strategy_str.to_lowercase().as_str() {
742 "owlrl" | "owl-rl" => ReasoningStrategy::Owlrl as i32,
743 _ => ReasoningStrategy::Rdfs as i32,
744 };
745
746 let req = Self::create_request(ReasoningRequest {
747 namespace: namespace.to_string(),
748 strategy,
749 materialize,
750 });
751
752 match self.engine.apply_reasoning(req).await {
753 Ok(resp) => {
754 let inner = resp.into_inner();
755 let result = ReasoningToolResult {
756 success: inner.success,
757 triples_inferred: inner.triples_inferred,
758 message: inner.message,
759 };
760 self.serialize_result(id, result)
761 }
762 Err(e) => self.tool_result(id, &e.to_string(), true),
763 }
764 }
765
766 async fn call_get_neighbors(
767 &self,
768 id: Option<serde_json::Value>,
769 args: &serde_json::Map<String, serde_json::Value>,
770 ) -> McpResponse {
771 let uri = match args.get("uri").and_then(|v| v.as_str()) {
772 Some(u) => u,
773 None => return self.error_response(id, -32602, "Missing 'uri'"),
774 };
775 let namespace = args
776 .get("namespace")
777 .and_then(|v| v.as_str())
778 .unwrap_or("default");
779 let direction = args
780 .get("direction")
781 .and_then(|v| v.as_str())
782 .unwrap_or("outgoing");
783
784 let store = match self.engine.get_store(namespace) {
785 Ok(s) => s,
786 Err(e) => return self.tool_result(id, &e.to_string(), true),
787 };
788
789 let mut neighbors = Vec::new();
790
791 if direction == "outgoing" || direction == "both" {
793 if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
794 for q in store
795 .store
796 .quads_for_pattern(Some(subj.into()), None, None, None)
797 .flatten()
798 {
799 neighbors.push(NeighborItem {
800 direction: "outgoing".to_string(),
801 predicate: q.predicate.to_string(),
802 target: q.object.to_string(),
803 score: 1.0,
804 });
805 }
806 }
807 }
808
809 if direction == "incoming" || direction == "both" {
811 if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
812 for q in store
813 .store
814 .quads_for_pattern(None, None, Some(obj.into()), None)
815 .flatten()
816 {
817 neighbors.push(NeighborItem {
818 direction: "incoming".to_string(),
819 predicate: q.predicate.to_string(),
820 target: q.subject.to_string(),
821 score: 1.0,
822 });
823 }
824 }
825 }
826
827 let result = NeighborsToolResult { neighbors };
828 self.serialize_result(id, result)
829 }
830
831 async fn call_list_triples(
832 &self,
833 id: Option<serde_json::Value>,
834 args: &serde_json::Map<String, serde_json::Value>,
835 ) -> McpResponse {
836 let namespace = args
837 .get("namespace")
838 .and_then(|v| v.as_str())
839 .unwrap_or("default");
840 let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
841
842 let store = match self.engine.get_store(namespace) {
843 Ok(s) => s,
844 Err(e) => return self.tool_result(id, &e.to_string(), true),
845 };
846
847 let mut triples = Vec::new();
848 for q in store.store.iter().take(limit).flatten() {
849 triples.push(TripleItem {
850 subject: q.subject.to_string(),
851 predicate: q.predicate.to_string(),
852 object: q.object.to_string(),
853 });
854 }
855
856 let result = TriplesToolResult { triples };
857 self.serialize_result(id, result)
858 }
859
860 async fn call_delete_namespace(
861 &self,
862 id: Option<serde_json::Value>,
863 args: &serde_json::Map<String, serde_json::Value>,
864 ) -> McpResponse {
865 let namespace = match args.get("namespace").and_then(|v| v.as_str()) {
866 Some(n) => n,
867 None => return self.error_response(id, -32602, "Missing 'namespace'"),
868 };
869
870 let req = Self::create_request(crate::server::proto::EmptyRequest {
871 namespace: namespace.to_string(),
872 });
873
874 match self.engine.delete_namespace_data(req).await {
875 Ok(resp) => {
876 let inner = resp.into_inner();
877 let result = SimpleSuccessResult {
878 success: inner.success,
879 message: inner.message,
880 };
881 self.serialize_result(id, result)
882 }
883 Err(e) => self.tool_result(id, &e.to_string(), true),
884 }
885 }
886
887 async fn call_ingest_url(
888 &self,
889 id: Option<serde_json::Value>,
890 args: &serde_json::Map<String, serde_json::Value>,
891 ) -> McpResponse {
892 let url = match args.get("url").and_then(|v| v.as_str()) {
893 Some(u) => u,
894 None => return self.error_response(id, -32602, "Missing 'url'"),
895 };
896 let namespace = args
897 .get("namespace")
898 .and_then(|v| v.as_str())
899 .unwrap_or("default");
900
901 let client = reqwest::Client::new();
903 let response = match client.get(url).send().await {
904 Ok(r) => r,
905 Err(e) => return self.tool_result(id, &format!("Failed to fetch URL: {}", e), true),
906 };
907
908 if !response.status().is_success() {
909 return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
910 }
911
912 let html = match response.text().await {
913 Ok(t) => t,
914 Err(e) => {
915 return self.tool_result(id, &format!("Failed to read response: {}", e), true)
916 }
917 };
918
919 let script_re = regex::Regex::new(r"(?s)<script.*?>.*?</script>").unwrap();
921 let style_re = regex::Regex::new(r"(?s)<style.*?>.*?</style>").unwrap();
922 let tag_re = regex::Regex::new(r"<[^>]*>").unwrap();
923
924 let no_script = script_re.replace_all(&html, " ");
925 let no_style = style_re.replace_all(&no_script, " ");
926 let text_content = tag_re.replace_all(&no_style, " ");
927
928 let text = text_content
929 .split_whitespace()
930 .collect::<Vec<_>>()
931 .join(" ");
932
933 let processor = crate::processor::TextProcessor::new();
935 let chunks = processor.chunk_text(&text, 1000, 150);
936
937 let store = match self.engine.get_store(namespace) {
939 Ok(s) => s,
940 Err(e) => return self.tool_result(id, &e.to_string(), true),
941 };
942
943 if let Some(ref vector_store) = store.vector_store {
944 let mut added_chunks = 0;
945 for (i, chunk) in chunks.iter().enumerate() {
946 let chunk_uri = format!("{}#chunk-{}", url, i);
947 let metadata = serde_json::json!({
949 "uri": chunk_uri,
950 "source_url": url,
951 "type": "web_chunk"
952 });
953 match vector_store.add(&chunk_uri, chunk, metadata).await {
954 Ok(_) => added_chunks += 1,
955 Err(e) => {
956 eprintln!("Failed to add chunk {}: {}", i, e);
957 }
958 }
959 }
960 let result = IngestToolResult {
961 nodes_added: 0,
962 edges_added: 0, message: format!(
964 "Ingested URL: {} ({} chars, {} chunks)",
965 url,
966 text.len(),
967 added_chunks
968 ),
969 };
970 self.serialize_result(id, result)
971 } else {
972 self.tool_result(id, "Vector store not available", true)
973 }
974 }
975
976 async fn call_ingest_text(
977 &self,
978 id: Option<serde_json::Value>,
979 args: &serde_json::Map<String, serde_json::Value>,
980 ) -> McpResponse {
981 let uri = match args.get("uri").and_then(|v| v.as_str()) {
982 Some(u) => u,
983 None => return self.error_response(id, -32602, "Missing 'uri'"),
984 };
985 let content = match args.get("content").and_then(|v| v.as_str()) {
986 Some(c) => c,
987 None => return self.error_response(id, -32602, "Missing 'content'"),
988 };
989 let namespace = args
990 .get("namespace")
991 .and_then(|v| v.as_str())
992 .unwrap_or("default");
993
994 let processor = crate::processor::TextProcessor::new();
996 let chunks = processor.chunk_text(content, 1000, 150);
997
998 let store = match self.engine.get_store(namespace) {
1000 Ok(s) => s,
1001 Err(e) => return self.tool_result(id, &e.to_string(), true),
1002 };
1003
1004 if let Some(ref vector_store) = store.vector_store {
1005 let mut added_chunks = 0;
1006 for (i, chunk) in chunks.iter().enumerate() {
1007 let chunk_uri = if chunks.len() > 1 {
1008 format!("{}#chunk-{}", uri, i)
1009 } else {
1010 uri.to_string()
1011 };
1012 let metadata = serde_json::json!({
1013 "uri": uri, "chunk_uri": chunk_uri,
1015 "type": "text_chunk"
1016 });
1017 match vector_store.add(&chunk_uri, chunk, metadata).await {
1018 Ok(_) => added_chunks += 1,
1019 Err(e) => {
1020 eprintln!("Failed to add chunk {}: {}", i, e);
1021 }
1022 }
1023 }
1024 let result = IngestToolResult {
1025 nodes_added: 0,
1026 edges_added: 0,
1027 message: format!(
1028 "Ingested text: {} ({} chars, {} chunks)",
1029 uri,
1030 content.len(),
1031 added_chunks
1032 ),
1033 };
1034 self.serialize_result(id, result)
1035 } else {
1036 self.tool_result(id, "Vector store not available", true)
1037 }
1038 }
1039
1040 async fn call_compact_vectors(
1041 &self,
1042 id: Option<serde_json::Value>,
1043 args: &serde_json::Map<String, serde_json::Value>,
1044 ) -> McpResponse {
1045 let namespace = args
1046 .get("namespace")
1047 .and_then(|v| v.as_str())
1048 .unwrap_or("default");
1049
1050 let store = match self.engine.get_store(namespace) {
1051 Ok(s) => s,
1052 Err(e) => return self.tool_result(id, &e.to_string(), true),
1053 };
1054
1055 if let Some(ref vector_store) = store.vector_store {
1056 match vector_store.compact() {
1057 Ok(removed) => {
1058 let result = SimpleSuccessResult {
1059 success: true,
1060 message: format!("Compaction complete: {} stale entries removed", removed),
1061 };
1062 self.serialize_result(id, result)
1063 }
1064 Err(e) => self.tool_result(id, &format!("Compaction error: {}", e), true),
1065 }
1066 } else {
1067 self.tool_result(id, "Vector store not available", true)
1068 }
1069 }
1070
1071 async fn call_vector_stats(
1072 &self,
1073 id: Option<serde_json::Value>,
1074 args: &serde_json::Map<String, serde_json::Value>,
1075 ) -> McpResponse {
1076 let namespace = args
1077 .get("namespace")
1078 .and_then(|v| v.as_str())
1079 .unwrap_or("default");
1080
1081 let store = match self.engine.get_store(namespace) {
1082 Ok(s) => s,
1083 Err(e) => return self.tool_result(id, &e.to_string(), true),
1084 };
1085
1086 if let Some(ref vector_store) = store.vector_store {
1087 let (active, stale, total) = vector_store.stats();
1088 let result = StatsToolResult {
1089 active_vectors: active,
1090 stale_vectors: stale,
1091 total_embeddings: total,
1092 };
1093 self.serialize_result(id, result)
1094 } else {
1095 self.tool_result(id, "Vector store not available", true)
1096 }
1097 }
1098
1099 async fn call_disambiguate(
1100 &self,
1101 id: Option<serde_json::Value>,
1102 args: &serde_json::Map<String, serde_json::Value>,
1103 ) -> McpResponse {
1104 let namespace = args
1105 .get("namespace")
1106 .and_then(|v| v.as_str())
1107 .unwrap_or("default");
1108 let threshold = args
1109 .get("threshold")
1110 .and_then(|v| v.as_f64())
1111 .unwrap_or(0.8);
1112
1113 let store = match self.engine.get_store(namespace) {
1114 Ok(s) => s,
1115 Err(e) => return self.tool_result(id, &e.to_string(), true),
1116 };
1117
1118 let uri_map = store.uri_to_id.read().unwrap();
1120 let uris: Vec<String> = uri_map.keys().cloned().collect();
1121 drop(uri_map);
1122
1123 let disambiguator = crate::disambiguation::EntityDisambiguator::new(threshold);
1124 let suggestions = disambiguator.suggest_merges(&uris);
1125
1126 let items: Vec<DisambiguationItem> = suggestions
1127 .into_iter()
1128 .map(|(u1, u2, s)| DisambiguationItem {
1129 uri1: u1,
1130 uri2: u2,
1131 similarity: s,
1132 })
1133 .collect();
1134
1135 let message = if items.is_empty() {
1136 "No similar entities found above threshold".to_string()
1137 } else {
1138 format!("Found {} potential duplicates", items.len())
1139 };
1140
1141 let result = DisambiguationResult {
1142 suggestions: items,
1143 message,
1144 };
1145 self.serialize_result(id, result)
1146 }
1147
1148 async fn handle_legacy_ingest(&self, request: McpRequest) -> McpResponse {
1150 let params = match request.params {
1151 Some(p) => p,
1152 None => return self.error_response(request.id, -32602, "Invalid params"),
1153 };
1154
1155 if let (Some(sub), Some(pred), Some(obj)) = (
1156 params.get("subject").and_then(|v| v.as_str()),
1157 params.get("predicate").and_then(|v| v.as_str()),
1158 params.get("object").and_then(|v| v.as_str()),
1159 ) {
1160 let namespace = params
1161 .get("namespace")
1162 .and_then(|v| v.as_str())
1163 .unwrap_or("default");
1164 let triple = Triple {
1165 subject: sub.to_string(),
1166 predicate: pred.to_string(),
1167 object: obj.to_string(),
1168 provenance: Some(Provenance {
1169 source: "mcp".to_string(),
1170 timestamp: "".to_string(),
1171 method: "stdio".to_string(),
1172 }),
1173 embedding: vec![],
1174 };
1175
1176 let req = Self::create_request(IngestRequest {
1177 triples: vec![triple],
1178 namespace: namespace.to_string(),
1179 });
1180
1181 match self.engine.ingest_triples(req).await {
1182 Ok(_) => McpResponse {
1183 jsonrpc: "2.0".to_string(),
1184 id: request.id,
1185 result: Some(serde_json::to_value("Ingested").unwrap()),
1186 error: None,
1187 },
1188 Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1189 }
1190 } else {
1191 self.error_response(request.id, -32602, "Invalid params")
1192 }
1193 }
1194
1195 async fn handle_legacy_ingest_file(&self, request: McpRequest) -> McpResponse {
1196 let params = match request.params {
1197 Some(p) => p,
1198 None => {
1199 return self.error_response(request.id, -32602, "Invalid params: 'path' required")
1200 }
1201 };
1202
1203 if let Some(path) = params.get("path").and_then(|v| v.as_str()) {
1204 let namespace = params
1205 .get("namespace")
1206 .and_then(|v| v.as_str())
1207 .unwrap_or("default");
1208
1209 let req = Self::create_request(IngestFileRequest {
1210 file_path: path.to_string(),
1211 namespace: namespace.to_string(),
1212 });
1213
1214 match self.engine.ingest_file(req).await {
1215 Ok(resp) => {
1216 let inner = resp.into_inner();
1217 McpResponse {
1218 jsonrpc: "2.0".to_string(),
1219 id: request.id,
1220 result: Some(
1221 serde_json::to_value(format!(
1222 "Ingested {} triples from {}",
1223 inner.edges_added, path
1224 ))
1225 .unwrap(),
1226 ),
1227 error: None,
1228 }
1229 }
1230 Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1231 }
1232 } else {
1233 self.error_response(request.id, -32602, "Invalid params: 'path' required")
1234 }
1235 }
1236
1237 fn serialize_result<T: serde::Serialize>(
1238 &self,
1239 id: Option<serde_json::Value>,
1240 result: T,
1241 ) -> McpResponse {
1242 match serde_json::to_string_pretty(&result) {
1243 Ok(json) => self.tool_result(id, &json, false),
1244 Err(e) => self.tool_result(id, &format!("Serialization error: {}", e), true),
1245 }
1246 }
1247
1248 async fn call_get_node_degree(
1249 &self,
1250 id: Option<serde_json::Value>,
1251 args: &serde_json::Map<String, serde_json::Value>,
1252 ) -> McpResponse {
1253 let uri = match args.get("uri").and_then(|v| v.as_str()) {
1254 Some(u) => u,
1255 None => return self.error_response(id, -32602, "Missing 'uri'"),
1256 };
1257 let namespace = args
1258 .get("namespace")
1259 .and_then(|v| v.as_str())
1260 .unwrap_or("default");
1261
1262 let store = match self.engine.get_store(namespace) {
1263 Ok(s) => s,
1264 Err(e) => return self.tool_result(id, &e.to_string(), true),
1265 };
1266
1267 let degree = store.get_degree(uri);
1268
1269 let result = DegreeResult {
1270 uri: uri.to_string(),
1271 degree,
1272 };
1273
1274 self.serialize_result(id, result)
1275 }
1276
1277 fn error_response(
1278 &self,
1279 id: Option<serde_json::Value>,
1280 code: i32,
1281 message: &str,
1282 ) -> McpResponse {
1283 McpResponse {
1284 jsonrpc: "2.0".to_string(),
1285 id,
1286 result: None,
1287 error: Some(McpError {
1288 code,
1289 message: message.to_string(),
1290 data: None,
1291 }),
1292 }
1293 }
1294
1295 fn tool_result(
1296 &self,
1297 id: Option<serde_json::Value>,
1298 text: &str,
1299 is_error: bool,
1300 ) -> McpResponse {
1301 let result = CallToolResult {
1302 content: vec![Content {
1303 content_type: "text".to_string(),
1304 text: text.to_string(),
1305 }],
1306 is_error: if is_error { Some(true) } else { None },
1307 };
1308 McpResponse {
1309 jsonrpc: "2.0".to_string(),
1310 id,
1311 result: Some(serde_json::to_value(result).unwrap()),
1312 error: None,
1313 }
1314 }
1315}