1use crate::mcp_types::{
2 CallToolResult, Content, DegreeResult, DisambiguationItem, DisambiguationResult,
3 IngestToolResult, ListToolsResult, McpError, McpRequest, McpResponse, NeighborItem,
4 NeighborsToolResult, ReasoningToolResult, ScenarioItem, ScenarioListResult, SearchResultItem,
5 SearchToolResult, SimpleSuccessResult, StatsToolResult, Tool, TripleItem, TriplesToolResult,
6};
7use crate::server::proto::semantic_engine_server::SemanticEngine;
8use crate::server::proto::{
9 HybridSearchRequest, IngestFileRequest, IngestRequest, Provenance, ReasoningRequest,
10 ReasoningStrategy, SearchMode, SparqlRequest, Triple,
11};
12use crate::server::MySemanticEngine;
13use jsonschema::JSONSchema;
14use std::sync::Arc;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tonic::Request;
17
/// MCP server that reads newline-delimited JSON requests from stdin, dispatches
/// them to the semantic engine, and writes one JSON response per line to stdout.
pub struct McpStdioServer {
    /// Shared handle to the engine that every tool call is forwarded to.
    engine: Arc<MySemanticEngine>,
}
21
22impl McpStdioServer {
23 pub fn new(engine: Arc<MySemanticEngine>) -> Self {
24 Self { engine }
25 }
26
27 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
28 let mut reader = BufReader::new(tokio::io::stdin());
29 let mut writer = tokio::io::stdout();
30
31 loop {
32 let mut line = String::new();
33 if reader.read_line(&mut line).await? == 0 {
34 break;
35 }
36
37 let trimmed = line.trim();
38 if trimmed.is_empty() {
39 continue;
40 }
41
42 if let Ok(request) = serde_json::from_str::<McpRequest>(trimmed) {
43 let response = self.handle_request(request).await;
44 let response_json = serde_json::to_string(&response)? + "\n";
45 writer.write_all(response_json.as_bytes()).await?;
46 writer.flush().await?;
47 }
48 }
49
50 Ok(())
51 }
52
53 fn create_request<T>(msg: T) -> Request<T> {
54 let mut req = Request::new(msg);
55
56 let token_opt = std::env::var("SYNAPSE_ADMIN_TOKEN")
58 .or_else(|_| std::env::var("SYNAPSE_MCP_TOKEN"))
59 .ok();
60
61 if let Some(token) = token_opt {
62 if let Ok(val) = format!("Bearer {}", token).parse() {
63 req.metadata_mut().insert("authorization", val);
64 }
65 }
66 req
67 }
68
69 fn get_tools() -> Vec<Tool> {
70 vec![
71 Tool {
72 name: "ingest_triples".to_string(),
73 description: Some(
74 "Ingest one or more RDF triples into the knowledge graph".to_string(),
75 ),
76 input_schema: serde_json::json!({
77 "type": "object",
78 "properties": {
79 "triples": {
80 "type": "array",
81 "items": {
82 "type": "object",
83 "properties": {
84 "subject": { "type": "string" },
85 "predicate": { "type": "string" },
86 "object": { "type": "string" }
87 },
88 "required": ["subject", "predicate", "object"]
89 }
90 },
91 "namespace": { "type": "string", "default": "default" }
92 },
93 "required": ["triples"]
94 }),
95 },
96 Tool {
97 name: "ingest_file".to_string(),
98 description: Some(
99 "Ingest a CSV or Markdown file into the knowledge graph".to_string(),
100 ),
101 input_schema: serde_json::json!({
102 "type": "object",
103 "properties": {
104 "path": { "type": "string", "description": "Path to the file" },
105 "namespace": { "type": "string", "default": "default" }
106 },
107 "required": ["path"]
108 }),
109 },
110 Tool {
111 name: "sparql_query".to_string(),
112 description: Some("Execute a SPARQL query against the knowledge graph".to_string()),
113 input_schema: serde_json::json!({
114 "type": "object",
115 "properties": {
116 "query": { "type": "string", "description": "SPARQL query string" },
117 "namespace": { "type": "string", "default": "default" }
118 },
119 "required": ["query"]
120 }),
121 },
122 Tool {
123 name: "hybrid_search".to_string(),
124 description: Some("Perform a hybrid vector + graph search".to_string()),
125 input_schema: serde_json::json!({
126 "type": "object",
127 "properties": {
128 "query": { "type": "string", "description": "Natural language query" },
129 "namespace": { "type": "string", "default": "default" },
130 "vector_k": { "type": "integer", "default": 10 },
131 "graph_depth": { "type": "integer", "default": 1 },
132 "limit": { "type": "integer", "default": 20 }
133 },
134 "required": ["query"]
135 }),
136 },
137 Tool {
138 name: "apply_reasoning".to_string(),
139 description: Some(
140 "Apply RDFS or OWL-RL reasoning to infer new triples".to_string(),
141 ),
142 input_schema: serde_json::json!({
143 "type": "object",
144 "properties": {
145 "namespace": { "type": "string", "default": "default" },
146 "strategy": { "type": "string", "enum": ["rdfs", "owlrl"], "default": "rdfs" },
147 "materialize": { "type": "boolean", "default": false }
148 }
149 }),
150 },
151 Tool {
152 name: "get_neighbors".to_string(),
153 description: Some(
154 "Get neighboring nodes connected to a given URI in the graph".to_string(),
155 ),
156 input_schema: serde_json::json!({
157 "type": "object",
158 "properties": {
159 "uri": { "type": "string", "description": "URI of the entity to find neighbors for" },
160 "namespace": { "type": "string", "default": "default" },
161 "direction": { "type": "string", "enum": ["outgoing", "incoming", "both"], "default": "outgoing" }
162 },
163 "required": ["uri"]
164 }),
165 },
166 Tool {
167 name: "list_triples".to_string(),
168 description: Some(
169 "List all triples in a namespace (useful for debugging/exploration)"
170 .to_string(),
171 ),
172 input_schema: serde_json::json!({
173 "type": "object",
174 "properties": {
175 "namespace": { "type": "string", "default": "default" },
176 "limit": { "type": "integer", "default": 100 }
177 }
178 }),
179 },
180 Tool {
181 name: "delete_namespace".to_string(),
182 description: Some("Delete all data in a namespace".to_string()),
183 input_schema: serde_json::json!({
184 "type": "object",
185 "properties": {
186 "namespace": { "type": "string", "description": "Namespace to delete" }
187 },
188 "required": ["namespace"]
189 }),
190 },
191 Tool {
192 name: "ingest_url".to_string(),
193 description: Some(
194 "Scrape a web page and add its content to the vector store for RAG retrieval"
195 .to_string(),
196 ),
197 input_schema: serde_json::json!({
198 "type": "object",
199 "properties": {
200 "url": { "type": "string", "description": "URL to scrape and ingest" },
201 "namespace": { "type": "string", "default": "default" }
202 },
203 "required": ["url"]
204 }),
205 },
206 Tool {
207 name: "ingest_text".to_string(),
208 description: Some(
209 "Add arbitrary text content to the vector store for RAG retrieval".to_string(),
210 ),
211 input_schema: serde_json::json!({
212 "type": "object",
213 "properties": {
214 "uri": { "type": "string", "description": "Custom URI identifier for this text" },
215 "content": { "type": "string", "description": "Text content to embed and store" },
216 "namespace": { "type": "string", "default": "default" }
217 },
218 "required": ["uri", "content"]
219 }),
220 },
221 Tool {
222 name: "compact_vectors".to_string(),
223 description: Some("Compact the vector index by removing stale entries".to_string()),
224 input_schema: serde_json::json!({
225 "type": "object",
226 "properties": {
227 "namespace": { "type": "string", "default": "default" }
228 }
229 }),
230 },
231 Tool {
232 name: "vector_stats".to_string(),
233 description: Some("Get vector store statistics (active, stale, total)".to_string()),
234 input_schema: serde_json::json!({
235 "type": "object",
236 "properties": {
237 "namespace": { "type": "string", "default": "default" }
238 }
239 }),
240 },
241 Tool {
242 name: "disambiguate".to_string(),
243 description: Some("Find similar entities that might be duplicates".to_string()),
244 input_schema: serde_json::json!({
245 "type": "object",
246 "properties": {
247 "namespace": { "type": "string", "default": "default" },
248 "threshold": { "type": "number", "default": 0.8, "description": "Similarity threshold 0.0-1.0" }
249 }
250 }),
251 },
252 Tool {
253 name: "get_node_degree".to_string(),
254 description: Some("Get the degree (number of connections) of a node".to_string()),
255 input_schema: serde_json::json!({
256 "type": "object",
257 "properties": {
258 "uri": { "type": "string" },
259 "namespace": { "type": "string", "default": "default" }
260 },
261 "required": ["uri"]
262 }),
263 },
264 Tool {
265 name: "install_ontology".to_string(),
266 description: Some("Download and install an ontology from a URL".to_string()),
267 input_schema: serde_json::json!({
268 "type": "object",
269 "properties": {
270 "url": { "type": "string", "description": "URL of the ontology file (.owl, .ttl)" },
271 "name": { "type": "string", "description": "Name for the local file (e.g. 'legal.owl')" },
272 "namespace": { "type": "string", "default": "default" }
273 },
274 "required": ["url", "name"]
275 }),
276 },
277 Tool {
278 name: "list_scenarios".to_string(),
279 description: Some("List available scenarios in the marketplace".to_string()),
280 input_schema: serde_json::json!({
281 "type": "object",
282 "properties": {}
283 }),
284 },
285 Tool {
286 name: "install_scenario".to_string(),
287 description: Some(
288 "Install a scenario (ontologies, data, docs) from the marketplace".to_string(),
289 ),
290 input_schema: serde_json::json!({
291 "type": "object",
292 "properties": {
293 "name": { "type": "string", "description": "Name of the scenario to install" },
294 "namespace": { "type": "string", "default": "default" }
295 },
296 "required": ["name"]
297 }),
298 },
299 ]
300 }
301
302 pub async fn handle_request(&self, request: McpRequest) -> McpResponse {
303 match request.method.as_str() {
304 "initialize" => {
305 McpResponse {
307 jsonrpc: "2.0".to_string(),
308 id: request.id,
309 result: Some(serde_json::json!({
310 "protocolVersion": "2024-11-05",
311 "capabilities": {
312 "tools": {}
313 },
314 "serverInfo": {
315 "name": "synapse",
316 "version": env!("CARGO_PKG_VERSION")
317 }
318 })),
319 error: None,
320 }
321 }
322 "notifications/initialized" | "initialized" => {
323 McpResponse {
325 jsonrpc: "2.0".to_string(),
326 id: request.id,
327 result: Some(serde_json::json!({})),
328 error: None,
329 }
330 }
331 "tools/list" => {
332 let result = ListToolsResult {
333 tools: Self::get_tools(),
334 };
335 McpResponse {
336 jsonrpc: "2.0".to_string(),
337 id: request.id,
338 result: Some(serde_json::to_value(result).unwrap()),
339 error: None,
340 }
341 }
342 "tools/call" => self.handle_tool_call(request).await,
343 "ingest" => self.handle_legacy_ingest(request).await,
345 "ingest_file" => self.handle_legacy_ingest_file(request).await,
346 _ => McpResponse {
347 jsonrpc: "2.0".to_string(),
348 id: request.id,
349 result: None,
350 error: Some(McpError {
351 code: -32601,
352 message: format!("Method not found: {}", request.method),
353 data: None,
354 }),
355 },
356 }
357 }
358
359 fn validate_arguments(tool_name: &str, arguments: &serde_json::Value) -> Result<(), String> {
360 let tools = Self::get_tools();
361 if let Some(tool) = tools.iter().find(|t| t.name == tool_name) {
362 if let Ok(schema) = JSONSchema::compile(&tool.input_schema) {
363 if let Err(errors) = schema.validate(arguments) {
364 let error_msg = errors.map(|e| e.to_string()).collect::<Vec<_>>().join(", ");
365 return Err(format!("Validation error: {}", error_msg));
366 }
367 } else {
368 return Err("Invalid tool schema definition".to_string());
369 }
370 }
371 Ok(())
372 }
373
374 async fn handle_tool_call(&self, request: McpRequest) -> McpResponse {
375 let params = match request.params {
376 Some(p) => p,
377 None => return self.error_response(request.id, -32602, "Missing params"),
378 };
379
380 let tool_name = match params.get("name").and_then(|v| v.as_str()) {
381 Some(n) => n,
382 None => return self.error_response(request.id, -32602, "Missing tool name"),
383 };
384
385 let arguments = params
386 .get("arguments")
387 .and_then(|v| v.as_object())
388 .cloned()
389 .unwrap_or_default();
390
391 let args_value = serde_json::Value::Object(arguments.clone());
392 if let Err(e) = Self::validate_arguments(tool_name, &args_value) {
393 return self.error_response(request.id, -32602, &e);
394 }
395
396 match tool_name {
397 "ingest_triples" => self.call_ingest_triples(request.id, &arguments).await,
398 "ingest_file" => self.call_ingest_file(request.id, &arguments).await,
399 "sparql_query" => self.call_sparql_query(request.id, &arguments).await,
400 "hybrid_search" => self.call_hybrid_search(request.id, &arguments).await,
401 "apply_reasoning" => self.call_apply_reasoning(request.id, &arguments).await,
402 "get_neighbors" => self.call_get_neighbors(request.id, &arguments).await,
403 "list_triples" => self.call_list_triples(request.id, &arguments).await,
404 "delete_namespace" => self.call_delete_namespace(request.id, &arguments).await,
405 "ingest_url" => self.call_ingest_url(request.id, &arguments).await,
406 "ingest_text" => self.call_ingest_text(request.id, &arguments).await,
407 "compact_vectors" => self.call_compact_vectors(request.id, &arguments).await,
408 "vector_stats" => self.call_vector_stats(request.id, &arguments).await,
409 "disambiguate" => self.call_disambiguate(request.id, &arguments).await,
410 "get_node_degree" => self.call_get_node_degree(request.id, &arguments).await,
411 "install_ontology" => self.call_install_ontology(request.id, &arguments).await,
412 "list_scenarios" => self.call_list_scenarios(request.id).await,
413 "install_scenario" => self.call_install_scenario(request.id, &arguments).await,
414 _ => self.error_response(request.id, -32602, &format!("Unknown tool: {}", tool_name)),
415 }
416 }
417
418 async fn call_list_scenarios(&self, id: Option<serde_json::Value>) -> McpResponse {
419 match self.engine.scenario_manager.list_scenarios().await {
420 Ok(registry) => {
421 let items: Vec<ScenarioItem> = registry
422 .into_iter()
423 .map(|e| ScenarioItem {
424 name: e.name,
425 description: e.description,
426 version: e.version,
427 })
428 .collect();
429 self.serialize_result(id, ScenarioListResult { scenarios: items })
430 }
431 Err(e) => self.tool_result(id, &e.to_string(), true),
432 }
433 }
434
435 async fn call_install_scenario(
436 &self,
437 id: Option<serde_json::Value>,
438 args: &serde_json::Map<String, serde_json::Value>,
439 ) -> McpResponse {
440 let name = match args.get("name").and_then(|v| v.as_str()) {
441 Some(n) => n,
442 None => return self.error_response(id, -32602, "Missing 'name'"),
443 };
444 let namespace = args
445 .get("namespace")
446 .and_then(|v| v.as_str())
447 .unwrap_or("default");
448
449 match self.engine.install_scenario(name, namespace).await {
450 Ok(msg) => {
451 let result = SimpleSuccessResult {
452 success: true,
453 message: msg,
454 };
455 self.serialize_result(id, result)
456 }
457 Err(e) => self.tool_result(id, &e, true),
458 }
459 }
460
461 async fn call_install_ontology(
462 &self,
463 id: Option<serde_json::Value>,
464 args: &serde_json::Map<String, serde_json::Value>,
465 ) -> McpResponse {
466 let url = match args.get("url").and_then(|v| v.as_str()) {
467 Some(u) => u,
468 None => return self.error_response(id, -32602, "Missing 'url'"),
469 };
470 let name = match args.get("name").and_then(|v| v.as_str()) {
471 Some(n) => n,
472 None => return self.error_response(id, -32602, "Missing 'name'"),
473 };
474 let namespace = args
475 .get("namespace")
476 .and_then(|v| v.as_str())
477 .unwrap_or("default");
478
479 let response = match reqwest::get(url).await {
481 Ok(r) => r,
482 Err(e) => {
483 return self.tool_result(id, &format!("Failed to download ontology: {}", e), true)
484 }
485 };
486
487 if !response.status().is_success() {
488 return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
489 }
490
491 let content = match response.text().await {
492 Ok(t) => t,
493 Err(e) => {
494 return self.tool_result(id, &format!("Failed to read response: {}", e), true)
495 }
496 };
497
498 let ontology_dir = std::path::Path::new("ontology");
500 if !ontology_dir.exists() {
501 if let Err(e) = std::fs::create_dir(ontology_dir) {
502 return self.tool_result(
503 id,
504 &format!("Failed to create ontology dir: {}", e),
505 true,
506 );
507 }
508 }
509
510 let file_name = std::path::Path::new(name)
512 .file_name()
513 .and_then(|n| n.to_str())
514 .ok_or_else(|| "Invalid filename".to_string());
515
516 let clean_name = match file_name {
517 Ok(n) => n,
518 Err(e) => return self.tool_result(id, &format!("Security error: {}", e), true),
519 };
520
521 if clean_name != name {
522 return self.tool_result(
523 id,
524 "Security error: Filename contains path components",
525 true,
526 );
527 }
528
529 let path = ontology_dir.join(clean_name);
530 if let Err(e) = std::fs::write(&path, content) {
531 return self.tool_result(id, &format!("Failed to save ontology file: {}", e), true);
532 }
533
534 let store = match self.engine.get_store(namespace) {
536 Ok(s) => s,
537 Err(e) => return self.tool_result(id, &e.to_string(), true),
538 };
539
540 match crate::ingest::ontology::OntologyLoader::load_file(&store, &path).await {
541 Ok(count) => {
542 let result = SimpleSuccessResult {
543 success: true,
544 message: format!("Installed ontology '{}' and loaded {} triples", name, count),
545 };
546 self.serialize_result(id, result)
547 }
548 Err(e) => self.tool_result(id, &format!("Failed to load ontology: {}", e), true),
549 }
550 }
551
552 async fn call_ingest_triples(
553 &self,
554 id: Option<serde_json::Value>,
555 args: &serde_json::Map<String, serde_json::Value>,
556 ) -> McpResponse {
557 let namespace = args
558 .get("namespace")
559 .and_then(|v| v.as_str())
560 .unwrap_or("default");
561 let triples_array = match args.get("triples").and_then(|v| v.as_array()) {
562 Some(t) => t,
563 None => return self.error_response(id, -32602, "Missing 'triples' array"),
564 };
565
566 let mut triples = Vec::new();
567 for t in triples_array {
568 if let (Some(s), Some(p), Some(o)) = (
569 t.get("subject").and_then(|v| v.as_str()),
570 t.get("predicate").and_then(|v| v.as_str()),
571 t.get("object").and_then(|v| v.as_str()),
572 ) {
573 triples.push(Triple {
574 subject: s.to_string(),
575 predicate: p.to_string(),
576 object: o.to_string(),
577 provenance: Some(Provenance {
578 source: "mcp".to_string(),
579 timestamp: "".to_string(),
580 method: "tools/call".to_string(),
581 }),
582 embedding: vec![],
583 });
584 }
585 }
586
587 let req = Self::create_request(IngestRequest {
588 triples,
589 namespace: namespace.to_string(),
590 });
591
592 match self.engine.ingest_triples(req).await {
593 Ok(resp) => {
594 let inner = resp.into_inner();
595 let result = IngestToolResult {
596 nodes_added: inner.nodes_added,
597 edges_added: inner.edges_added,
598 message: format!("Ingested {} triples", inner.edges_added),
599 };
600 self.serialize_result(id, result)
601 }
602 Err(e) => self.tool_result(id, &e.to_string(), true),
603 }
604 }
605
606 async fn call_ingest_file(
607 &self,
608 id: Option<serde_json::Value>,
609 args: &serde_json::Map<String, serde_json::Value>,
610 ) -> McpResponse {
611 let path = match args.get("path").and_then(|v| v.as_str()) {
612 Some(p) => p,
613 None => return self.error_response(id, -32602, "Missing 'path'"),
614 };
615 let namespace = args
616 .get("namespace")
617 .and_then(|v| v.as_str())
618 .unwrap_or("default");
619
620 let req = Self::create_request(IngestFileRequest {
621 file_path: path.to_string(),
622 namespace: namespace.to_string(),
623 });
624
625 match self.engine.ingest_file(req).await {
626 Ok(resp) => {
627 let inner = resp.into_inner();
628 let result = IngestToolResult {
629 nodes_added: inner.nodes_added,
630 edges_added: inner.edges_added,
631 message: format!("Ingested {} triples from {}", inner.edges_added, path),
632 };
633 self.serialize_result(id, result)
634 }
635 Err(e) => self.tool_result(id, &e.to_string(), true),
636 }
637 }
638
639 async fn call_sparql_query(
640 &self,
641 id: Option<serde_json::Value>,
642 args: &serde_json::Map<String, serde_json::Value>,
643 ) -> McpResponse {
644 let query = match args.get("query").and_then(|v| v.as_str()) {
645 Some(q) => q,
646 None => return self.error_response(id, -32602, "Missing 'query'"),
647 };
648 let namespace = args
649 .get("namespace")
650 .and_then(|v| v.as_str())
651 .unwrap_or("default");
652
653 let req = Self::create_request(SparqlRequest {
654 query: query.to_string(),
655 namespace: namespace.to_string(),
656 });
657
658 match self.engine.query_sparql(req).await {
659 Ok(resp) => self.tool_result(id, &resp.into_inner().results_json, false),
660 Err(e) => self.tool_result(id, &e.to_string(), true),
661 }
662 }
663
664 async fn call_hybrid_search(
665 &self,
666 id: Option<serde_json::Value>,
667 args: &serde_json::Map<String, serde_json::Value>,
668 ) -> McpResponse {
669 let query = match args.get("query").and_then(|v| v.as_str()) {
670 Some(q) => q,
671 None => return self.error_response(id, -32602, "Missing 'query'"),
672 };
673 let namespace = args
674 .get("namespace")
675 .and_then(|v| v.as_str())
676 .unwrap_or("default");
677 let vector_k = args.get("vector_k").and_then(|v| v.as_u64()).unwrap_or(10) as u32;
678 let graph_depth = args
679 .get("graph_depth")
680 .and_then(|v| v.as_u64())
681 .unwrap_or(1) as u32;
682 let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as u32;
683
684 let req = Self::create_request(HybridSearchRequest {
685 query: query.to_string(),
686 namespace: namespace.to_string(),
687 vector_k,
688 graph_depth,
689 mode: SearchMode::Hybrid as i32,
690 limit,
691 });
692
693 match self.engine.hybrid_search(req).await {
694 Ok(resp) => {
695 let results = resp.into_inner().results;
696 let items: Vec<SearchResultItem> = results
697 .into_iter()
698 .map(|r| SearchResultItem {
699 node_id: r.node_id,
700 score: r.score,
701 content: r.content,
702 uri: r.uri,
703 })
704 .collect();
705
706 let result = SearchToolResult { results: items };
707 self.serialize_result(id, result)
708 }
709 Err(e) => self.tool_result(id, &e.to_string(), true),
710 }
711 }
712
713 async fn call_apply_reasoning(
714 &self,
715 id: Option<serde_json::Value>,
716 args: &serde_json::Map<String, serde_json::Value>,
717 ) -> McpResponse {
718 let namespace = args
719 .get("namespace")
720 .and_then(|v| v.as_str())
721 .unwrap_or("default");
722 let strategy_str = args
723 .get("strategy")
724 .and_then(|v| v.as_str())
725 .unwrap_or("rdfs");
726 let materialize = args
727 .get("materialize")
728 .and_then(|v| v.as_bool())
729 .unwrap_or(false);
730
731 let strategy = match strategy_str.to_lowercase().as_str() {
732 "owlrl" | "owl-rl" => ReasoningStrategy::Owlrl as i32,
733 _ => ReasoningStrategy::Rdfs as i32,
734 };
735
736 let req = Self::create_request(ReasoningRequest {
737 namespace: namespace.to_string(),
738 strategy,
739 materialize,
740 });
741
742 match self.engine.apply_reasoning(req).await {
743 Ok(resp) => {
744 let inner = resp.into_inner();
745 let result = ReasoningToolResult {
746 success: inner.success,
747 triples_inferred: inner.triples_inferred,
748 message: inner.message,
749 };
750 self.serialize_result(id, result)
751 }
752 Err(e) => self.tool_result(id, &e.to_string(), true),
753 }
754 }
755
756 async fn call_get_neighbors(
757 &self,
758 id: Option<serde_json::Value>,
759 args: &serde_json::Map<String, serde_json::Value>,
760 ) -> McpResponse {
761 let uri = match args.get("uri").and_then(|v| v.as_str()) {
762 Some(u) => u,
763 None => return self.error_response(id, -32602, "Missing 'uri'"),
764 };
765 let namespace = args
766 .get("namespace")
767 .and_then(|v| v.as_str())
768 .unwrap_or("default");
769 let direction = args
770 .get("direction")
771 .and_then(|v| v.as_str())
772 .unwrap_or("outgoing");
773
774 let store = match self.engine.get_store(namespace) {
775 Ok(s) => s,
776 Err(e) => return self.tool_result(id, &e.to_string(), true),
777 };
778
779 let mut neighbors = Vec::new();
780
781 if direction == "outgoing" || direction == "both" {
783 if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
784 for q in store
785 .store
786 .quads_for_pattern(Some(subj.into()), None, None, None)
787 .flatten()
788 {
789 neighbors.push(NeighborItem {
790 direction: "outgoing".to_string(),
791 predicate: q.predicate.to_string(),
792 target: q.object.to_string(),
793 score: 1.0,
794 });
795 }
796 }
797 }
798
799 if direction == "incoming" || direction == "both" {
801 if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
802 for q in store
803 .store
804 .quads_for_pattern(None, None, Some(obj.into()), None)
805 .flatten()
806 {
807 neighbors.push(NeighborItem {
808 direction: "incoming".to_string(),
809 predicate: q.predicate.to_string(),
810 target: q.subject.to_string(),
811 score: 1.0,
812 });
813 }
814 }
815 }
816
817 let result = NeighborsToolResult { neighbors };
818 self.serialize_result(id, result)
819 }
820
821 async fn call_list_triples(
822 &self,
823 id: Option<serde_json::Value>,
824 args: &serde_json::Map<String, serde_json::Value>,
825 ) -> McpResponse {
826 let namespace = args
827 .get("namespace")
828 .and_then(|v| v.as_str())
829 .unwrap_or("default");
830 let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
831
832 let store = match self.engine.get_store(namespace) {
833 Ok(s) => s,
834 Err(e) => return self.tool_result(id, &e.to_string(), true),
835 };
836
837 let mut triples = Vec::new();
838 for q in store.store.iter().take(limit).flatten() {
839 triples.push(TripleItem {
840 subject: q.subject.to_string(),
841 predicate: q.predicate.to_string(),
842 object: q.object.to_string(),
843 });
844 }
845
846 let result = TriplesToolResult { triples };
847 self.serialize_result(id, result)
848 }
849
850 async fn call_delete_namespace(
851 &self,
852 id: Option<serde_json::Value>,
853 args: &serde_json::Map<String, serde_json::Value>,
854 ) -> McpResponse {
855 let namespace = match args.get("namespace").and_then(|v| v.as_str()) {
856 Some(n) => n,
857 None => return self.error_response(id, -32602, "Missing 'namespace'"),
858 };
859
860 let req = Self::create_request(crate::server::proto::EmptyRequest {
861 namespace: namespace.to_string(),
862 });
863
864 match self.engine.delete_namespace_data(req).await {
865 Ok(resp) => {
866 let inner = resp.into_inner();
867 let result = SimpleSuccessResult {
868 success: inner.success,
869 message: inner.message,
870 };
871 self.serialize_result(id, result)
872 }
873 Err(e) => self.tool_result(id, &e.to_string(), true),
874 }
875 }
876
877 async fn call_ingest_url(
878 &self,
879 id: Option<serde_json::Value>,
880 args: &serde_json::Map<String, serde_json::Value>,
881 ) -> McpResponse {
882 let url = match args.get("url").and_then(|v| v.as_str()) {
883 Some(u) => u,
884 None => return self.error_response(id, -32602, "Missing 'url'"),
885 };
886 let namespace = args
887 .get("namespace")
888 .and_then(|v| v.as_str())
889 .unwrap_or("default");
890
891 let client = reqwest::Client::new();
893 let response = match client.get(url).send().await {
894 Ok(r) => r,
895 Err(e) => return self.tool_result(id, &format!("Failed to fetch URL: {}", e), true),
896 };
897
898 if !response.status().is_success() {
899 return self.tool_result(id, &format!("HTTP error: {}", response.status()), true);
900 }
901
902 let html = match response.text().await {
903 Ok(t) => t,
904 Err(e) => {
905 return self.tool_result(id, &format!("Failed to read response: {}", e), true)
906 }
907 };
908
909 let script_re = regex::Regex::new(r"(?s)<script.*?>.*?</script>").unwrap();
911 let style_re = regex::Regex::new(r"(?s)<style.*?>.*?</style>").unwrap();
912 let tag_re = regex::Regex::new(r"<[^>]*>").unwrap();
913
914 let no_script = script_re.replace_all(&html, " ");
915 let no_style = style_re.replace_all(&no_script, " ");
916 let text_content = tag_re.replace_all(&no_style, " ");
917
918 let text = text_content
919 .split_whitespace()
920 .collect::<Vec<_>>()
921 .join(" ");
922
923 let processor = crate::processor::TextProcessor::new();
925 let chunks = processor.chunk_text(&text, 1000, 150);
926
927 let store = match self.engine.get_store(namespace) {
929 Ok(s) => s,
930 Err(e) => return self.tool_result(id, &e.to_string(), true),
931 };
932
933 if let Some(ref vector_store) = store.vector_store {
934 let mut added_chunks = 0;
935 for (i, chunk) in chunks.iter().enumerate() {
936 let chunk_uri = format!("{}#chunk-{}", url, i);
937 let metadata = serde_json::json!({
939 "uri": chunk_uri,
940 "source_url": url,
941 "type": "web_chunk"
942 });
943 match vector_store.add(&chunk_uri, chunk, metadata).await {
944 Ok(_) => added_chunks += 1,
945 Err(e) => {
946 eprintln!("Failed to add chunk {}: {}", i, e);
947 }
948 }
949 }
950 let result = IngestToolResult {
951 nodes_added: 0,
952 edges_added: 0, message: format!(
954 "Ingested URL: {} ({} chars, {} chunks)",
955 url,
956 text.len(),
957 added_chunks
958 ),
959 };
960 self.serialize_result(id, result)
961 } else {
962 self.tool_result(id, "Vector store not available", true)
963 }
964 }
965
966 async fn call_ingest_text(
967 &self,
968 id: Option<serde_json::Value>,
969 args: &serde_json::Map<String, serde_json::Value>,
970 ) -> McpResponse {
971 let uri = match args.get("uri").and_then(|v| v.as_str()) {
972 Some(u) => u,
973 None => return self.error_response(id, -32602, "Missing 'uri'"),
974 };
975 let content = match args.get("content").and_then(|v| v.as_str()) {
976 Some(c) => c,
977 None => return self.error_response(id, -32602, "Missing 'content'"),
978 };
979 let namespace = args
980 .get("namespace")
981 .and_then(|v| v.as_str())
982 .unwrap_or("default");
983
984 let processor = crate::processor::TextProcessor::new();
986 let chunks = processor.chunk_text(content, 1000, 150);
987
988 let store = match self.engine.get_store(namespace) {
990 Ok(s) => s,
991 Err(e) => return self.tool_result(id, &e.to_string(), true),
992 };
993
994 if let Some(ref vector_store) = store.vector_store {
995 let mut added_chunks = 0;
996 for (i, chunk) in chunks.iter().enumerate() {
997 let chunk_uri = if chunks.len() > 1 {
998 format!("{}#chunk-{}", uri, i)
999 } else {
1000 uri.to_string()
1001 };
1002 let metadata = serde_json::json!({
1003 "uri": uri, "chunk_uri": chunk_uri,
1005 "type": "text_chunk"
1006 });
1007 match vector_store.add(&chunk_uri, chunk, metadata).await {
1008 Ok(_) => added_chunks += 1,
1009 Err(e) => {
1010 eprintln!("Failed to add chunk {}: {}", i, e);
1011 }
1012 }
1013 }
1014 let result = IngestToolResult {
1015 nodes_added: 0,
1016 edges_added: 0,
1017 message: format!(
1018 "Ingested text: {} ({} chars, {} chunks)",
1019 uri,
1020 content.len(),
1021 added_chunks
1022 ),
1023 };
1024 self.serialize_result(id, result)
1025 } else {
1026 self.tool_result(id, "Vector store not available", true)
1027 }
1028 }
1029
1030 async fn call_compact_vectors(
1031 &self,
1032 id: Option<serde_json::Value>,
1033 args: &serde_json::Map<String, serde_json::Value>,
1034 ) -> McpResponse {
1035 let namespace = args
1036 .get("namespace")
1037 .and_then(|v| v.as_str())
1038 .unwrap_or("default");
1039
1040 let store = match self.engine.get_store(namespace) {
1041 Ok(s) => s,
1042 Err(e) => return self.tool_result(id, &e.to_string(), true),
1043 };
1044
1045 if let Some(ref vector_store) = store.vector_store {
1046 match vector_store.compact() {
1047 Ok(removed) => {
1048 let result = SimpleSuccessResult {
1049 success: true,
1050 message: format!("Compaction complete: {} stale entries removed", removed),
1051 };
1052 self.serialize_result(id, result)
1053 }
1054 Err(e) => self.tool_result(id, &format!("Compaction error: {}", e), true),
1055 }
1056 } else {
1057 self.tool_result(id, "Vector store not available", true)
1058 }
1059 }
1060
1061 async fn call_vector_stats(
1062 &self,
1063 id: Option<serde_json::Value>,
1064 args: &serde_json::Map<String, serde_json::Value>,
1065 ) -> McpResponse {
1066 let namespace = args
1067 .get("namespace")
1068 .and_then(|v| v.as_str())
1069 .unwrap_or("default");
1070
1071 let store = match self.engine.get_store(namespace) {
1072 Ok(s) => s,
1073 Err(e) => return self.tool_result(id, &e.to_string(), true),
1074 };
1075
1076 if let Some(ref vector_store) = store.vector_store {
1077 let (active, stale, total) = vector_store.stats();
1078 let result = StatsToolResult {
1079 active_vectors: active,
1080 stale_vectors: stale,
1081 total_embeddings: total,
1082 };
1083 self.serialize_result(id, result)
1084 } else {
1085 self.tool_result(id, "Vector store not available", true)
1086 }
1087 }
1088
1089 async fn call_disambiguate(
1090 &self,
1091 id: Option<serde_json::Value>,
1092 args: &serde_json::Map<String, serde_json::Value>,
1093 ) -> McpResponse {
1094 let namespace = args
1095 .get("namespace")
1096 .and_then(|v| v.as_str())
1097 .unwrap_or("default");
1098 let threshold = args
1099 .get("threshold")
1100 .and_then(|v| v.as_f64())
1101 .unwrap_or(0.8);
1102
1103 let store = match self.engine.get_store(namespace) {
1104 Ok(s) => s,
1105 Err(e) => return self.tool_result(id, &e.to_string(), true),
1106 };
1107
1108 let uri_map = store.uri_to_id.read().unwrap();
1110 let uris: Vec<String> = uri_map.keys().cloned().collect();
1111 drop(uri_map);
1112
1113 let disambiguator = crate::disambiguation::EntityDisambiguator::new(threshold);
1114 let suggestions = disambiguator.suggest_merges(&uris);
1115
1116 let items: Vec<DisambiguationItem> = suggestions
1117 .into_iter()
1118 .map(|(u1, u2, s)| DisambiguationItem {
1119 uri1: u1,
1120 uri2: u2,
1121 similarity: s,
1122 })
1123 .collect();
1124
1125 let message = if items.is_empty() {
1126 "No similar entities found above threshold".to_string()
1127 } else {
1128 format!("Found {} potential duplicates", items.len())
1129 };
1130
1131 let result = DisambiguationResult {
1132 suggestions: items,
1133 message,
1134 };
1135 self.serialize_result(id, result)
1136 }
1137
1138 async fn handle_legacy_ingest(&self, request: McpRequest) -> McpResponse {
1140 let params = match request.params {
1141 Some(p) => p,
1142 None => return self.error_response(request.id, -32602, "Invalid params"),
1143 };
1144
1145 if let (Some(sub), Some(pred), Some(obj)) = (
1146 params.get("subject").and_then(|v| v.as_str()),
1147 params.get("predicate").and_then(|v| v.as_str()),
1148 params.get("object").and_then(|v| v.as_str()),
1149 ) {
1150 let namespace = params
1151 .get("namespace")
1152 .and_then(|v| v.as_str())
1153 .unwrap_or("default");
1154 let triple = Triple {
1155 subject: sub.to_string(),
1156 predicate: pred.to_string(),
1157 object: obj.to_string(),
1158 provenance: Some(Provenance {
1159 source: "mcp".to_string(),
1160 timestamp: "".to_string(),
1161 method: "stdio".to_string(),
1162 }),
1163 embedding: vec![],
1164 };
1165
1166 let req = Self::create_request(IngestRequest {
1167 triples: vec![triple],
1168 namespace: namespace.to_string(),
1169 });
1170
1171 match self.engine.ingest_triples(req).await {
1172 Ok(_) => McpResponse {
1173 jsonrpc: "2.0".to_string(),
1174 id: request.id,
1175 result: Some(serde_json::to_value("Ingested").unwrap()),
1176 error: None,
1177 },
1178 Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1179 }
1180 } else {
1181 self.error_response(request.id, -32602, "Invalid params")
1182 }
1183 }
1184
1185 async fn handle_legacy_ingest_file(&self, request: McpRequest) -> McpResponse {
1186 let params = match request.params {
1187 Some(p) => p,
1188 None => {
1189 return self.error_response(request.id, -32602, "Invalid params: 'path' required")
1190 }
1191 };
1192
1193 if let Some(path) = params.get("path").and_then(|v| v.as_str()) {
1194 let namespace = params
1195 .get("namespace")
1196 .and_then(|v| v.as_str())
1197 .unwrap_or("default");
1198
1199 let req = Self::create_request(IngestFileRequest {
1200 file_path: path.to_string(),
1201 namespace: namespace.to_string(),
1202 });
1203
1204 match self.engine.ingest_file(req).await {
1205 Ok(resp) => {
1206 let inner = resp.into_inner();
1207 McpResponse {
1208 jsonrpc: "2.0".to_string(),
1209 id: request.id,
1210 result: Some(
1211 serde_json::to_value(format!(
1212 "Ingested {} triples from {}",
1213 inner.edges_added, path
1214 ))
1215 .unwrap(),
1216 ),
1217 error: None,
1218 }
1219 }
1220 Err(e) => self.error_response(request.id, -32000, &e.to_string()),
1221 }
1222 } else {
1223 self.error_response(request.id, -32602, "Invalid params: 'path' required")
1224 }
1225 }
1226
1227 fn serialize_result<T: serde::Serialize>(
1228 &self,
1229 id: Option<serde_json::Value>,
1230 result: T,
1231 ) -> McpResponse {
1232 match serde_json::to_string_pretty(&result) {
1233 Ok(json) => self.tool_result(id, &json, false),
1234 Err(e) => self.tool_result(id, &format!("Serialization error: {}", e), true),
1235 }
1236 }
1237
1238 async fn call_get_node_degree(
1239 &self,
1240 id: Option<serde_json::Value>,
1241 args: &serde_json::Map<String, serde_json::Value>,
1242 ) -> McpResponse {
1243 let uri = match args.get("uri").and_then(|v| v.as_str()) {
1244 Some(u) => u,
1245 None => return self.error_response(id, -32602, "Missing 'uri'"),
1246 };
1247 let namespace = args
1248 .get("namespace")
1249 .and_then(|v| v.as_str())
1250 .unwrap_or("default");
1251
1252 let store = match self.engine.get_store(namespace) {
1253 Ok(s) => s,
1254 Err(e) => return self.tool_result(id, &e.to_string(), true),
1255 };
1256
1257 let degree = store.get_degree(uri);
1258
1259 let result = DegreeResult {
1260 uri: uri.to_string(),
1261 degree,
1262 };
1263
1264 self.serialize_result(id, result)
1265 }
1266
1267 fn error_response(
1268 &self,
1269 id: Option<serde_json::Value>,
1270 code: i32,
1271 message: &str,
1272 ) -> McpResponse {
1273 McpResponse {
1274 jsonrpc: "2.0".to_string(),
1275 id,
1276 result: None,
1277 error: Some(McpError {
1278 code,
1279 message: message.to_string(),
1280 data: None,
1281 }),
1282 }
1283 }
1284
1285 fn tool_result(
1286 &self,
1287 id: Option<serde_json::Value>,
1288 text: &str,
1289 is_error: bool,
1290 ) -> McpResponse {
1291 let result = CallToolResult {
1292 content: vec![Content {
1293 content_type: "text".to_string(),
1294 text: text.to_string(),
1295 }],
1296 is_error: if is_error { Some(true) } else { None },
1297 };
1298 McpResponse {
1299 jsonrpc: "2.0".to_string(),
1300 id,
1301 result: Some(serde_json::to_value(result).unwrap()),
1302 error: None,
1303 }
1304 }
1305}