Skip to main content

synapse_core/
server.rs

1use dashmap::DashMap;
2use std::sync::Arc;
3use tonic::{Request, Response, Status};
4
5pub mod proto {
6    tonic::include_proto!("semantic_engine");
7}
8
9use proto::semantic_engine_server::SemanticEngine;
10use proto::*;
11
12use crate::ingest::IngestionEngine;
13use crate::reasoner::{ReasoningStrategy as InternalStrategy, SynapseReasoner};
14use crate::scenarios::ScenarioManager;
15use crate::server::proto::{ReasoningStrategy, SearchMode};
16use crate::store::{IngestTriple, SynapseStore};
17use std::path::Path;
18
19use crate::audit::InferenceAudit;
20use crate::auth::NamespaceAuth;
21
22#[derive(Clone)]
23pub struct AuthToken(pub String);
24
25#[allow(clippy::result_large_err)]
26pub fn auth_interceptor(mut req: Request<()>) -> Result<Request<()>, Status> {
27    if let Some(token) = req
28        .metadata()
29        .get("authorization")
30        .and_then(|t| t.to_str().ok())
31        .map(|s| s.trim_start_matches("Bearer ").to_string())
32    {
33        req.extensions_mut().insert(AuthToken(token));
34    }
35    Ok(req)
36}
37
38fn get_token<T>(req: &Request<T>) -> Option<String> {
39    if let Some(token) = req.extensions().get::<AuthToken>() {
40        return Some(token.0.clone());
41    }
42    req.metadata()
43        .get("authorization")
44        .and_then(|t| t.to_str().ok())
45        .map(|s| s.trim_start_matches("Bearer ").to_string())
46}
47
48#[derive(Clone)]
49pub struct MySemanticEngine {
50    pub storage_path: String,
51    pub stores: Arc<DashMap<String, Arc<SynapseStore>>>,
52    pub auth: Arc<NamespaceAuth>,
53    pub audit: Arc<InferenceAudit>,
54    pub scenario_manager: Arc<ScenarioManager>,
55}
56
57impl MySemanticEngine {
58    pub fn new(storage_path: &str) -> Self {
59        let auth = Arc::new(NamespaceAuth::new());
60        auth.load_from_env();
61        let scenario_manager = Arc::new(ScenarioManager::new(std::path::Path::new(".")));
62
63        Self {
64            storage_path: storage_path.to_string(),
65            stores: Arc::new(DashMap::new()),
66            auth,
67            audit: Arc::new(InferenceAudit::new()),
68            scenario_manager,
69        }
70    }
71
72    pub async fn install_scenario(&self, name: &str, namespace: &str) -> Result<String, String> {
73        let path = self
74            .scenario_manager
75            .install_scenario(name)
76            .await
77            .map_err(|e| format!("Failed to install scenario assets: {}", e))?;
78
79        let store = self
80            .get_store(namespace)
81            .map_err(|e| e.message().to_string())?;
82
83        // Load Ontologies
84        let schema_path = path.join("schema");
85        let mut triples_loaded = 0;
86        if schema_path.exists() {
87            triples_loaded +=
88                crate::ingest::ontology::OntologyLoader::load_directory(&store, &schema_path)
89                    .await
90                    .map_err(|e| format!("Failed to load ontologies: {}", e))?;
91        }
92
93        // Load Data (Files)
94        let data_path = path.join("data");
95        let mut data_files_loaded = 0;
96        if data_path.exists() {
97            if let Ok(entries) = std::fs::read_dir(data_path) {
98                for entry in entries.flatten() {
99                    let p = entry.path();
100                    if p.is_file() {
101                        // Use ingestion engine
102                        let engine = IngestionEngine::new(store.clone());
103                        if let Ok(count) = engine.ingest_file(&p, namespace).await {
104                            triples_loaded += count as usize;
105                            data_files_loaded += 1;
106                        }
107                    }
108                }
109            }
110        }
111
112        // Load Docs (RAG)
113        let docs_path = path.join("docs");
114        let mut docs_loaded = 0;
115        if docs_path.exists() {
116            if let Ok(entries) = std::fs::read_dir(docs_path) {
117                for entry in entries.flatten() {
118                    let p = entry.path();
119                    if p.is_file() {
120                        if let Ok(content) = std::fs::read_to_string(&p) {
121                            let processor = crate::processor::TextProcessor::new();
122                            let chunks = processor.chunk_text(&content, 1000, 150);
123                            if let Some(ref vector_store) = store.vector_store {
124                                for (i, chunk) in chunks.iter().enumerate() {
125                                    let chunk_uri = format!("file://{}#chunk-{}", p.display(), i);
126                                    let metadata = serde_json::json!({
127                                        "uri": format!("file://{}", p.display()),
128                                        "type": "doc_chunk",
129                                        "scenario": name
130                                    });
131                                    let _ = vector_store.add(&chunk_uri, chunk, metadata).await;
132                                }
133                                docs_loaded += 1;
134                            }
135                        }
136                    }
137                }
138            }
139        }
140
141        Ok(format!(
142            "Scenario '{}' installed. Loaded {} triples ({} data files) and {} docs.",
143            name, triples_loaded, data_files_loaded, docs_loaded
144        ))
145    }
146
147    pub async fn shutdown(&self) {
148        eprintln!("Shutting down... flushing {} stores", self.stores.len());
149        for entry in self.stores.iter() {
150            let store = entry.value();
151            if let Err(e) = store.flush() {
152                eprintln!("Failed to flush store '{}': {}", entry.key(), e);
153            }
154        }
155        eprintln!("Shutdown complete.");
156    }
157
158    #[allow(clippy::result_large_err)]
159    pub fn get_store(&self, namespace: &str) -> Result<Arc<SynapseStore>, Status> {
160        // Use entry API to ensure atomicity
161        let store = self.stores.entry(namespace.to_string()).or_insert_with(|| {
162            let s =
163                SynapseStore::open(namespace, &self.storage_path).expect("Failed to open store");
164            Arc::new(s)
165        });
166
167        Ok(store.value().clone())
168    }
169}
170
171#[tonic::async_trait]
172impl SemanticEngine for MySemanticEngine {
173    async fn ingest_triples(
174        &self,
175        request: Request<IngestRequest>,
176    ) -> Result<Response<IngestResponse>, Status> {
177        // Auth check (Write permission)
178        let token = get_token(&request);
179        let req = request.into_inner();
180        let namespace = if req.namespace.is_empty() {
181            "default"
182        } else {
183            &req.namespace
184        };
185
186        if let Err(e) = self.auth.check(token.as_deref(), namespace, "write") {
187            return Err(Status::permission_denied(e));
188        }
189
190        let store = self.get_store(namespace)?;
191
192        // Log provenance for audit
193        let timestamp = chrono::Utc::now().to_rfc3339();
194        let triple_count = req.triples.len();
195        let mut sources: Vec<String> = Vec::new();
196
197        let triples: Vec<IngestTriple> = req
198            .triples
199            .into_iter()
200            .map(|t| {
201                // Capture provenance sources for logging
202                if let Some(ref prov) = t.provenance {
203                    if !prov.source.is_empty() && !sources.contains(&prov.source) {
204                        sources.push(prov.source.clone());
205                    }
206                }
207                IngestTriple {
208                    subject: t.subject,
209                    predicate: t.predicate,
210                    object: t.object,
211                    provenance: t.provenance.map(|p| crate::store::Provenance {
212                        source: p.source,
213                        timestamp: p.timestamp,
214                        method: p.method,
215                    }),
216                }
217            })
218            .collect();
219
220        match store.ingest_triples(triples).await {
221            Ok((added, _)) => {
222                // Log ingestion for audit trail
223                eprintln!(
224                    "INGEST [{timestamp}] namespace={namespace} triples={triple_count} added={added} sources={:?}",
225                    sources
226                );
227                Ok(Response::new(IngestResponse {
228                    nodes_added: added,
229                    edges_added: added,
230                }))
231            }
232            Err(e) => Err(Status::internal(e.to_string())),
233        }
234    }
235
236    async fn ingest_file(
237        &self,
238        request: Request<IngestFileRequest>,
239    ) -> Result<Response<IngestResponse>, Status> {
240        // Auth check (Write permission) - previously missing? or just implicit?
241        // Note: The original code didn't check auth for ingest_file!
242        // Adding it now for consistency as we are touching auth.
243        let token = get_token(&request);
244        let req = request.into_inner();
245        let namespace = if req.namespace.is_empty() {
246            "default"
247        } else {
248            &req.namespace
249        };
250
251        if let Err(e) = self.auth.check(token.as_deref(), namespace, "write") {
252            return Err(Status::permission_denied(e));
253        }
254        let store = self.get_store(namespace)?;
255
256        let engine = IngestionEngine::new(store);
257        let path = Path::new(&req.file_path);
258
259        match engine.ingest_file(path, namespace).await {
260            Ok(count) => Ok(Response::new(IngestResponse {
261                nodes_added: count,
262                edges_added: count,
263            })),
264            Err(e) => Err(Status::internal(e.to_string())),
265        }
266    }
267
268    async fn get_neighbors(
269        &self,
270        request: Request<NodeRequest>,
271    ) -> Result<Response<NeighborResponse>, Status> {
272        let token = get_token(&request);
273        let req = request.into_inner();
274        let namespace = if req.namespace.is_empty() {
275            "default"
276        } else {
277            &req.namespace
278        };
279
280        if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
281            return Err(Status::permission_denied(e));
282        }
283
284        let store = self.get_store(namespace)?;
285
286        let direction = if req.direction.is_empty() {
287            "outgoing"
288        } else {
289            &req.direction
290        };
291        let edge_filter = if req.edge_filter.is_empty() {
292            None
293        } else {
294            Some(req.edge_filter.as_str())
295        };
296        let node_type_filter = if req.node_type_filter.is_empty() {
297            None
298        } else {
299            Some(req.node_type_filter.as_str())
300        };
301        let max_depth = if req.depth == 0 {
302            1
303        } else {
304            req.depth as usize
305        };
306        let limit_per_layer = if req.limit_per_layer == 0 {
307            usize::MAX
308        } else {
309            req.limit_per_layer as usize
310        };
311
312        let mut neighbors = Vec::new();
313        let mut visited = std::collections::HashSet::new();
314        let mut current_frontier = Vec::new();
315
316        // Start with the initial node
317        if let Some(start_uri) = store.get_uri(req.node_id) {
318            current_frontier.push(start_uri.clone());
319            visited.insert(start_uri);
320        }
321
322        // BFS traversal up to max_depth
323        for current_depth in 1..=max_depth {
324            let mut next_frontier = Vec::new();
325            let mut layer_count = 0;
326            let base_score = 1.0 / current_depth as f32; // Path scoring: closer = higher
327
328            for uri in &current_frontier {
329                if layer_count >= limit_per_layer {
330                    break;
331                }
332
333                // Query outgoing edges (URI as subject)
334                if direction == "outgoing" || direction == "both" {
335                    if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
336                        for quad in
337                            store
338                                .store
339                                .quads_for_pattern(Some(subj.into()), None, None, None)
340                        {
341                            if layer_count >= limit_per_layer {
342                                break;
343                            }
344                            if let Ok(q) = quad {
345                                let pred = q.predicate.to_string();
346                                // Apply edge filter if specified
347                                if let Some(filter) = edge_filter {
348                                    if !pred.contains(filter) {
349                                        continue;
350                                    }
351                                }
352                                let obj_term = q.object;
353                                let obj_uri = obj_term.to_string();
354
355                                // Node Type Filter Logic
356                                if let Some(type_filter) = node_type_filter {
357                                    let passed =
358                                        if let oxigraph::model::Term::NamedNode(ref n) = obj_term {
359                                            let rdf_type = oxigraph::model::NamedNodeRef::new(
360                                                "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
361                                            )
362                                            .unwrap();
363                                            if let Ok(target_type) =
364                                                oxigraph::model::NamedNodeRef::new(type_filter)
365                                            {
366                                                store
367                                                    .store
368                                                    .quads_for_pattern(
369                                                        Some(n.into()),
370                                                        Some(rdf_type),
371                                                        Some(target_type.into()),
372                                                        None,
373                                                    )
374                                                    .next()
375                                                    .is_some()
376                                            } else {
377                                                false
378                                            }
379                                        } else {
380                                            false
381                                        };
382                                    if !passed {
383                                        continue;
384                                    }
385                                }
386
387                                let clean_uri = match &obj_term {
388                                    oxigraph::model::Term::NamedNode(n) => n.as_str(),
389                                    _ => &obj_uri,
390                                };
391
392                                // Always add to neighbors if not already in neighbors list to avoid duplicates there
393                                // But we must allow revisiting nodes for graph expansion if we want to find paths?
394                                // BFS typically avoids cycles by checking visited.
395
396                                // NOTE: visited set prevents processing same node twice in BFS.
397                                // If we reach a node that was already visited in a previous layer (or this layer), skip it.
398                                if !visited.contains(&obj_uri) {
399                                    visited.insert(obj_uri.clone());
400                                    let obj_id = store.get_or_create_id(&obj_uri);
401
402                                    let mut neighbor_score = base_score;
403                                    if req.scoring_strategy == "degree" {
404                                        let degree = store.get_degree(clean_uri);
405                                        neighbor_score /= (degree as f32 + 1.0).ln().max(0.1);
406                                    }
407
408                                    neighbors.push(Neighbor {
409                                        node_id: obj_id,
410                                        edge_type: pred,
411                                        uri: obj_uri.clone(), // This is the N-Triples formatted string for display
412                                        direction: "outgoing".to_string(),
413                                        depth: current_depth as u32,
414                                        score: neighbor_score,
415                                    });
416                                    // Use clean_uri for next frontier to ensure we query with raw URI, not <uri>
417                                    next_frontier.push(clean_uri.to_string());
418                                    layer_count += 1;
419                                }
420                            }
421                        }
422                    }
423                }
424
425                // Query incoming edges (URI as object)
426                if direction == "incoming" || direction == "both" {
427                    if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
428                        for quad in
429                            store
430                                .store
431                                .quads_for_pattern(None, None, Some(obj.into()), None)
432                        {
433                            if layer_count >= limit_per_layer {
434                                break;
435                            }
436                            if let Ok(q) = quad {
437                                let pred = q.predicate.to_string();
438                                // Apply edge filter if specified
439                                if let Some(filter) = edge_filter {
440                                    if !pred.contains(filter) {
441                                        continue;
442                                    }
443                                }
444                                let subj_term = q.subject;
445                                let subj_uri = subj_term.to_string();
446
447                                // Node Type Filter Logic
448                                if let Some(type_filter) = node_type_filter {
449                                    let passed = if let oxigraph::model::Subject::NamedNode(ref n) =
450                                        subj_term
451                                    {
452                                        let rdf_type = oxigraph::model::NamedNodeRef::new(
453                                            "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
454                                        )
455                                        .unwrap();
456                                        if let Ok(target_type) =
457                                            oxigraph::model::NamedNodeRef::new(type_filter)
458                                        {
459                                            store
460                                                .store
461                                                .quads_for_pattern(
462                                                    Some(n.into()),
463                                                    Some(rdf_type),
464                                                    Some(target_type.into()),
465                                                    None,
466                                                )
467                                                .next()
468                                                .is_some()
469                                        } else {
470                                            false
471                                        }
472                                    } else {
473                                        false
474                                    };
475                                    if !passed {
476                                        continue;
477                                    }
478                                }
479
480                                let clean_uri = match &subj_term {
481                                    oxigraph::model::Subject::NamedNode(n) => n.as_str(),
482                                    _ => &subj_uri,
483                                };
484
485                                if !visited.contains(&subj_uri) {
486                                    visited.insert(subj_uri.clone());
487                                    let subj_id = store.get_or_create_id(&subj_uri);
488
489                                    let mut neighbor_score = base_score;
490                                    if req.scoring_strategy == "degree" {
491                                        let degree = store.get_degree(clean_uri);
492                                        // Penalize super nodes
493                                        neighbor_score /= (degree as f32 + 1.0).ln().max(0.1);
494                                    }
495
496                                    neighbors.push(Neighbor {
497                                        node_id: subj_id,
498                                        edge_type: pred,
499                                        uri: subj_uri.clone(),
500                                        direction: "incoming".to_string(),
501                                        depth: current_depth as u32,
502                                        score: neighbor_score,
503                                    });
504                                    // Use clean_uri for next frontier
505                                    next_frontier.push(clean_uri.to_string());
506                                    layer_count += 1;
507                                }
508                            }
509                        }
510                    }
511                }
512            }
513
514            current_frontier = next_frontier;
515            if current_frontier.is_empty() {
516                break;
517            }
518        }
519
520        // Sort by score (highest first)
521        neighbors.sort_by(|a, b| {
522            b.score
523                .partial_cmp(&a.score)
524                .unwrap_or(std::cmp::Ordering::Equal)
525        });
526
527        Ok(Response::new(NeighborResponse { neighbors }))
528    }
529
530    async fn search(
531        &self,
532        request: Request<SearchRequest>,
533    ) -> Result<Response<SearchResponse>, Status> {
534        let token = get_token(&request);
535        let req = request.into_inner();
536        let namespace = if req.namespace.is_empty() {
537            "default"
538        } else {
539            &req.namespace
540        };
541
542        if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
543            return Err(Status::permission_denied(e));
544        }
545
546        let store = self.get_store(namespace)?;
547
548        match store.hybrid_search(&req.query, req.limit as usize, 0).await {
549            Ok(results) => {
550                let grpc_results = results
551                    .into_iter()
552                    .enumerate()
553                    .map(|(idx, (uri, score))| SearchResult {
554                        node_id: idx as u32,
555                        score,
556                        content: uri.clone(),
557                        uri,
558                    })
559                    .collect();
560                Ok(Response::new(SearchResponse {
561                    results: grpc_results,
562                }))
563            }
564            Err(e) => Err(Status::internal(e.to_string())),
565        }
566    }
567
568    async fn resolve_id(
569        &self,
570        request: Request<ResolveRequest>,
571    ) -> Result<Response<ResolveResponse>, Status> {
572        let token = get_token(&request);
573        let req = request.into_inner();
574        let namespace = if req.namespace.is_empty() {
575            "default"
576        } else {
577            &req.namespace
578        };
579
580        if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
581            return Err(Status::permission_denied(e));
582        }
583
584        let store = self.get_store(namespace)?;
585
586        let uri = store.ensure_uri(&req.content);
587
588        // Look up the URI in our mapping
589        let uri_to_id = store.uri_to_id.read().unwrap();
590        if let Some(&node_id) = uri_to_id.get(&uri) {
591            Ok(Response::new(ResolveResponse {
592                node_id,
593                found: true,
594            }))
595        } else {
596            Ok(Response::new(ResolveResponse {
597                node_id: 0,
598                found: false,
599            }))
600        }
601    }
602
603    async fn get_all_triples(
604        &self,
605        request: Request<EmptyRequest>,
606    ) -> Result<Response<TriplesResponse>, Status> {
607        let token = get_token(&request);
608        let req = request.into_inner();
609        let namespace = if req.namespace.is_empty() {
610            "default"
611        } else {
612            &req.namespace
613        };
614
615        if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
616            return Err(Status::permission_denied(e));
617        }
618
619        let store = self.get_store(namespace)?;
620
621        let mut triples = Vec::new();
622
623        for quad in store.store.iter().map(|q| q.unwrap()) {
624            let s = quad.subject.to_string();
625            let p = quad.predicate.to_string();
626            let o = quad.object.to_string();
627
628            // Clean up NTriples formatting (<uri> -> uri)
629            let clean_s = if s.starts_with('<') && s.ends_with('>') {
630                s[1..s.len() - 1].to_string()
631            } else {
632                s
633            };
634            let clean_p = if p.starts_with('<') && p.ends_with('>') {
635                p[1..p.len() - 1].to_string()
636            } else {
637                p
638            };
639            let clean_o = if o.starts_with('<') && o.ends_with('>') {
640                o[1..o.len() - 1].to_string()
641            } else {
642                o
643            };
644
645            triples.push(Triple {
646                subject: clean_s,
647                predicate: clean_p,
648                object: clean_o,
649                provenance: Some(Provenance {
650                    source: "oxigraph".to_string(),
651                    timestamp: "".to_string(),
652                    method: "storage".to_string(),
653                }),
654                embedding: vec![],
655            });
656        }
657
658        Ok(Response::new(TriplesResponse { triples }))
659    }
660
661    async fn query_sparql(
662        &self,
663        request: Request<SparqlRequest>,
664    ) -> Result<Response<SparqlResponse>, Status> {
665        let token = get_token(&request);
666        let req = request.into_inner();
667        let namespace = if req.namespace.is_empty() {
668            "default"
669        } else {
670            &req.namespace
671        };
672
673        if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
674            return Err(Status::permission_denied(e));
675        }
676
677        let store = self.get_store(namespace)?;
678
679        match store.query_sparql(&req.query) {
680            Ok(json) => Ok(Response::new(SparqlResponse { results_json: json })),
681            Err(e) => Err(Status::internal(e.to_string())),
682        }
683    }
684
685    async fn delete_namespace_data(
686        &self,
687        request: Request<EmptyRequest>,
688    ) -> Result<Response<DeleteResponse>, Status> {
689        let token = get_token(&request);
690        let req = request.into_inner();
691        let namespace = if req.namespace.is_empty() {
692            "default"
693        } else {
694            &req.namespace
695        };
696
697        if let Err(e) = self.auth.check(token.as_deref(), namespace, "delete") {
698            return Err(Status::permission_denied(e));
699        }
700
701        // Remove from cache
702        self.stores.remove(namespace);
703
704        // Delete directory
705        let path = Path::new(&self.storage_path).join(namespace);
706        if path.exists() {
707            std::fs::remove_dir_all(path).map_err(|e| Status::internal(e.to_string()))?;
708        }
709
710        Ok(Response::new(DeleteResponse {
711            success: true,
712            message: format!("Deleted namespace '{}'", namespace),
713        }))
714    }
715
716    async fn hybrid_search(
717        &self,
718        request: Request<HybridSearchRequest>,
719    ) -> Result<Response<SearchResponse>, Status> {
720        let token = get_token(&request);
721        let req = request.into_inner();
722        let namespace = if req.namespace.is_empty() {
723            "default"
724        } else {
725            &req.namespace
726        };
727
728        if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
729            return Err(Status::permission_denied(e));
730        }
731
732        let store = self.get_store(namespace)?;
733
734        let vector_k = req.vector_k as usize;
735        let graph_depth = req.graph_depth;
736
737        let results = match SearchMode::try_from(req.mode) {
738            Ok(SearchMode::VectorOnly) | Ok(SearchMode::Hybrid) => store
739                .hybrid_search(&req.query, vector_k, graph_depth)
740                .await
741                .map_err(|e| Status::internal(format!("Hybrid search failed: {}", e)))?,
742            _ => vec![],
743        };
744
745        let grpc_results = results
746            .into_iter()
747            .enumerate()
748            .map(|(idx, (uri, score))| SearchResult {
749                node_id: idx as u32,
750                score,
751                content: uri.clone(),
752                uri,
753            })
754            .collect();
755
756        Ok(Response::new(SearchResponse {
757            results: grpc_results,
758        }))
759    }
760
761    async fn apply_reasoning(
762        &self,
763        request: Request<ReasoningRequest>,
764    ) -> Result<Response<ReasoningResponse>, Status> {
765        // Auth check (Reason permission)
766        let token = get_token(&request);
767        let req = request.into_inner();
768        let namespace = if req.namespace.is_empty() {
769            "default"
770        } else {
771            &req.namespace
772        };
773
774        if let Err(e) = self.auth.check(token.as_deref(), namespace, "reason") {
775            return Err(Status::permission_denied(e));
776        }
777
778        let store = self.get_store(namespace)?;
779
780        let strategy = match ReasoningStrategy::try_from(req.strategy) {
781            Ok(ReasoningStrategy::Rdfs) => InternalStrategy::RDFS,
782            Ok(ReasoningStrategy::Owlrl) => InternalStrategy::OWLRL,
783            _ => InternalStrategy::None,
784        };
785        let strategy_name = format!("{:?}", strategy);
786
787        let reasoner = SynapseReasoner::new(strategy);
788        let start_triples = store.store.len().unwrap_or(0);
789
790        let response = if req.materialize {
791            match reasoner.materialize(&store.store) {
792                Ok(count) => Ok(Response::new(ReasoningResponse {
793                    success: true,
794                    triples_inferred: count as u32,
795                    message: format!(
796                        "Materialized {} triples in namespace '{}'",
797                        count, namespace
798                    ),
799                })),
800                Err(e) => Err(Status::internal(e.to_string())),
801            }
802        } else {
803            match reasoner.apply(&store.store) {
804                Ok(triples) => Ok(Response::new(ReasoningResponse {
805                    success: true,
806                    triples_inferred: triples.len() as u32,
807                    message: format!(
808                        "Found {} inferred triples in namespace '{}'",
809                        triples.len(),
810                        namespace
811                    ),
812                })),
813                Err(e) => Err(Status::internal(e.to_string())),
814            }
815        };
816
817        // Audit Log
818        if let Ok(ref res) = response {
819            let inferred = res.get_ref().triples_inferred as usize;
820            self.audit.log(
821                namespace,
822                &strategy_name,
823                start_triples,
824                inferred,
825                0, // Duplicates skipped not easily tracked here without changing reasoner return signature
826                vec![], // Sample inferences
827            );
828        }
829
830        response
831    }
832}
833
834pub async fn run_mcp_stdio(
835    engine: Arc<MySemanticEngine>,
836) -> Result<(), Box<dyn std::error::Error>> {
837    let server = crate::mcp_stdio::McpStdioServer::new(engine);
838    server.run().await
839}