Skip to main content

synapse_core/
server.rs

1use dashmap::DashMap;
2use std::sync::Arc;
3use tonic::{Request, Response, Status};
4
5pub mod proto {
6    tonic::include_proto!("semantic_engine");
7}
8
9use proto::semantic_engine_server::SemanticEngine;
10use proto::*;
11
12use crate::ingest::IngestionEngine;
13use crate::reasoner::{ReasoningStrategy as InternalStrategy, SynapseReasoner};
14use crate::scenarios::ScenarioManager;
15use crate::server::proto::{ReasoningStrategy, SearchMode};
16use crate::store::{IngestTriple, SynapseStore};
17use std::path::Path;
18
19use crate::audit::InferenceAudit;
20use crate::auth::NamespaceAuth;
21
22#[derive(Clone)]
23pub struct AuthToken(pub String);
24
25#[allow(clippy::result_large_err)]
26pub fn auth_interceptor(mut req: Request<()>) -> Result<Request<()>, Status> {
27    if let Some(token) = req
28        .metadata()
29        .get("authorization")
30        .and_then(|t| t.to_str().ok())
31        .map(|s| s.trim_start_matches("Bearer ").to_string())
32    {
33        req.extensions_mut().insert(AuthToken(token));
34    }
35    Ok(req)
36}
37
38fn get_token<T>(req: &Request<T>) -> Option<String> {
39    if let Some(token) = req.extensions().get::<AuthToken>() {
40        return Some(token.0.clone());
41    }
42    req.metadata()
43        .get("authorization")
44        .and_then(|t| t.to_str().ok())
45        .map(|s| s.trim_start_matches("Bearer ").to_string())
46}
47
48#[derive(Clone)]
49pub struct MySemanticEngine {
50    pub storage_path: String,
51    pub stores: Arc<DashMap<String, Arc<SynapseStore>>>,
52    pub auth: Arc<NamespaceAuth>,
53    pub audit: Arc<InferenceAudit>,
54    pub scenario_manager: Arc<ScenarioManager>,
55}
56
57impl MySemanticEngine {
58    pub fn new(storage_path: &str) -> Self {
59        let auth = Arc::new(NamespaceAuth::new());
60        auth.load_from_env();
61        let scenario_manager = Arc::new(ScenarioManager::new(std::path::Path::new(".")));
62
63        Self {
64            storage_path: storage_path.to_string(),
65            stores: Arc::new(DashMap::new()),
66            auth,
67            audit: Arc::new(InferenceAudit::new()),
68            scenario_manager,
69        }
70    }
71
72    pub async fn install_scenario(&self, name: &str, namespace: &str) -> Result<String, String> {
73        let path = self
74            .scenario_manager
75            .install_scenario(name)
76            .await
77            .map_err(|e| format!("Failed to install scenario assets: {}", e))?;
78
79        let store = self
80            .get_store(namespace)
81            .map_err(|e| e.message().to_string())?;
82
83        // Load Ontologies
84        let schema_path = path.join("schema");
85        let mut triples_loaded = 0;
86        if schema_path.exists() {
87            triples_loaded +=
88                crate::ingest::ontology::OntologyLoader::load_directory(&store, &schema_path)
89                    .await
90                    .map_err(|e| format!("Failed to load ontologies: {}", e))?;
91        }
92
93        // Load Data (Files)
94        let data_path = path.join("data");
95        let mut data_files_loaded = 0;
96        if data_path.exists() {
97            if let Ok(entries) = std::fs::read_dir(data_path) {
98                for entry in entries.flatten() {
99                    let p = entry.path();
100                    if p.is_file() {
101                        // Use ingestion engine
102                        let engine = IngestionEngine::new(store.clone());
103                        if let Ok(count) = engine.ingest_file(&p, namespace).await {
104                            triples_loaded += count as usize;
105                            data_files_loaded += 1;
106                        }
107                    }
108                }
109            }
110        }
111
112        // Load Docs (RAG)
113        let docs_path = path.join("docs");
114        let mut docs_loaded = 0;
115        if docs_path.exists() {
116            if let Ok(entries) = std::fs::read_dir(docs_path) {
117                for entry in entries.flatten() {
118                    let p = entry.path();
119                    if p.is_file() {
120                        if let Ok(content) = std::fs::read_to_string(&p) {
121                            let processor = crate::processor::TextProcessor::new();
122                            let chunks = processor.chunk_text(&content, 1000, 150);
123                            if let Some(ref vector_store) = store.vector_store {
124                                for (i, chunk) in chunks.iter().enumerate() {
125                                    let chunk_uri = format!("file://{}#chunk-{}", p.display(), i);
126                                    let metadata = serde_json::json!({
127                                        "uri": format!("file://{}", p.display()),
128                                        "type": "doc_chunk",
129                                        "scenario": name
130                                    });
131                                    let _ = vector_store.add(&chunk_uri, chunk, metadata).await;
132                                }
133                                docs_loaded += 1;
134                            }
135                        }
136                    }
137                }
138            }
139        }
140
141        Ok(format!(
142            "Scenario '{}' installed. Loaded {} triples ({} data files) and {} docs.",
143            name, triples_loaded, data_files_loaded, docs_loaded
144        ))
145    }
146
147    pub async fn shutdown(&self) {
148        eprintln!("Shutting down... flushing {} stores", self.stores.len());
149        for entry in self.stores.iter() {
150            let store = entry.value();
151            if let Err(e) = store.flush() {
152                eprintln!("Failed to flush store '{}': {}", entry.key(), e);
153            }
154        }
155        eprintln!("Shutdown complete.");
156    }
157
158    #[allow(clippy::result_large_err)]
159    pub fn get_store(&self, namespace: &str) -> Result<Arc<SynapseStore>, Status> {
160        if let Some(store) = self.stores.get(namespace) {
161            return Ok(store.clone());
162        }
163
164        let store = SynapseStore::open(namespace, &self.storage_path).map_err(|e| {
165            Status::internal(format!(
166                "Failed to open store for namespace '{}': {}",
167                namespace, e
168            ))
169        })?;
170
171        let store_arc = Arc::new(store);
172        self.stores.insert(namespace.to_string(), store_arc.clone());
173        Ok(store_arc)
174    }
175}
176
177#[tonic::async_trait]
178impl SemanticEngine for MySemanticEngine {
179    async fn ingest_triples(
180        &self,
181        request: Request<IngestRequest>,
182    ) -> Result<Response<IngestResponse>, Status> {
183        // Auth check (Write permission)
184        let token = get_token(&request);
185        let req = request.into_inner();
186        let namespace = if req.namespace.is_empty() {
187            "default"
188        } else {
189            &req.namespace
190        };
191
192        if let Err(e) = self.auth.check(token.as_deref(), namespace, "write") {
193            return Err(Status::permission_denied(e));
194        }
195
196        let store = self.get_store(namespace)?;
197
198        // Log provenance for audit
199        let timestamp = chrono::Utc::now().to_rfc3339();
200        let triple_count = req.triples.len();
201        let mut sources: Vec<String> = Vec::new();
202
203        let triples: Vec<IngestTriple> = req
204            .triples
205            .into_iter()
206            .map(|t| {
207                // Capture provenance sources for logging
208                if let Some(ref prov) = t.provenance {
209                    if !prov.source.is_empty() && !sources.contains(&prov.source) {
210                        sources.push(prov.source.clone());
211                    }
212                }
213                IngestTriple {
214                    subject: t.subject,
215                    predicate: t.predicate,
216                    object: t.object,
217                    provenance: t.provenance.map(|p| crate::store::Provenance {
218                        source: p.source,
219                        timestamp: p.timestamp,
220                        method: p.method,
221                    }),
222                }
223            })
224            .collect();
225
226        match store.ingest_triples(triples).await {
227            Ok((added, _)) => {
228                // Log ingestion for audit trail
229                eprintln!(
230                    "INGEST [{timestamp}] namespace={namespace} triples={triple_count} added={added} sources={:?}",
231                    sources
232                );
233                Ok(Response::new(IngestResponse {
234                    nodes_added: added,
235                    edges_added: added,
236                }))
237            }
238            Err(e) => Err(Status::internal(e.to_string())),
239        }
240    }
241
242    async fn ingest_file(
243        &self,
244        request: Request<IngestFileRequest>,
245    ) -> Result<Response<IngestResponse>, Status> {
246        // Auth check (Write permission) - previously missing? or just implicit?
247        // Note: The original code didn't check auth for ingest_file!
248        // Adding it now for consistency as we are touching auth.
249        let token = get_token(&request);
250        let req = request.into_inner();
251        let namespace = if req.namespace.is_empty() {
252            "default"
253        } else {
254            &req.namespace
255        };
256
257        if let Err(e) = self.auth.check(token.as_deref(), namespace, "write") {
258            return Err(Status::permission_denied(e));
259        }
260        let store = self.get_store(namespace)?;
261
262        let engine = IngestionEngine::new(store);
263        let path = Path::new(&req.file_path);
264
265        match engine.ingest_file(path, namespace).await {
266            Ok(count) => Ok(Response::new(IngestResponse {
267                nodes_added: count,
268                edges_added: count,
269            })),
270            Err(e) => Err(Status::internal(e.to_string())),
271        }
272    }
273
274    async fn get_neighbors(
275        &self,
276        request: Request<NodeRequest>,
277    ) -> Result<Response<NeighborResponse>, Status> {
278        let token = get_token(&request);
279        let req = request.into_inner();
280        let namespace = if req.namespace.is_empty() {
281            "default"
282        } else {
283            &req.namespace
284        };
285
286        if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
287            return Err(Status::permission_denied(e));
288        }
289
290        let store = self.get_store(namespace)?;
291
292        let direction = if req.direction.is_empty() {
293            "outgoing"
294        } else {
295            &req.direction
296        };
297        let edge_filter = if req.edge_filter.is_empty() {
298            None
299        } else {
300            Some(req.edge_filter.as_str())
301        };
302        let node_type_filter = if req.node_type_filter.is_empty() {
303            None
304        } else {
305            Some(req.node_type_filter.as_str())
306        };
307        let max_depth = if req.depth == 0 {
308            1
309        } else {
310            req.depth as usize
311        };
312        let limit_per_layer = if req.limit_per_layer == 0 {
313            usize::MAX
314        } else {
315            req.limit_per_layer as usize
316        };
317
318        let mut neighbors = Vec::new();
319        let mut visited = std::collections::HashSet::new();
320        let mut current_frontier = Vec::new();
321
322        // Start with the initial node
323        if let Some(start_uri) = store.get_uri(req.node_id) {
324            current_frontier.push(start_uri.clone());
325            visited.insert(start_uri);
326        }
327
328        // BFS traversal up to max_depth
329        for current_depth in 1..=max_depth {
330            let mut next_frontier = Vec::new();
331            let mut layer_count = 0;
332            let base_score = 1.0 / current_depth as f32; // Path scoring: closer = higher
333
334            for uri in &current_frontier {
335                if layer_count >= limit_per_layer {
336                    break;
337                }
338
339                // Query outgoing edges (URI as subject)
340                if direction == "outgoing" || direction == "both" {
341                    if let Ok(subj) = oxigraph::model::NamedNodeRef::new(uri) {
342                        for quad in
343                            store
344                                .store
345                                .quads_for_pattern(Some(subj.into()), None, None, None)
346                        {
347                            if layer_count >= limit_per_layer {
348                                break;
349                            }
350                            if let Ok(q) = quad {
351                                let pred = q.predicate.to_string();
352                                // Apply edge filter if specified
353                                if let Some(filter) = edge_filter {
354                                    if !pred.contains(filter) {
355                                        continue;
356                                    }
357                                }
358                                let obj_term = q.object;
359                                let obj_uri = obj_term.to_string();
360
361                                // Node Type Filter Logic
362                                if let Some(type_filter) = node_type_filter {
363                                    let passed =
364                                        if let oxigraph::model::Term::NamedNode(ref n) = obj_term {
365                                            let rdf_type = oxigraph::model::NamedNodeRef::new(
366                                                "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
367                                            )
368                                            .unwrap();
369                                            if let Ok(target_type) =
370                                                oxigraph::model::NamedNodeRef::new(type_filter)
371                                            {
372                                                store
373                                                    .store
374                                                    .quads_for_pattern(
375                                                        Some(n.into()),
376                                                        Some(rdf_type),
377                                                        Some(target_type.into()),
378                                                        None,
379                                                    )
380                                                    .next()
381                                                    .is_some()
382                                            } else {
383                                                false
384                                            }
385                                        } else {
386                                            false
387                                        };
388                                    if !passed {
389                                        continue;
390                                    }
391                                }
392
393                                let clean_uri = match &obj_term {
394                                    oxigraph::model::Term::NamedNode(n) => n.as_str(),
395                                    _ => &obj_uri,
396                                };
397
398                                // Always add to neighbors if not already in neighbors list to avoid duplicates there
399                                // But we must allow revisiting nodes for graph expansion if we want to find paths?
400                                // BFS typically avoids cycles by checking visited.
401
402                                // NOTE: visited set prevents processing same node twice in BFS.
403                                // If we reach a node that was already visited in a previous layer (or this layer), skip it.
404                                if !visited.contains(&obj_uri) {
405                                    visited.insert(obj_uri.clone());
406                                    let obj_id = store.get_or_create_id(&obj_uri);
407
408                                    let mut neighbor_score = base_score;
409                                    if req.scoring_strategy == "degree" {
410                                        let degree = store.get_degree(clean_uri);
411                                        neighbor_score /= (degree as f32 + 1.0).ln().max(0.1);
412                                    }
413
414                                    neighbors.push(Neighbor {
415                                        node_id: obj_id,
416                                        edge_type: pred,
417                                        uri: obj_uri.clone(), // This is the N-Triples formatted string for display
418                                        direction: "outgoing".to_string(),
419                                        depth: current_depth as u32,
420                                        score: neighbor_score,
421                                    });
422                                    // Use clean_uri for next frontier to ensure we query with raw URI, not <uri>
423                                    next_frontier.push(clean_uri.to_string());
424                                    layer_count += 1;
425                                }
426                            }
427                        }
428                    }
429                }
430
431                // Query incoming edges (URI as object)
432                if direction == "incoming" || direction == "both" {
433                    if let Ok(obj) = oxigraph::model::NamedNodeRef::new(uri) {
434                        for quad in
435                            store
436                                .store
437                                .quads_for_pattern(None, None, Some(obj.into()), None)
438                        {
439                            if layer_count >= limit_per_layer {
440                                break;
441                            }
442                            if let Ok(q) = quad {
443                                let pred = q.predicate.to_string();
444                                // Apply edge filter if specified
445                                if let Some(filter) = edge_filter {
446                                    if !pred.contains(filter) {
447                                        continue;
448                                    }
449                                }
450                                let subj_term = q.subject;
451                                let subj_uri = subj_term.to_string();
452
453                                // Node Type Filter Logic
454                                if let Some(type_filter) = node_type_filter {
455                                    let passed = if let oxigraph::model::Subject::NamedNode(ref n) =
456                                        subj_term
457                                    {
458                                        let rdf_type = oxigraph::model::NamedNodeRef::new(
459                                            "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
460                                        )
461                                        .unwrap();
462                                        if let Ok(target_type) =
463                                            oxigraph::model::NamedNodeRef::new(type_filter)
464                                        {
465                                            store
466                                                .store
467                                                .quads_for_pattern(
468                                                    Some(n.into()),
469                                                    Some(rdf_type),
470                                                    Some(target_type.into()),
471                                                    None,
472                                                )
473                                                .next()
474                                                .is_some()
475                                        } else {
476                                            false
477                                        }
478                                    } else {
479                                        false
480                                    };
481                                    if !passed {
482                                        continue;
483                                    }
484                                }
485
486                                let clean_uri = match &subj_term {
487                                    oxigraph::model::Subject::NamedNode(n) => n.as_str(),
488                                    _ => &subj_uri,
489                                };
490
491                                if !visited.contains(&subj_uri) {
492                                    visited.insert(subj_uri.clone());
493                                    let subj_id = store.get_or_create_id(&subj_uri);
494
495                                    let mut neighbor_score = base_score;
496                                    if req.scoring_strategy == "degree" {
497                                        let degree = store.get_degree(clean_uri);
498                                        // Penalize super nodes
499                                        neighbor_score /= (degree as f32 + 1.0).ln().max(0.1);
500                                    }
501
502                                    neighbors.push(Neighbor {
503                                        node_id: subj_id,
504                                        edge_type: pred,
505                                        uri: subj_uri.clone(),
506                                        direction: "incoming".to_string(),
507                                        depth: current_depth as u32,
508                                        score: neighbor_score,
509                                    });
510                                    // Use clean_uri for next frontier
511                                    next_frontier.push(clean_uri.to_string());
512                                    layer_count += 1;
513                                }
514                            }
515                        }
516                    }
517                }
518            }
519
520            current_frontier = next_frontier;
521            if current_frontier.is_empty() {
522                break;
523            }
524        }
525
526        // Sort by score (highest first)
527        neighbors.sort_by(|a, b| {
528            b.score
529                .partial_cmp(&a.score)
530                .unwrap_or(std::cmp::Ordering::Equal)
531        });
532
533        Ok(Response::new(NeighborResponse { neighbors }))
534    }
535
536    async fn search(
537        &self,
538        request: Request<SearchRequest>,
539    ) -> Result<Response<SearchResponse>, Status> {
540        let token = get_token(&request);
541        let req = request.into_inner();
542        let namespace = if req.namespace.is_empty() {
543            "default"
544        } else {
545            &req.namespace
546        };
547
548        if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
549            return Err(Status::permission_denied(e));
550        }
551
552        let store = self.get_store(namespace)?;
553
554        match store.hybrid_search(&req.query, req.limit as usize, 0).await {
555            Ok(results) => {
556                let grpc_results = results
557                    .into_iter()
558                    .enumerate()
559                    .map(|(idx, (uri, score))| SearchResult {
560                        node_id: idx as u32,
561                        score,
562                        content: uri.clone(),
563                        uri,
564                    })
565                    .collect();
566                Ok(Response::new(SearchResponse {
567                    results: grpc_results,
568                }))
569            }
570            Err(e) => Err(Status::internal(e.to_string())),
571        }
572    }
573
574    async fn resolve_id(
575        &self,
576        request: Request<ResolveRequest>,
577    ) -> Result<Response<ResolveResponse>, Status> {
578        let token = get_token(&request);
579        let req = request.into_inner();
580        let namespace = if req.namespace.is_empty() {
581            "default"
582        } else {
583            &req.namespace
584        };
585
586        if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
587            return Err(Status::permission_denied(e));
588        }
589
590        let store = self.get_store(namespace)?;
591
592        let uri = store.ensure_uri(&req.content);
593
594        // Look up the URI in our mapping
595        let uri_to_id = store.uri_to_id.read().unwrap();
596        if let Some(&node_id) = uri_to_id.get(&uri) {
597            Ok(Response::new(ResolveResponse {
598                node_id,
599                found: true,
600            }))
601        } else {
602            Ok(Response::new(ResolveResponse {
603                node_id: 0,
604                found: false,
605            }))
606        }
607    }
608
609    async fn get_all_triples(
610        &self,
611        request: Request<EmptyRequest>,
612    ) -> Result<Response<TriplesResponse>, Status> {
613        let token = get_token(&request);
614        let req = request.into_inner();
615        let namespace = if req.namespace.is_empty() {
616            "default"
617        } else {
618            &req.namespace
619        };
620
621        if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
622            return Err(Status::permission_denied(e));
623        }
624
625        let store = self.get_store(namespace)?;
626
627        let mut triples = Vec::new();
628
629        for quad in store.store.iter().map(|q| q.unwrap()) {
630            let s = quad.subject.to_string();
631            let p = quad.predicate.to_string();
632            let o = quad.object.to_string();
633
634            // Clean up NTriples formatting (<uri> -> uri)
635            let clean_s = if s.starts_with('<') && s.ends_with('>') {
636                s[1..s.len() - 1].to_string()
637            } else {
638                s
639            };
640            let clean_p = if p.starts_with('<') && p.ends_with('>') {
641                p[1..p.len() - 1].to_string()
642            } else {
643                p
644            };
645            let clean_o = if o.starts_with('<') && o.ends_with('>') {
646                o[1..o.len() - 1].to_string()
647            } else {
648                o
649            };
650
651            triples.push(Triple {
652                subject: clean_s,
653                predicate: clean_p,
654                object: clean_o,
655                provenance: Some(Provenance {
656                    source: "oxigraph".to_string(),
657                    timestamp: "".to_string(),
658                    method: "storage".to_string(),
659                }),
660                embedding: vec![],
661            });
662        }
663
664        Ok(Response::new(TriplesResponse { triples }))
665    }
666
667    async fn query_sparql(
668        &self,
669        request: Request<SparqlRequest>,
670    ) -> Result<Response<SparqlResponse>, Status> {
671        let token = get_token(&request);
672        let req = request.into_inner();
673        let namespace = if req.namespace.is_empty() {
674            "default"
675        } else {
676            &req.namespace
677        };
678
679        if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
680            return Err(Status::permission_denied(e));
681        }
682
683        let store = self.get_store(namespace)?;
684
685        match store.query_sparql(&req.query) {
686            Ok(json) => Ok(Response::new(SparqlResponse { results_json: json })),
687            Err(e) => Err(Status::internal(e.to_string())),
688        }
689    }
690
691    async fn delete_namespace_data(
692        &self,
693        request: Request<EmptyRequest>,
694    ) -> Result<Response<DeleteResponse>, Status> {
695        let token = get_token(&request);
696        let req = request.into_inner();
697        let namespace = if req.namespace.is_empty() {
698            "default"
699        } else {
700            &req.namespace
701        };
702
703        if let Err(e) = self.auth.check(token.as_deref(), namespace, "delete") {
704            return Err(Status::permission_denied(e));
705        }
706
707        // Remove from cache
708        self.stores.remove(namespace);
709
710        // Delete directory
711        let path = Path::new(&self.storage_path).join(namespace);
712        if path.exists() {
713            std::fs::remove_dir_all(path).map_err(|e| Status::internal(e.to_string()))?;
714        }
715
716        Ok(Response::new(DeleteResponse {
717            success: true,
718            message: format!("Deleted namespace '{}'", namespace),
719        }))
720    }
721
722    async fn hybrid_search(
723        &self,
724        request: Request<HybridSearchRequest>,
725    ) -> Result<Response<SearchResponse>, Status> {
726        let token = get_token(&request);
727        let req = request.into_inner();
728        let namespace = if req.namespace.is_empty() {
729            "default"
730        } else {
731            &req.namespace
732        };
733
734        if let Err(e) = self.auth.check(token.as_deref(), namespace, "read") {
735            return Err(Status::permission_denied(e));
736        }
737
738        let store = self.get_store(namespace)?;
739
740        let vector_k = req.vector_k as usize;
741        let graph_depth = req.graph_depth;
742
743        let results = match SearchMode::try_from(req.mode) {
744            Ok(SearchMode::VectorOnly) | Ok(SearchMode::Hybrid) => store
745                .hybrid_search(&req.query, vector_k, graph_depth)
746                .await
747                .map_err(|e| Status::internal(format!("Hybrid search failed: {}", e)))?,
748            _ => vec![],
749        };
750
751        let grpc_results = results
752            .into_iter()
753            .enumerate()
754            .map(|(idx, (uri, score))| SearchResult {
755                node_id: idx as u32,
756                score,
757                content: uri.clone(),
758                uri,
759            })
760            .collect();
761
762        Ok(Response::new(SearchResponse {
763            results: grpc_results,
764        }))
765    }
766
767    async fn apply_reasoning(
768        &self,
769        request: Request<ReasoningRequest>,
770    ) -> Result<Response<ReasoningResponse>, Status> {
771        // Auth check (Reason permission)
772        let token = get_token(&request);
773        let req = request.into_inner();
774        let namespace = if req.namespace.is_empty() {
775            "default"
776        } else {
777            &req.namespace
778        };
779
780        if let Err(e) = self.auth.check(token.as_deref(), namespace, "reason") {
781            return Err(Status::permission_denied(e));
782        }
783
784        let store = self.get_store(namespace)?;
785
786        let strategy = match ReasoningStrategy::try_from(req.strategy) {
787            Ok(ReasoningStrategy::Rdfs) => InternalStrategy::RDFS,
788            Ok(ReasoningStrategy::Owlrl) => InternalStrategy::OWLRL,
789            _ => InternalStrategy::None,
790        };
791        let strategy_name = format!("{:?}", strategy);
792
793        let reasoner = SynapseReasoner::new(strategy);
794        let start_triples = store.store.len().unwrap_or(0);
795
796        let response = if req.materialize {
797            match reasoner.materialize(&store.store) {
798                Ok(count) => Ok(Response::new(ReasoningResponse {
799                    success: true,
800                    triples_inferred: count as u32,
801                    message: format!(
802                        "Materialized {} triples in namespace '{}'",
803                        count, namespace
804                    ),
805                })),
806                Err(e) => Err(Status::internal(e.to_string())),
807            }
808        } else {
809            match reasoner.apply(&store.store) {
810                Ok(triples) => Ok(Response::new(ReasoningResponse {
811                    success: true,
812                    triples_inferred: triples.len() as u32,
813                    message: format!(
814                        "Found {} inferred triples in namespace '{}'",
815                        triples.len(),
816                        namespace
817                    ),
818                })),
819                Err(e) => Err(Status::internal(e.to_string())),
820            }
821        };
822
823        // Audit Log
824        if let Ok(ref res) = response {
825            let inferred = res.get_ref().triples_inferred as usize;
826            self.audit.log(
827                namespace,
828                &strategy_name,
829                start_triples,
830                inferred,
831                0, // Duplicates skipped not easily tracked here without changing reasoner return signature
832                vec![], // Sample inferences
833            );
834        }
835
836        response
837    }
838}
839
840pub async fn run_mcp_stdio(
841    engine: Arc<MySemanticEngine>,
842) -> Result<(), Box<dyn std::error::Error>> {
843    let server = crate::mcp_stdio::McpStdioServer::new(engine);
844    server.run().await
845}