1use dashmap::DashMap;
2use std::sync::Arc;
3use tonic::{Request, Response, Status};
4
/// gRPC message and service types generated by `tonic-build` from the
/// `semantic_engine` protobuf package at build time.
pub mod proto {
    tonic::include_proto!("semantic_engine");
}
8
9use proto::semantic_engine_server::SemanticEngine;
10use proto::*;
11
12use crate::ingest::IngestionEngine;
13use crate::reasoner::{ReasoningStrategy as InternalStrategy, SynapseReasoner};
14use crate::scenarios::ScenarioManager;
15use crate::server::proto::{ReasoningStrategy, SearchMode};
16use crate::store::{IngestTriple, SynapseStore};
17use std::path::Path;
18
19use crate::audit::InferenceAudit;
20use crate::auth::NamespaceAuth;
21
/// Bearer token extracted from the `authorization` metadata header by
/// `auth_interceptor` and stashed in the request extensions so handlers
/// can retrieve it via `get_token` without re-parsing metadata.
#[derive(Clone)]
pub struct AuthToken(pub String);
24
25#[allow(clippy::result_large_err)]
26pub fn auth_interceptor(mut req: Request<()>) -> Result<Request<()>, Status> {
27 if let Some(token) = req
28 .metadata()
29 .get("authorization")
30 .and_then(|t| t.to_str().ok())
31 .map(|s| s.trim_start_matches("Bearer ").to_string())
32 {
33 req.extensions_mut().insert(AuthToken(token));
34 }
35 Ok(req)
36}
37
38fn get_token<T>(req: &Request<T>) -> Option<String> {
39 if let Some(token) = req.extensions().get::<AuthToken>() {
40 return Some(token.0.clone());
41 }
42 req.metadata()
43 .get("authorization")
44 .and_then(|t| t.to_str().ok())
45 .map(|s| s.trim_start_matches("Bearer ").to_string())
46}
47
/// Shared gRPC service state: one `SynapseStore` per namespace, opened
/// lazily and cached for the lifetime of the process.
#[derive(Clone)]
pub struct MySemanticEngine {
    /// Root directory under which each namespace's store lives on disk.
    pub storage_path: String,
    /// Lazily-populated namespace -> store cache (concurrent map).
    pub stores: Arc<DashMap<String, Arc<SynapseStore>>>,
    /// Per-namespace permission checks (rules loaded from the environment).
    pub auth: Arc<NamespaceAuth>,
    /// Records reasoning runs for later inspection.
    pub audit: Arc<InferenceAudit>,
    /// Installs scenario asset bundles (schemas, data, docs).
    pub scenario_manager: Arc<ScenarioManager>,
}
56
57impl MySemanticEngine {
58 pub fn new(storage_path: &str) -> Self {
59 let auth = Arc::new(NamespaceAuth::new());
60 auth.load_from_env();
61 let scenario_manager = Arc::new(ScenarioManager::new(std::path::Path::new(".")));
62
63 Self {
64 storage_path: storage_path.to_string(),
65 stores: Arc::new(DashMap::new()),
66 auth,
67 audit: Arc::new(InferenceAudit::new()),
68 scenario_manager,
69 }
70 }
71
    /// Installs a named scenario bundle into `namespace`.
    ///
    /// Fetches/unpacks the scenario assets via the scenario manager, then
    /// loads three optional sub-directories of the installed path:
    /// - `schema/`: ontology files loaded via `OntologyLoader` (a failure
    ///   here aborts the whole install);
    /// - `data/`: files run through `IngestionEngine` (best-effort — files
    ///   that fail to ingest are silently skipped);
    /// - `docs/`: text files chunked and pushed into the vector store, only
    ///   when this store has a vector index configured.
    ///
    /// Returns a human-readable summary string, or an error message.
    pub async fn install_scenario(&self, name: &str, namespace: &str) -> Result<String, String> {
        let path = self
            .scenario_manager
            .install_scenario(name)
            .await
            .map_err(|e| format!("Failed to install scenario assets: {}", e))?;

        let store = self
            .get_store(namespace)
            .map_err(|e| e.message().to_string())?;

        let schema_path = path.join("schema");
        let mut triples_loaded = 0;
        if schema_path.exists() {
            triples_loaded +=
                crate::ingest::ontology::OntologyLoader::load_directory(&store, &schema_path)
                    .await
                    .map_err(|e| format!("Failed to load ontologies: {}", e))?;
        }

        // Data files: only top-level regular files are considered;
        // sub-directories are not recursed into.
        let data_path = path.join("data");
        let mut data_files_loaded = 0;
        if data_path.exists() {
            if let Ok(entries) = std::fs::read_dir(data_path) {
                for entry in entries.flatten() {
                    let p = entry.path();
                    if p.is_file() {
                        let engine = IngestionEngine::new(store.clone());
                        // Best-effort: ingest errors are deliberately ignored.
                        if let Ok(count) = engine.ingest_file(&p, namespace).await {
                            triples_loaded += count as usize;
                            data_files_loaded += 1;
                        }
                    }
                }
            }
        }

        // Docs: chunk size 1000 with overlap 150 (presumably characters —
        // TODO confirm against TextProcessor::chunk_text).
        let docs_path = path.join("docs");
        let mut docs_loaded = 0;
        if docs_path.exists() {
            if let Ok(entries) = std::fs::read_dir(docs_path) {
                for entry in entries.flatten() {
                    let p = entry.path();
                    if p.is_file() {
                        if let Ok(content) = std::fs::read_to_string(&p) {
                            let processor = crate::processor::TextProcessor::new();
                            let chunks = processor.chunk_text(&content, 1000, 150);
                            if let Some(ref vector_store) = store.vector_store {
                                for (i, chunk) in chunks.iter().enumerate() {
                                    let chunk_uri = format!("file://{}#chunk-{}", p.display(), i);
                                    let metadata = serde_json::json!({
                                        "uri": format!("file://{}", p.display()),
                                        "type": "doc_chunk",
                                        "scenario": name
                                    });
                                    // Embedding failures are intentionally ignored.
                                    let _ = vector_store.add(&chunk_uri, chunk, metadata).await;
                                }
                                docs_loaded += 1;
                            }
                        }
                    }
                }
            }
        }

        Ok(format!(
            "Scenario '{}' installed. Loaded {} triples ({} data files) and {} docs.",
            name, triples_loaded, data_files_loaded, docs_loaded
        ))
    }
146
147 pub async fn shutdown(&self) {
148 eprintln!("Shutting down... flushing {} stores", self.stores.len());
149 for entry in self.stores.iter() {
150 let store = entry.value();
151 if let Err(e) = store.flush() {
152 eprintln!("Failed to flush store '{}': {}", entry.key(), e);
153 }
154 }
155 eprintln!("Shutdown complete.");
156 }
157
158 #[allow(clippy::result_large_err)]
159 pub fn get_store(&self, namespace: &str) -> Result<Arc<SynapseStore>, Status> {
160 let store = self.stores.entry(namespace.to_string()).or_insert_with(|| {
162 let s =
163 SynapseStore::open(namespace, &self.storage_path).expect("Failed to open store");
164 Arc::new(s)
165 });
166
167 Ok(store.value().clone())
168 }
169}
170
171#[tonic::async_trait]
172impl SemanticEngine for MySemanticEngine {
173 async fn ingest_triples(
174 &self,
175 request: Request<IngestRequest>,
176 ) -> Result<Response<IngestResponse>, Status> {
177 let token = get_token(&request);
179 let req = request.into_inner();
180 let namespace = if req.namespace.is_empty() {
181 "default"
182 } else {
183 &req.namespace
184 };
185
186 if let Err(e) = self.auth.check(token.as_deref(), namespace, "write") {
187 return Err(Status::permission_denied(e));
188 }
189
190 let store = self.get_store(namespace)?;
191
192 let timestamp = chrono::Utc::now().to_rfc3339();
194 let triple_count = req.triples.len();
195 let mut sources: Vec<String> = Vec::new();
196
197 let triples: Vec<IngestTriple> = req
198 .triples
199 .into_iter()
200 .map(|t| {
201 if let Some(ref prov) = t.provenance {
203 if !prov.source.is_empty() && !sources.contains(&prov.source) {
204 sources.push(prov.source.clone());
205 }
206 }
207 IngestTriple {
208 subject: t.subject,
209 predicate: t.predicate,
210 object: t.object,
211 provenance: t.provenance.map(|p| crate::store::Provenance {
212 source: p.source,
213 timestamp: p.timestamp,
214 method: p.method,
215 }),
216 }
217 })
218 .collect();
219
220 match store.ingest_triples(triples).await {
221 Ok((added, _)) => {
222 eprintln!(
224 "INGEST [{timestamp}] namespace={namespace} triples={triple_count} added={added} sources={:?}",
225 sources
226 );
227 Ok(Response::new(IngestResponse {
228 nodes_added: added,
229 edges_added: added,
230 }))
231 }
232 Err(e) => Err(Status::internal(e.to_string())),
233 }
234 }
235
236 async fn ingest_file(
237 &self,
238 request: Request<IngestFileRequest>,
239 ) -> Result<Response<IngestResponse>, Status> {
240 let token = get_token(&request);
244 let req = request.into_inner();
245 let namespace = if req.namespace.is_empty() {
246 "default"
247 } else {
248 &req.namespace
249 };
250
251 if let Err(e) = self.auth.check(token.as_deref(), namespace, "write") {
252 return Err(Status::permission_denied(e));
253 }
254 let store = self.get_store(namespace)?;
255
256 let engine = IngestionEngine::new(store);
257 let path = Path::new(&req.file_path);
258
259 match engine.ingest_file(path, namespace).await {
260 Ok(count) => Ok(Response::new(IngestResponse {
261 nodes_added: count,
262 edges_added: count,
263 })),
264 Err(e) => Err(Status::internal(e.to_string())),
265 }
266 }
267
268 async fn get_neighbors(
269 &self,
270 request: Request<NodeRequest>,
271 ) -> Result<Response<NeighborResponse>, Status> {
272 let token = get_token(&request);
273 let req = request.into_inner();
274 let namespace = if req.namespace.is_empty() {
275 "default"
276 } else {
277 &req.namespace
278 };
279
280 if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
281 return Err(Status::permission_denied(e));
282 }
283
284 let store = self.get_store(namespace)?;
285
286 let direction = if req.direction.is_empty() {
287 "outgoing"
288 } else {
289 &req.direction
290 };
291 let edge_filter = if req.edge_filter.is_empty() {
292 None
293 } else {
294 Some(req.edge_filter.as_str())
295 };
296 let node_type_filter = if req.node_type_filter.is_empty() {
297 None
298 } else {
299 Some(req.node_type_filter.as_str())
300 };
301 let max_depth = if req.depth == 0 {
302 1
303 } else {
304 req.depth as usize
305 };
306 let limit_per_layer = if req.limit_per_layer == 0 {
307 usize::MAX
308 } else {
309 req.limit_per_layer as usize
310 };
311
312 let mut neighbors = Vec::new();
313 let mut visited = std::collections::HashSet::new();
314 let mut current_frontier = Vec::new();
315
316 if let Some(start_uri) = store.get_uri(req.node_id) {
318 current_frontier.push(start_uri.clone());
319 visited.insert(start_uri);
320 }
321
322 for current_depth in 1..=max_depth {
324 let mut next_frontier = Vec::new();
325 let mut layer_count = 0;
326 let base_score = 1.0 / current_depth as f32; for uri in ¤t_frontier {
329 if layer_count >= limit_per_layer {
330 break;
331 }
332
333 if direction == "outgoing" || direction == "both" {
335 if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
336 for quad in
337 store
338 .store
339 .quads_for_pattern(Some(subj.into()), None, None, None)
340 {
341 if layer_count >= limit_per_layer {
342 break;
343 }
344 if let Ok(q) = quad {
345 let pred = q.predicate.to_string();
346 if let Some(filter) = edge_filter {
348 if !pred.contains(filter) {
349 continue;
350 }
351 }
352 let obj_term = q.object;
353 let obj_uri = obj_term.to_string();
354
355 if let Some(type_filter) = node_type_filter {
357 let passed =
358 if let oxigraph::model::Term::NamedNode(ref n) = obj_term {
359 let rdf_type = oxigraph::model::NamedNodeRef::new(
360 "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
361 )
362 .unwrap();
363 if let Ok(target_type) =
364 oxigraph::model::NamedNodeRef::new(type_filter)
365 {
366 store
367 .store
368 .quads_for_pattern(
369 Some(n.into()),
370 Some(rdf_type),
371 Some(target_type.into()),
372 None,
373 )
374 .next()
375 .is_some()
376 } else {
377 false
378 }
379 } else {
380 false
381 };
382 if !passed {
383 continue;
384 }
385 }
386
387 let clean_uri = match &obj_term {
388 oxigraph::model::Term::NamedNode(n) => n.as_str(),
389 _ => &obj_uri,
390 };
391
392 if !visited.contains(&obj_uri) {
399 visited.insert(obj_uri.clone());
400 let obj_id = store.get_or_create_id(&obj_uri);
401
402 let mut neighbor_score = base_score;
403 if req.scoring_strategy == "degree" {
404 let degree = store.get_degree(clean_uri);
405 neighbor_score /= (degree as f32 + 1.0).ln().max(0.1);
406 }
407
408 neighbors.push(Neighbor {
409 node_id: obj_id,
410 edge_type: pred,
411 uri: obj_uri.clone(), direction: "outgoing".to_string(),
413 depth: current_depth as u32,
414 score: neighbor_score,
415 });
416 next_frontier.push(clean_uri.to_string());
418 layer_count += 1;
419 }
420 }
421 }
422 }
423 }
424
425 if direction == "incoming" || direction == "both" {
427 if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
428 for quad in
429 store
430 .store
431 .quads_for_pattern(None, None, Some(obj.into()), None)
432 {
433 if layer_count >= limit_per_layer {
434 break;
435 }
436 if let Ok(q) = quad {
437 let pred = q.predicate.to_string();
438 if let Some(filter) = edge_filter {
440 if !pred.contains(filter) {
441 continue;
442 }
443 }
444 let subj_term = q.subject;
445 let subj_uri = subj_term.to_string();
446
447 if let Some(type_filter) = node_type_filter {
449 let passed = if let oxigraph::model::Subject::NamedNode(ref n) =
450 subj_term
451 {
452 let rdf_type = oxigraph::model::NamedNodeRef::new(
453 "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
454 )
455 .unwrap();
456 if let Ok(target_type) =
457 oxigraph::model::NamedNodeRef::new(type_filter)
458 {
459 store
460 .store
461 .quads_for_pattern(
462 Some(n.into()),
463 Some(rdf_type),
464 Some(target_type.into()),
465 None,
466 )
467 .next()
468 .is_some()
469 } else {
470 false
471 }
472 } else {
473 false
474 };
475 if !passed {
476 continue;
477 }
478 }
479
480 let clean_uri = match &subj_term {
481 oxigraph::model::Subject::NamedNode(n) => n.as_str(),
482 _ => &subj_uri,
483 };
484
485 if !visited.contains(&subj_uri) {
486 visited.insert(subj_uri.clone());
487 let subj_id = store.get_or_create_id(&subj_uri);
488
489 let mut neighbor_score = base_score;
490 if req.scoring_strategy == "degree" {
491 let degree = store.get_degree(clean_uri);
492 neighbor_score /= (degree as f32 + 1.0).ln().max(0.1);
494 }
495
496 neighbors.push(Neighbor {
497 node_id: subj_id,
498 edge_type: pred,
499 uri: subj_uri.clone(),
500 direction: "incoming".to_string(),
501 depth: current_depth as u32,
502 score: neighbor_score,
503 });
504 next_frontier.push(clean_uri.to_string());
506 layer_count += 1;
507 }
508 }
509 }
510 }
511 }
512 }
513
514 current_frontier = next_frontier;
515 if current_frontier.is_empty() {
516 break;
517 }
518 }
519
520 neighbors.sort_by(|a, b| {
522 b.score
523 .partial_cmp(&a.score)
524 .unwrap_or(std::cmp::Ordering::Equal)
525 });
526
527 Ok(Response::new(NeighborResponse { neighbors }))
528 }
529
530 async fn search(
531 &self,
532 request: Request<SearchRequest>,
533 ) -> Result<Response<SearchResponse>, Status> {
534 let token = get_token(&request);
535 let req = request.into_inner();
536 let namespace = if req.namespace.is_empty() {
537 "default"
538 } else {
539 &req.namespace
540 };
541
542 if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
543 return Err(Status::permission_denied(e));
544 }
545
546 let store = self.get_store(namespace)?;
547
548 match store.hybrid_search(&req.query, req.limit as usize, 0).await {
549 Ok(results) => {
550 let grpc_results = results
551 .into_iter()
552 .enumerate()
553 .map(|(idx, (uri, score))| SearchResult {
554 node_id: idx as u32,
555 score,
556 content: uri.clone(),
557 uri,
558 })
559 .collect();
560 Ok(Response::new(SearchResponse {
561 results: grpc_results,
562 }))
563 }
564 Err(e) => Err(Status::internal(e.to_string())),
565 }
566 }
567
568 async fn resolve_id(
569 &self,
570 request: Request<ResolveRequest>,
571 ) -> Result<Response<ResolveResponse>, Status> {
572 let token = get_token(&request);
573 let req = request.into_inner();
574 let namespace = if req.namespace.is_empty() {
575 "default"
576 } else {
577 &req.namespace
578 };
579
580 if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
581 return Err(Status::permission_denied(e));
582 }
583
584 let store = self.get_store(namespace)?;
585
586 let uri = store.ensure_uri(&req.content);
587
588 let uri_to_id = store.uri_to_id.read().unwrap();
590 if let Some(&node_id) = uri_to_id.get(&uri) {
591 Ok(Response::new(ResolveResponse {
592 node_id,
593 found: true,
594 }))
595 } else {
596 Ok(Response::new(ResolveResponse {
597 node_id: 0,
598 found: false,
599 }))
600 }
601 }
602
603 async fn get_all_triples(
604 &self,
605 request: Request<EmptyRequest>,
606 ) -> Result<Response<TriplesResponse>, Status> {
607 let token = get_token(&request);
608 let req = request.into_inner();
609 let namespace = if req.namespace.is_empty() {
610 "default"
611 } else {
612 &req.namespace
613 };
614
615 if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
616 return Err(Status::permission_denied(e));
617 }
618
619 let store = self.get_store(namespace)?;
620
621 let mut triples = Vec::new();
622
623 for quad in store.store.iter().map(|q| q.unwrap()) {
624 let s = quad.subject.to_string();
625 let p = quad.predicate.to_string();
626 let o = quad.object.to_string();
627
628 let clean_s = if s.starts_with('<') && s.ends_with('>') {
630 s[1..s.len() - 1].to_string()
631 } else {
632 s
633 };
634 let clean_p = if p.starts_with('<') && p.ends_with('>') {
635 p[1..p.len() - 1].to_string()
636 } else {
637 p
638 };
639 let clean_o = if o.starts_with('<') && o.ends_with('>') {
640 o[1..o.len() - 1].to_string()
641 } else {
642 o
643 };
644
645 triples.push(Triple {
646 subject: clean_s,
647 predicate: clean_p,
648 object: clean_o,
649 provenance: Some(Provenance {
650 source: "oxigraph".to_string(),
651 timestamp: "".to_string(),
652 method: "storage".to_string(),
653 }),
654 embedding: vec![],
655 });
656 }
657
658 Ok(Response::new(TriplesResponse { triples }))
659 }
660
661 async fn query_sparql(
662 &self,
663 request: Request<SparqlRequest>,
664 ) -> Result<Response<SparqlResponse>, Status> {
665 let token = get_token(&request);
666 let req = request.into_inner();
667 let namespace = if req.namespace.is_empty() {
668 "default"
669 } else {
670 &req.namespace
671 };
672
673 if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
674 return Err(Status::permission_denied(e));
675 }
676
677 let store = self.get_store(namespace)?;
678
679 match store.query_sparql(&req.query) {
680 Ok(json) => Ok(Response::new(SparqlResponse { results_json: json })),
681 Err(e) => Err(Status::internal(e.to_string())),
682 }
683 }
684
685 async fn delete_namespace_data(
686 &self,
687 request: Request<EmptyRequest>,
688 ) -> Result<Response<DeleteResponse>, Status> {
689 let token = get_token(&request);
690 let req = request.into_inner();
691 let namespace = if req.namespace.is_empty() {
692 "default"
693 } else {
694 &req.namespace
695 };
696
697 if let Err(e) = self.auth.check(token.as_deref(), namespace, "delete") {
698 return Err(Status::permission_denied(e));
699 }
700
701 self.stores.remove(namespace);
703
704 let path = Path::new(&self.storage_path).join(namespace);
706 if path.exists() {
707 std::fs::remove_dir_all(path).map_err(|e| Status::internal(e.to_string()))?;
708 }
709
710 Ok(Response::new(DeleteResponse {
711 success: true,
712 message: format!("Deleted namespace '{}'", namespace),
713 }))
714 }
715
716 async fn hybrid_search(
717 &self,
718 request: Request<HybridSearchRequest>,
719 ) -> Result<Response<SearchResponse>, Status> {
720 let token = get_token(&request);
721 let req = request.into_inner();
722 let namespace = if req.namespace.is_empty() {
723 "default"
724 } else {
725 &req.namespace
726 };
727
728 if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
729 return Err(Status::permission_denied(e));
730 }
731
732 let store = self.get_store(namespace)?;
733
734 let vector_k = req.vector_k as usize;
735 let graph_depth = req.graph_depth;
736
737 let results = match SearchMode::try_from(req.mode) {
738 Ok(SearchMode::VectorOnly) | Ok(SearchMode::Hybrid) => store
739 .hybrid_search(&req.query, vector_k, graph_depth)
740 .await
741 .map_err(|e| Status::internal(format!("Hybrid search failed: {}", e)))?,
742 _ => vec![],
743 };
744
745 let grpc_results = results
746 .into_iter()
747 .enumerate()
748 .map(|(idx, (uri, score))| SearchResult {
749 node_id: idx as u32,
750 score,
751 content: uri.clone(),
752 uri,
753 })
754 .collect();
755
756 Ok(Response::new(SearchResponse {
757 results: grpc_results,
758 }))
759 }
760
761 async fn apply_reasoning(
762 &self,
763 request: Request<ReasoningRequest>,
764 ) -> Result<Response<ReasoningResponse>, Status> {
765 let token = get_token(&request);
767 let req = request.into_inner();
768 let namespace = if req.namespace.is_empty() {
769 "default"
770 } else {
771 &req.namespace
772 };
773
774 if let Err(e) = self.auth.check(token.as_deref(), namespace, "reason") {
775 return Err(Status::permission_denied(e));
776 }
777
778 let store = self.get_store(namespace)?;
779
780 let strategy = match ReasoningStrategy::try_from(req.strategy) {
781 Ok(ReasoningStrategy::Rdfs) => InternalStrategy::RDFS,
782 Ok(ReasoningStrategy::Owlrl) => InternalStrategy::OWLRL,
783 _ => InternalStrategy::None,
784 };
785 let strategy_name = format!("{:?}", strategy);
786
787 let reasoner = SynapseReasoner::new(strategy);
788 let start_triples = store.store.len().unwrap_or(0);
789
790 let response = if req.materialize {
791 match reasoner.materialize(&store.store) {
792 Ok(count) => Ok(Response::new(ReasoningResponse {
793 success: true,
794 triples_inferred: count as u32,
795 message: format!(
796 "Materialized {} triples in namespace '{}'",
797 count, namespace
798 ),
799 })),
800 Err(e) => Err(Status::internal(e.to_string())),
801 }
802 } else {
803 match reasoner.apply(&store.store) {
804 Ok(triples) => Ok(Response::new(ReasoningResponse {
805 success: true,
806 triples_inferred: triples.len() as u32,
807 message: format!(
808 "Found {} inferred triples in namespace '{}'",
809 triples.len(),
810 namespace
811 ),
812 })),
813 Err(e) => Err(Status::internal(e.to_string())),
814 }
815 };
816
817 if let Ok(ref res) = response {
819 let inferred = res.get_ref().triples_inferred as usize;
820 self.audit.log(
821 namespace,
822 &strategy_name,
823 start_triples,
824 inferred,
825 0, vec![], );
828 }
829
830 response
831 }
832}
833
834pub async fn run_mcp_stdio(
835 engine: Arc<MySemanticEngine>,
836) -> Result<(), Box<dyn std::error::Error>> {
837 let server = crate::mcp_stdio::McpStdioServer::new(engine);
838 server.run().await
839}