1use crate::store::SynapseStore;
2use std::collections::HashMap;
3use std::path::Path;
4use std::sync::Arc;
5use std::sync::RwLock;
6use tonic::{Request, Response, Status};
7
8pub mod semantic_engine {
10 tonic::include_proto!("semantic_engine");
11}
12
13use semantic_engine::semantic_engine_server::SemanticEngine;
14use semantic_engine::{
15 DeleteResponse, EmptyRequest, IngestRequest, IngestResponse, Neighbor, NeighborResponse,
16 NodeRequest, Provenance, ResolveRequest, ResolveResponse, SearchRequest, SearchResponse,
17 Triple, TriplesResponse, SparqlRequest, SparqlResponse,
18};
19
20pub struct MySemanticEngine {
21 pub namespaces: Arc<RwLock<HashMap<String, Arc<SynapseStore>>>>,
22 pub storage_path: Arc<String>,
23}
24
25impl Clone for MySemanticEngine {
26 fn clone(&self) -> Self {
27 Self {
28 namespaces: Arc::clone(&self.namespaces),
29 storage_path: Arc::clone(&self.storage_path),
30 }
31 }
32}
33
34impl MySemanticEngine {
35 pub fn new(storage_path: &str) -> Self {
36 if !Path::new(storage_path).exists() {
37 std::fs::create_dir_all(storage_path).unwrap();
38 }
39
40 Self {
41 namespaces: Arc::new(RwLock::new(HashMap::new())),
42 storage_path: Arc::new(storage_path.to_string()),
43 }
44 }
45
46 fn get_namespace_store(&self, namespace: &str) -> Result<Arc<SynapseStore>, Status> {
47 let ns = if namespace.is_empty() { "default" } else { namespace };
48
49 {
50 let namespaces = self.namespaces.read().unwrap();
51 if let Some(store) = namespaces.get(ns) {
52 return Ok(store.clone());
53 }
54 }
55
56 let mut namespaces = self.namespaces.write().unwrap();
57 if let Some(store) = namespaces.get(ns) {
58 return Ok(store.clone());
59 }
60
61 let store = SynapseStore::open(ns, &self.storage_path)
62 .map_err(|e| Status::internal(format!("Failed to open store: {}", e)))?;
63 let store_arc = Arc::new(store);
64 namespaces.insert(ns.to_string(), store_arc.clone());
65 Ok(store_arc)
66 }
67}
68
69#[tonic::async_trait]
70impl SemanticEngine for MySemanticEngine {
71 async fn ingest_triples(
72 &self,
73 request: Request<IngestRequest>,
74 ) -> Result<Response<IngestResponse>, Status> {
75 let req = request.into_inner();
76 let store = self.get_namespace_store(&req.namespace)?;
77
78 let triples: Vec<(String, String, String)> = req.triples.into_iter()
79 .map(|t| (t.subject, t.predicate, t.object))
80 .collect();
81
82 match store.ingest_triples(triples) {
83 Ok((added, _)) => Ok(Response::new(IngestResponse {
84 nodes_added: added,
85 edges_added: added,
86 })),
87 Err(e) => Err(Status::internal(e.to_string())),
88 }
89 }
90
91 async fn get_neighbors(
92 &self,
93 request: Request<NodeRequest>,
94 ) -> Result<Response<NeighborResponse>, Status> {
95 Ok(Response::new(NeighborResponse { neighbors: vec![] }))
97 }
98
99 async fn search(
100 &self,
101 _request: Request<SearchRequest>,
102 ) -> Result<Response<SearchResponse>, Status> {
103 Ok(Response::new(SearchResponse { results: vec![] }))
104 }
105
106 async fn resolve_id(
107 &self,
108 request: Request<ResolveRequest>,
109 ) -> Result<Response<ResolveResponse>, Status> {
110 let req = request.into_inner();
111 let store = self.get_namespace_store(&req.namespace)?;
112 let id = store.get_or_create_id(&req.content);
113 Ok(Response::new(ResolveResponse {
114 node_id: id,
115 found: true,
116 }))
117 }
118
119 async fn get_all_triples(
120 &self,
121 request: Request<EmptyRequest>,
122 ) -> Result<Response<TriplesResponse>, Status> {
123 let req = request.into_inner();
124 let store = self.get_namespace_store(&req.namespace)?;
125
126 let mut triples = Vec::new();
127 for quad in store.store.iter().map(|q| q.unwrap()) {
129 triples.push(Triple {
130 subject: format!("{}", quad.subject),
131 predicate: format!("{}", quad.predicate),
132 object: format!("{}", quad.object),
133 provenance: None,
134 });
135 }
136
137 Ok(Response::new(TriplesResponse { triples }))
138 }
139
140 async fn query_sparql(
141 &self,
142 request: Request<SparqlRequest>,
143 ) -> Result<Response<SparqlResponse>, Status> {
144 let req = request.into_inner();
145 let store = self.get_namespace_store(&req.namespace)?;
146
147 match store.store.query(&req.query) {
148 Ok(results) => {
149 let mut output = String::new();
150 match results {
151 oxigraph::sparql::QueryResults::Solutions(solutions) => {
152 for solution in solutions {
153 let s = solution.unwrap();
154 output.push_str(&format!("{:?}\n", s));
155 }
156 }
157 oxigraph::sparql::QueryResults::Boolean(v) => {
158 output = format!("{}", v);
159 }
160 oxigraph::sparql::QueryResults::Graph(quads) => {
161 for quad in quads {
162 output.push_str(&format!("{:?}\n", quad.unwrap()));
163 }
164 }
165 }
166 Ok(Response::new(SparqlResponse {
167 results_json: output,
168 }))
169 }
170 Err(e) => Err(Status::internal(e.to_string())),
171 }
172 }
173
174 async fn delete_namespace_data(
175 &self,
176 request: Request<EmptyRequest>,
177 ) -> Result<Response<DeleteResponse>, Status> {
178 let req = request.into_inner();
179 let ns = req.namespace;
180
181 {
182 let mut namespaces = self.namespaces.write().unwrap();
183 namespaces.remove(&ns);
184 }
185
186 Ok(Response::new(DeleteResponse {
187 success: true,
188 message: format!("Deleted namespace {}", ns),
189 }))
190 }
191}