Skip to main content

synapse_core/
server.rs

1use crate::store::SynapseStore;
2use std::collections::HashMap;
3use std::path::Path;
4use std::sync::Arc;
5use std::sync::RwLock;
6use tonic::{Request, Response, Status};
7
8// Import the generated proto code
9pub mod semantic_engine {
10    tonic::include_proto!("semantic_engine");
11}
12
13use semantic_engine::semantic_engine_server::SemanticEngine;
14use semantic_engine::{
15    DeleteResponse, EmptyRequest, IngestRequest, IngestResponse, Neighbor, NeighborResponse,
16    NodeRequest, Provenance, ResolveRequest, ResolveResponse, SearchRequest, SearchResponse,
17    Triple, TriplesResponse, SparqlRequest, SparqlResponse,
18};
19
20pub struct MySemanticEngine {
21    pub namespaces: Arc<RwLock<HashMap<String, Arc<SynapseStore>>>>,
22    pub storage_path: Arc<String>,
23}
24
25impl Clone for MySemanticEngine {
26    fn clone(&self) -> Self {
27        Self {
28            namespaces: Arc::clone(&self.namespaces),
29            storage_path: Arc::clone(&self.storage_path),
30        }
31    }
32}
33
34impl MySemanticEngine {
35    pub fn new(storage_path: &str) -> Self {
36        if !Path::new(storage_path).exists() {
37            std::fs::create_dir_all(storage_path).unwrap();
38        }
39
40        Self {
41            namespaces: Arc::new(RwLock::new(HashMap::new())),
42            storage_path: Arc::new(storage_path.to_string()),
43        }
44    }
45
46    fn get_namespace_store(&self, namespace: &str) -> Result<Arc<SynapseStore>, Status> {
47        let ns = if namespace.is_empty() { "default" } else { namespace };
48
49        {
50            let namespaces = self.namespaces.read().unwrap();
51            if let Some(store) = namespaces.get(ns) {
52                return Ok(store.clone());
53            }
54        }
55
56        let mut namespaces = self.namespaces.write().unwrap();
57        if let Some(store) = namespaces.get(ns) {
58            return Ok(store.clone());
59        }
60
61        let store = SynapseStore::open(ns, &self.storage_path)
62            .map_err(|e| Status::internal(format!("Failed to open store: {}", e)))?;
63        let store_arc = Arc::new(store);
64        namespaces.insert(ns.to_string(), store_arc.clone());
65        Ok(store_arc)
66    }
67}
68
69#[tonic::async_trait]
70impl SemanticEngine for MySemanticEngine {
71    async fn ingest_triples(
72        &self,
73        request: Request<IngestRequest>,
74    ) -> Result<Response<IngestResponse>, Status> {
75        let req = request.into_inner();
76        let store = self.get_namespace_store(&req.namespace)?;
77
78        let triples: Vec<(String, String, String)> = req.triples.into_iter()
79            .map(|t| (t.subject, t.predicate, t.object))
80            .collect();
81
82        match store.ingest_triples(triples) {
83            Ok((added, _)) => Ok(Response::new(IngestResponse {
84                nodes_added: added,
85                edges_added: added,
86            })),
87            Err(e) => Err(Status::internal(e.to_string())),
88        }
89    }
90
91    async fn get_neighbors(
92        &self,
93        request: Request<NodeRequest>,
94    ) -> Result<Response<NeighborResponse>, Status> {
95        // Implementation for traversal...
96        Ok(Response::new(NeighborResponse { neighbors: vec![] }))
97    }
98
99    async fn search(
100        &self,
101        _request: Request<SearchRequest>,
102    ) -> Result<Response<SearchResponse>, Status> {
103        Ok(Response::new(SearchResponse { results: vec![] }))
104    }
105
106    async fn resolve_id(
107        &self,
108        request: Request<ResolveRequest>,
109    ) -> Result<Response<ResolveResponse>, Status> {
110        let req = request.into_inner();
111        let store = self.get_namespace_store(&req.namespace)?;
112        let id = store.get_or_create_id(&req.content);
113        Ok(Response::new(ResolveResponse {
114            node_id: id,
115            found: true,
116        }))
117    }
118
119    async fn get_all_triples(
120        &self,
121        request: Request<EmptyRequest>,
122    ) -> Result<Response<TriplesResponse>, Status> {
123        let req = request.into_inner();
124        let store = self.get_namespace_store(&req.namespace)?;
125
126        let mut triples = Vec::new();
127        // Fetch from Oxigraph...
128        for quad in store.store.iter().map(|q| q.unwrap()) {
129            triples.push(Triple {
130                subject: format!("{}", quad.subject),
131                predicate: format!("{}", quad.predicate),
132                object: format!("{}", quad.object),
133                provenance: None,
134            });
135        }
136
137        Ok(Response::new(TriplesResponse { triples }))
138    }
139
140    async fn query_sparql(
141        &self,
142        request: Request<SparqlRequest>,
143    ) -> Result<Response<SparqlResponse>, Status> {
144        let req = request.into_inner();
145        let store = self.get_namespace_store(&req.namespace)?;
146
147        match store.store.query(&req.query) {
148            Ok(results) => {
149                let mut output = String::new();
150                match results {
151                    oxigraph::sparql::QueryResults::Solutions(solutions) => {
152                        for solution in solutions {
153                            let s = solution.unwrap();
154                            output.push_str(&format!("{:?}\n", s));
155                        }
156                    }
157                    oxigraph::sparql::QueryResults::Boolean(v) => {
158                        output = format!("{}", v);
159                    }
160                    oxigraph::sparql::QueryResults::Graph(quads) => {
161                        for quad in quads {
162                            output.push_str(&format!("{:?}\n", quad.unwrap()));
163                        }
164                    }
165                }
166                Ok(Response::new(SparqlResponse {
167                    results_json: output,
168                }))
169            }
170            Err(e) => Err(Status::internal(e.to_string())),
171        }
172    }
173
174    async fn delete_namespace_data(
175        &self,
176        request: Request<EmptyRequest>,
177    ) -> Result<Response<DeleteResponse>, Status> {
178        let req = request.into_inner();
179        let ns = req.namespace;
180
181        {
182            let mut namespaces = self.namespaces.write().unwrap();
183            namespaces.remove(&ns);
184        }
185
186        Ok(Response::new(DeleteResponse {
187            success: true,
188            message: format!("Deleted namespace {}", ns),
189        }))
190    }
191}