1use dashmap::DashMap;
2use std::sync::Arc;
3use tonic::{Request, Response, Status};
4
/// Generated gRPC message types and service traits for the
/// `semantic_engine` protobuf package (produced at build time by tonic).
pub mod proto {
    tonic::include_proto!("semantic_engine");
}
8
9use proto::semantic_engine_server::SemanticEngine;
10use proto::*;
11
12use crate::ingest::IngestionEngine;
13use crate::reasoner::{ReasoningStrategy as InternalStrategy, SynapseReasoner};
14use crate::scenarios::ScenarioManager;
15use crate::server::proto::{ReasoningStrategy, SearchMode};
16use crate::store::{IngestTriple, SynapseStore};
17use std::path::Path;
18
19use crate::audit::InferenceAudit;
20use crate::auth::NamespaceAuth;
21
/// Bearer token extracted by `auth_interceptor` and stashed in the request's
/// extensions so handlers can read it without re-parsing metadata.
#[derive(Clone)]
pub struct AuthToken(pub String);
24
25#[allow(clippy::result_large_err)]
26pub fn auth_interceptor(mut req: Request<()>) -> Result<Request<()>, Status> {
27 if let Some(token) = req
28 .metadata()
29 .get("authorization")
30 .and_then(|t| t.to_str().ok())
31 .map(|s| s.trim_start_matches("Bearer ").to_string())
32 {
33 req.extensions_mut().insert(AuthToken(token));
34 }
35 Ok(req)
36}
37
38fn get_token<T>(req: &Request<T>) -> Option<String> {
39 if let Some(token) = req.extensions().get::<AuthToken>() {
40 return Some(token.0.clone());
41 }
42 req.metadata()
43 .get("authorization")
44 .and_then(|t| t.to_str().ok())
45 .map(|s| s.trim_start_matches("Bearer ").to_string())
46}
47
/// Shared state for the gRPC service: lazily opened per-namespace stores plus
/// auth, audit, and scenario helpers. Cloning is cheap (all fields are `Arc`s).
#[derive(Clone)]
pub struct MySemanticEngine {
    // Root directory under which each namespace's store directory lives.
    pub storage_path: String,
    // Open stores, keyed by namespace name; populated on first access.
    pub stores: Arc<DashMap<String, Arc<SynapseStore>>>,
    // Per-namespace permission checks ("read"/"write"/"reason"/"delete").
    pub auth: Arc<NamespaceAuth>,
    // Records reasoning runs (see `apply_reasoning`).
    pub audit: Arc<InferenceAudit>,
    // Installs bundled scenario assets (schema/data/docs directories).
    pub scenario_manager: Arc<ScenarioManager>,
}
56
57impl MySemanticEngine {
58 pub fn new(storage_path: &str) -> Self {
59 let auth = Arc::new(NamespaceAuth::new());
60 auth.load_from_env();
61 let scenario_manager = Arc::new(ScenarioManager::new(std::path::Path::new(".")));
62
63 Self {
64 storage_path: storage_path.to_string(),
65 stores: Arc::new(DashMap::new()),
66 auth,
67 audit: Arc::new(InferenceAudit::new()),
68 scenario_manager,
69 }
70 }
71
72 pub async fn install_scenario(&self, name: &str, namespace: &str) -> Result<String, String> {
73 let path = self
74 .scenario_manager
75 .install_scenario(name)
76 .await
77 .map_err(|e| format!("Failed to install scenario assets: {}", e))?;
78
79 let store = self
80 .get_store(namespace)
81 .map_err(|e| e.message().to_string())?;
82
83 let schema_path = path.join("schema");
85 let mut triples_loaded = 0;
86 if schema_path.exists() {
87 triples_loaded +=
88 crate::ingest::ontology::OntologyLoader::load_directory(&store, &schema_path)
89 .await
90 .map_err(|e| format!("Failed to load ontologies: {}", e))?;
91 }
92
93 let data_path = path.join("data");
95 let mut data_files_loaded = 0;
96 if data_path.exists() {
97 if let Ok(entries) = std::fs::read_dir(data_path) {
98 for entry in entries.flatten() {
99 let p = entry.path();
100 if p.is_file() {
101 let engine = IngestionEngine::new(store.clone());
103 if let Ok(count) = engine.ingest_file(&p, namespace).await {
104 triples_loaded += count as usize;
105 data_files_loaded += 1;
106 }
107 }
108 }
109 }
110 }
111
112 let docs_path = path.join("docs");
114 let mut docs_loaded = 0;
115 if docs_path.exists() {
116 if let Ok(entries) = std::fs::read_dir(docs_path) {
117 for entry in entries.flatten() {
118 let p = entry.path();
119 if p.is_file() {
120 if let Ok(content) = std::fs::read_to_string(&p) {
121 let processor = crate::processor::TextProcessor::new();
122 let chunks = processor.chunk_text(&content, 1000, 150);
123 if let Some(ref vector_store) = store.vector_store {
124 for (i, chunk) in chunks.iter().enumerate() {
125 let chunk_uri = format!("file://{}#chunk-{}", p.display(), i);
126 let metadata = serde_json::json!({
127 "uri": format!("file://{}", p.display()),
128 "type": "doc_chunk",
129 "scenario": name
130 });
131 let _ = vector_store.add(&chunk_uri, chunk, metadata).await;
132 }
133 docs_loaded += 1;
134 }
135 }
136 }
137 }
138 }
139 }
140
141 Ok(format!(
142 "Scenario '{}' installed. Loaded {} triples ({} data files) and {} docs.",
143 name, triples_loaded, data_files_loaded, docs_loaded
144 ))
145 }
146
147 pub async fn shutdown(&self) {
148 eprintln!("Shutting down... flushing {} stores", self.stores.len());
149 for entry in self.stores.iter() {
150 let store = entry.value();
151 if let Err(e) = store.flush() {
152 eprintln!("Failed to flush store '{}': {}", entry.key(), e);
153 }
154 }
155 eprintln!("Shutdown complete.");
156 }
157
158 #[allow(clippy::result_large_err)]
159 pub fn get_store(&self, namespace: &str) -> Result<Arc<SynapseStore>, Status> {
160 if let Some(store) = self.stores.get(namespace) {
161 return Ok(store.clone());
162 }
163
164 let store = SynapseStore::open(namespace, &self.storage_path).map_err(|e| {
165 Status::internal(format!(
166 "Failed to open store for namespace '{}': {}",
167 namespace, e
168 ))
169 })?;
170
171 let store_arc = Arc::new(store);
172 self.stores.insert(namespace.to_string(), store_arc.clone());
173 Ok(store_arc)
174 }
175}
176
177#[tonic::async_trait]
178impl SemanticEngine for MySemanticEngine {
    /// Ingests a batch of triples into the namespace's store.
    ///
    /// Requires "write" permission on the namespace. Returns the number of
    /// triples actually added (reported as both `nodes_added` and
    /// `edges_added`).
    async fn ingest_triples(
        &self,
        request: Request<IngestRequest>,
    ) -> Result<Response<IngestResponse>, Status> {
        // Capture the token before `into_inner` consumes the request.
        let token = get_token(&request);
        let req = request.into_inner();
        // An empty namespace falls back to the shared "default" namespace.
        let namespace = if req.namespace.is_empty() {
            "default"
        } else {
            &req.namespace
        };

        if let Err(e) = self.auth.check(token.as_deref(), namespace, "write") {
            return Err(Status::permission_denied(e));
        }

        let store = self.get_store(namespace)?;

        let timestamp = chrono::Utc::now().to_rfc3339();
        let triple_count = req.triples.len();
        // Distinct provenance sources, collected purely for the log line below.
        let mut sources: Vec<String> = Vec::new();

        // Convert wire triples to store triples, deduplicating sources as a
        // side effect of the map.
        let triples: Vec<IngestTriple> = req
            .triples
            .into_iter()
            .map(|t| {
                if let Some(ref prov) = t.provenance {
                    if !prov.source.is_empty() && !sources.contains(&prov.source) {
                        sources.push(prov.source.clone());
                    }
                }
                IngestTriple {
                    subject: t.subject,
                    predicate: t.predicate,
                    object: t.object,
                    provenance: t.provenance.map(|p| crate::store::Provenance {
                        source: p.source,
                        timestamp: p.timestamp,
                        method: p.method,
                    }),
                }
            })
            .collect();

        match store.ingest_triples(triples).await {
            Ok((added, _)) => {
                eprintln!(
                    "INGEST [{timestamp}] namespace={namespace} triples={triple_count} added={added} sources={:?}",
                    sources
                );
                Ok(Response::new(IngestResponse {
                    nodes_added: added,
                    edges_added: added,
                }))
            }
            Err(e) => Err(Status::internal(e.to_string())),
        }
    }
241
242 async fn ingest_file(
243 &self,
244 request: Request<IngestFileRequest>,
245 ) -> Result<Response<IngestResponse>, Status> {
246 let token = get_token(&request);
250 let req = request.into_inner();
251 let namespace = if req.namespace.is_empty() {
252 "default"
253 } else {
254 &req.namespace
255 };
256
257 if let Err(e) = self.auth.check(token.as_deref(), namespace, "write") {
258 return Err(Status::permission_denied(e));
259 }
260 let store = self.get_store(namespace)?;
261
262 let engine = IngestionEngine::new(store);
263 let path = Path::new(&req.file_path);
264
265 match engine.ingest_file(path, namespace).await {
266 Ok(count) => Ok(Response::new(IngestResponse {
267 nodes_added: count,
268 edges_added: count,
269 })),
270 Err(e) => Err(Status::internal(e.to_string())),
271 }
272 }
273
274 async fn get_neighbors(
275 &self,
276 request: Request<NodeRequest>,
277 ) -> Result<Response<NeighborResponse>, Status> {
278 let token = get_token(&request);
279 let req = request.into_inner();
280 let namespace = if req.namespace.is_empty() {
281 "default"
282 } else {
283 &req.namespace
284 };
285
286 if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
287 return Err(Status::permission_denied(e));
288 }
289
290 let store = self.get_store(namespace)?;
291
292 let direction = if req.direction.is_empty() {
293 "outgoing"
294 } else {
295 &req.direction
296 };
297 let edge_filter = if req.edge_filter.is_empty() {
298 None
299 } else {
300 Some(req.edge_filter.as_str())
301 };
302 let node_type_filter = if req.node_type_filter.is_empty() {
303 None
304 } else {
305 Some(req.node_type_filter.as_str())
306 };
307 let max_depth = if req.depth == 0 {
308 1
309 } else {
310 req.depth as usize
311 };
312 let limit_per_layer = if req.limit_per_layer == 0 {
313 usize::MAX
314 } else {
315 req.limit_per_layer as usize
316 };
317
318 let mut neighbors = Vec::new();
319 let mut visited = std::collections::HashSet::new();
320 let mut current_frontier = Vec::new();
321
322 if let Some(start_uri) = store.get_uri(req.node_id) {
324 current_frontier.push(start_uri.clone());
325 visited.insert(start_uri);
326 }
327
328 for current_depth in 1..=max_depth {
330 let mut next_frontier = Vec::new();
331 let mut layer_count = 0;
332 let base_score = 1.0 / current_depth as f32; for uri in ¤t_frontier {
335 if layer_count >= limit_per_layer {
336 break;
337 }
338
339 if direction == "outgoing" || direction == "both" {
341 if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
342 for quad in
343 store
344 .store
345 .quads_for_pattern(Some(subj.into()), None, None, None)
346 {
347 if layer_count >= limit_per_layer {
348 break;
349 }
350 if let Ok(q) = quad {
351 let pred = q.predicate.to_string();
352 if let Some(filter) = edge_filter {
354 if !pred.contains(filter) {
355 continue;
356 }
357 }
358 let obj_term = q.object;
359 let obj_uri = obj_term.to_string();
360
361 if let Some(type_filter) = node_type_filter {
363 let passed =
364 if let oxigraph::model::Term::NamedNode(ref n) = obj_term {
365 let rdf_type = oxigraph::model::NamedNodeRef::new(
366 "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
367 )
368 .unwrap();
369 if let Ok(target_type) =
370 oxigraph::model::NamedNodeRef::new(type_filter)
371 {
372 store
373 .store
374 .quads_for_pattern(
375 Some(n.into()),
376 Some(rdf_type),
377 Some(target_type.into()),
378 None,
379 )
380 .next()
381 .is_some()
382 } else {
383 false
384 }
385 } else {
386 false
387 };
388 if !passed {
389 continue;
390 }
391 }
392
393 let clean_uri = match &obj_term {
394 oxigraph::model::Term::NamedNode(n) => n.as_str(),
395 _ => &obj_uri,
396 };
397
398 if !visited.contains(&obj_uri) {
405 visited.insert(obj_uri.clone());
406 let obj_id = store.get_or_create_id(&obj_uri);
407
408 let mut neighbor_score = base_score;
409 if req.scoring_strategy == "degree" {
410 let degree = store.get_degree(clean_uri);
411 neighbor_score /= (degree as f32 + 1.0).ln().max(0.1);
412 }
413
414 neighbors.push(Neighbor {
415 node_id: obj_id,
416 edge_type: pred,
417 uri: obj_uri.clone(), direction: "outgoing".to_string(),
419 depth: current_depth as u32,
420 score: neighbor_score,
421 });
422 next_frontier.push(clean_uri.to_string());
424 layer_count += 1;
425 }
426 }
427 }
428 }
429 }
430
431 if direction == "incoming" || direction == "both" {
433 if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
434 for quad in
435 store
436 .store
437 .quads_for_pattern(None, None, Some(obj.into()), None)
438 {
439 if layer_count >= limit_per_layer {
440 break;
441 }
442 if let Ok(q) = quad {
443 let pred = q.predicate.to_string();
444 if let Some(filter) = edge_filter {
446 if !pred.contains(filter) {
447 continue;
448 }
449 }
450 let subj_term = q.subject;
451 let subj_uri = subj_term.to_string();
452
453 if let Some(type_filter) = node_type_filter {
455 let passed = if let oxigraph::model::Subject::NamedNode(ref n) =
456 subj_term
457 {
458 let rdf_type = oxigraph::model::NamedNodeRef::new(
459 "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
460 )
461 .unwrap();
462 if let Ok(target_type) =
463 oxigraph::model::NamedNodeRef::new(type_filter)
464 {
465 store
466 .store
467 .quads_for_pattern(
468 Some(n.into()),
469 Some(rdf_type),
470 Some(target_type.into()),
471 None,
472 )
473 .next()
474 .is_some()
475 } else {
476 false
477 }
478 } else {
479 false
480 };
481 if !passed {
482 continue;
483 }
484 }
485
486 let clean_uri = match &subj_term {
487 oxigraph::model::Subject::NamedNode(n) => n.as_str(),
488 _ => &subj_uri,
489 };
490
491 if !visited.contains(&subj_uri) {
492 visited.insert(subj_uri.clone());
493 let subj_id = store.get_or_create_id(&subj_uri);
494
495 let mut neighbor_score = base_score;
496 if req.scoring_strategy == "degree" {
497 let degree = store.get_degree(clean_uri);
498 neighbor_score /= (degree as f32 + 1.0).ln().max(0.1);
500 }
501
502 neighbors.push(Neighbor {
503 node_id: subj_id,
504 edge_type: pred,
505 uri: subj_uri.clone(),
506 direction: "incoming".to_string(),
507 depth: current_depth as u32,
508 score: neighbor_score,
509 });
510 next_frontier.push(clean_uri.to_string());
512 layer_count += 1;
513 }
514 }
515 }
516 }
517 }
518 }
519
520 current_frontier = next_frontier;
521 if current_frontier.is_empty() {
522 break;
523 }
524 }
525
526 neighbors.sort_by(|a, b| {
528 b.score
529 .partial_cmp(&a.score)
530 .unwrap_or(std::cmp::Ordering::Equal)
531 });
532
533 Ok(Response::new(NeighborResponse { neighbors }))
534 }
535
536 async fn search(
537 &self,
538 request: Request<SearchRequest>,
539 ) -> Result<Response<SearchResponse>, Status> {
540 let token = get_token(&request);
541 let req = request.into_inner();
542 let namespace = if req.namespace.is_empty() {
543 "default"
544 } else {
545 &req.namespace
546 };
547
548 if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
549 return Err(Status::permission_denied(e));
550 }
551
552 let store = self.get_store(namespace)?;
553
554 match store.hybrid_search(&req.query, req.limit as usize, 0).await {
555 Ok(results) => {
556 let grpc_results = results
557 .into_iter()
558 .enumerate()
559 .map(|(idx, (uri, score))| SearchResult {
560 node_id: idx as u32,
561 score,
562 content: uri.clone(),
563 uri,
564 })
565 .collect();
566 Ok(Response::new(SearchResponse {
567 results: grpc_results,
568 }))
569 }
570 Err(e) => Err(Status::internal(e.to_string())),
571 }
572 }
573
574 async fn resolve_id(
575 &self,
576 request: Request<ResolveRequest>,
577 ) -> Result<Response<ResolveResponse>, Status> {
578 let token = get_token(&request);
579 let req = request.into_inner();
580 let namespace = if req.namespace.is_empty() {
581 "default"
582 } else {
583 &req.namespace
584 };
585
586 if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
587 return Err(Status::permission_denied(e));
588 }
589
590 let store = self.get_store(namespace)?;
591
592 let uri = store.ensure_uri(&req.content);
593
594 let uri_to_id = store.uri_to_id.read().unwrap();
596 if let Some(&node_id) = uri_to_id.get(&uri) {
597 Ok(Response::new(ResolveResponse {
598 node_id,
599 found: true,
600 }))
601 } else {
602 Ok(Response::new(ResolveResponse {
603 node_id: 0,
604 found: false,
605 }))
606 }
607 }
608
609 async fn get_all_triples(
610 &self,
611 request: Request<EmptyRequest>,
612 ) -> Result<Response<TriplesResponse>, Status> {
613 let token = get_token(&request);
614 let req = request.into_inner();
615 let namespace = if req.namespace.is_empty() {
616 "default"
617 } else {
618 &req.namespace
619 };
620
621 if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
622 return Err(Status::permission_denied(e));
623 }
624
625 let store = self.get_store(namespace)?;
626
627 let mut triples = Vec::new();
628
629 for quad in store.store.iter().map(|q| q.unwrap()) {
630 let s = quad.subject.to_string();
631 let p = quad.predicate.to_string();
632 let o = quad.object.to_string();
633
634 let clean_s = if s.starts_with('<') && s.ends_with('>') {
636 s[1..s.len() - 1].to_string()
637 } else {
638 s
639 };
640 let clean_p = if p.starts_with('<') && p.ends_with('>') {
641 p[1..p.len() - 1].to_string()
642 } else {
643 p
644 };
645 let clean_o = if o.starts_with('<') && o.ends_with('>') {
646 o[1..o.len() - 1].to_string()
647 } else {
648 o
649 };
650
651 triples.push(Triple {
652 subject: clean_s,
653 predicate: clean_p,
654 object: clean_o,
655 provenance: Some(Provenance {
656 source: "oxigraph".to_string(),
657 timestamp: "".to_string(),
658 method: "storage".to_string(),
659 }),
660 embedding: vec![],
661 });
662 }
663
664 Ok(Response::new(TriplesResponse { triples }))
665 }
666
667 async fn query_sparql(
668 &self,
669 request: Request<SparqlRequest>,
670 ) -> Result<Response<SparqlResponse>, Status> {
671 let token = get_token(&request);
672 let req = request.into_inner();
673 let namespace = if req.namespace.is_empty() {
674 "default"
675 } else {
676 &req.namespace
677 };
678
679 if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
680 return Err(Status::permission_denied(e));
681 }
682
683 let store = self.get_store(namespace)?;
684
685 match store.query_sparql(&req.query) {
686 Ok(json) => Ok(Response::new(SparqlResponse { results_json: json })),
687 Err(e) => Err(Status::internal(e.to_string())),
688 }
689 }
690
691 async fn delete_namespace_data(
692 &self,
693 request: Request<EmptyRequest>,
694 ) -> Result<Response<DeleteResponse>, Status> {
695 let token = get_token(&request);
696 let req = request.into_inner();
697 let namespace = if req.namespace.is_empty() {
698 "default"
699 } else {
700 &req.namespace
701 };
702
703 if let Err(e) = self.auth.check(token.as_deref(), namespace, "delete") {
704 return Err(Status::permission_denied(e));
705 }
706
707 self.stores.remove(namespace);
709
710 let path = Path::new(&self.storage_path).join(namespace);
712 if path.exists() {
713 std::fs::remove_dir_all(path).map_err(|e| Status::internal(e.to_string()))?;
714 }
715
716 Ok(Response::new(DeleteResponse {
717 success: true,
718 message: format!("Deleted namespace '{}'", namespace),
719 }))
720 }
721
722 async fn hybrid_search(
723 &self,
724 request: Request<HybridSearchRequest>,
725 ) -> Result<Response<SearchResponse>, Status> {
726 let token = get_token(&request);
727 let req = request.into_inner();
728 let namespace = if req.namespace.is_empty() {
729 "default"
730 } else {
731 &req.namespace
732 };
733
734 if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
735 return Err(Status::permission_denied(e));
736 }
737
738 let store = self.get_store(namespace)?;
739
740 let vector_k = req.vector_k as usize;
741 let graph_depth = req.graph_depth;
742
743 let results = match SearchMode::try_from(req.mode) {
744 Ok(SearchMode::VectorOnly) | Ok(SearchMode::Hybrid) => store
745 .hybrid_search(&req.query, vector_k, graph_depth)
746 .await
747 .map_err(|e| Status::internal(format!("Hybrid search failed: {}", e)))?,
748 _ => vec![],
749 };
750
751 let grpc_results = results
752 .into_iter()
753 .enumerate()
754 .map(|(idx, (uri, score))| SearchResult {
755 node_id: idx as u32,
756 score,
757 content: uri.clone(),
758 uri,
759 })
760 .collect();
761
762 Ok(Response::new(SearchResponse {
763 results: grpc_results,
764 }))
765 }
766
    /// Runs the requested reasoning strategy over a namespace's graph.
    ///
    /// When `req.materialize` is set, inferred triples are written back to the
    /// store; otherwise they are only counted. Requires "reason" permission.
    /// Successful runs are recorded in the inference audit log.
    async fn apply_reasoning(
        &self,
        request: Request<ReasoningRequest>,
    ) -> Result<Response<ReasoningResponse>, Status> {
        let token = get_token(&request);
        let req = request.into_inner();
        // Empty namespace falls back to the shared "default" namespace.
        let namespace = if req.namespace.is_empty() {
            "default"
        } else {
            &req.namespace
        };

        if let Err(e) = self.auth.check(token.as_deref(), namespace, "reason") {
            return Err(Status::permission_denied(e));
        }

        let store = self.get_store(namespace)?;

        // Map the wire enum onto the internal strategy; unrecognized values
        // degrade to `None` (a no-op reasoner) rather than erroring.
        let strategy = match ReasoningStrategy::try_from(req.strategy) {
            Ok(ReasoningStrategy::Rdfs) => InternalStrategy::RDFS,
            Ok(ReasoningStrategy::Owlrl) => InternalStrategy::OWLRL,
            _ => InternalStrategy::None,
        };
        let strategy_name = format!("{:?}", strategy);

        let reasoner = SynapseReasoner::new(strategy);
        // Pre-reasoning triple count, recorded in the audit entry below.
        let start_triples = store.store.len().unwrap_or(0);

        let response = if req.materialize {
            match reasoner.materialize(&store.store) {
                Ok(count) => Ok(Response::new(ReasoningResponse {
                    success: true,
                    triples_inferred: count as u32,
                    message: format!(
                        "Materialized {} triples in namespace '{}'",
                        count, namespace
                    ),
                })),
                Err(e) => Err(Status::internal(e.to_string())),
            }
        } else {
            match reasoner.apply(&store.store) {
                Ok(triples) => Ok(Response::new(ReasoningResponse {
                    success: true,
                    triples_inferred: triples.len() as u32,
                    message: format!(
                        "Found {} inferred triples in namespace '{}'",
                        triples.len(),
                        namespace
                    ),
                })),
                Err(e) => Err(Status::internal(e.to_string())),
            }
        };

        // Only successful runs are audited.
        if let Ok(ref res) = response {
            let inferred = res.get_ref().triples_inferred as usize;
            self.audit.log(
                namespace,
                &strategy_name,
                start_triples,
                inferred,
                // NOTE(review): the meaning of the trailing `0` and `vec![]`
                // arguments is not visible here — confirm against
                // `InferenceAudit::log`'s signature.
                0, vec![], );
        }

        response
    }
838}
839
/// Runs the engine as an MCP server over stdin/stdout, blocking until the
/// stdio server loop exits.
pub async fn run_mcp_stdio(
    engine: Arc<MySemanticEngine>,
) -> Result<(), Box<dyn std::error::Error>> {
    let server = crate::mcp_stdio::McpStdioServer::new(engine);
    server.run().await
}