1use crate::vector_store::VectorStore;
2use anyhow::Result;
3use oxigraph::model::*;
4use oxigraph::store::Store;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::path::PathBuf;
8use std::sync::{Arc, RwLock};
9use uuid::Uuid;
10
11#[derive(Serialize, Deserialize, Default)]
13struct UriMappings {
14 uri_to_id: HashMap<String, u32>,
15 next_id: u32,
16}
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
19pub struct Provenance {
20 pub source: String,
21 pub timestamp: String,
22 pub method: String,
23}
24
25pub struct IngestTriple {
26 pub subject: String,
27 pub predicate: String,
28 pub object: String,
29 pub provenance: Option<Provenance>,
30}
31
32pub struct SynapseStore {
33 pub store: Store,
34 pub namespace: String,
35 pub storage_path: PathBuf,
36 pub id_to_uri: RwLock<HashMap<u32, String>>,
38 pub uri_to_id: RwLock<HashMap<String, u32>>,
39 pub next_id: std::sync::atomic::AtomicU32,
40 pub vector_store: Option<Arc<VectorStore>>,
42}
43
44impl SynapseStore {
45 pub fn open(namespace: &str, storage_path: &str) -> Result<Self> {
46 let path = PathBuf::from(storage_path).join(namespace);
47 std::fs::create_dir_all(&path)?;
48 let store = Store::open(&path)?;
49
50 let mappings_path = path.join("uri_mappings.json");
52 let (uri_to_id, id_to_uri, next_id) = if mappings_path.exists() {
53 let content = std::fs::read_to_string(&mappings_path)?;
54 let mappings: UriMappings = serde_json::from_str(&content)?;
55 let id_to_uri: HashMap<u32, String> = mappings
56 .uri_to_id
57 .iter()
58 .map(|(uri, &id)| (id, uri.clone()))
59 .collect();
60 (mappings.uri_to_id, id_to_uri, mappings.next_id)
61 } else {
62 (HashMap::new(), HashMap::new(), 1)
63 };
64
65 let vector_store = VectorStore::new(namespace).ok().map(Arc::new);
67
68 Ok(Self {
69 store,
70 namespace: namespace.to_string(),
71 storage_path: path,
72 id_to_uri: RwLock::new(id_to_uri),
73 uri_to_id: RwLock::new(uri_to_id),
74 next_id: std::sync::atomic::AtomicU32::new(next_id),
75 vector_store,
76 })
77 }
78
79 fn save_mappings(&self, mappings: UriMappings) -> Result<()> {
81 let content = serde_json::to_string_pretty(&mappings)?;
82 std::fs::write(self.storage_path.join("uri_mappings.json"), content)?;
83 Ok(())
84 }
85
86 pub fn get_or_create_id(&self, uri: &str) -> u32 {
87 {
88 let map = self.uri_to_id.read().unwrap();
89 if let Some(&id) = map.get(uri) {
90 return id;
91 }
92 }
93
94 let mut uri_map = self.uri_to_id.write().unwrap();
95 let mut id_map = self.id_to_uri.write().unwrap();
96
97 if let Some(&id) = uri_map.get(uri) {
98 return id;
99 }
100
101 let id = self
102 .next_id
103 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
104 uri_map.insert(uri.to_string(), id);
105 id_map.insert(id, uri.to_string());
106
107 let mappings = UriMappings {
109 uri_to_id: uri_map.clone(),
110 next_id: self.next_id.load(std::sync::atomic::Ordering::Relaxed),
111 };
112
113 drop(uri_map);
115 drop(id_map);
116 let _ = self.save_mappings(mappings);
117
118 id
119 }
120
121 pub fn get_uri(&self, id: u32) -> Option<String> {
122 self.id_to_uri.read().unwrap().get(&id).cloned()
123 }
124
125 pub async fn ingest_triples(
126 &self,
127 triples: Vec<IngestTriple>,
128 ) -> Result<(u32, u32)> {
129 let mut added = 0;
130
131 let mut batches: HashMap<Option<Provenance>, Vec<(String, String, String)>> = HashMap::new();
133
134 for t in triples {
135 batches.entry(t.provenance).or_default().push((t.subject, t.predicate, t.object));
136 }
137
138 for (prov, batch_triples) in batches {
139 let graph_name = if let Some(p) = &prov {
140 let uuid = Uuid::new_v4();
141 let uri = format!("urn:batch:{}", uuid);
142
143 let batch_node = NamedNode::new_unchecked(&uri);
144 let p_derived = NamedNode::new_unchecked("http://www.w3.org/ns/prov#wasDerivedFrom");
145 let p_time = NamedNode::new_unchecked("http://www.w3.org/ns/prov#generatedAtTime");
146 let p_method = NamedNode::new_unchecked("http://www.w3.org/ns/prov#wasGeneratedBy");
147
148 let o_source = Literal::new_simple_literal(&p.source);
149 let o_time = Literal::new_simple_literal(&p.timestamp);
150 let o_method = Literal::new_simple_literal(&p.method);
151
152 self.store.insert(&Quad::new(batch_node.clone(), p_derived, o_source, GraphName::DefaultGraph))?;
153 self.store.insert(&Quad::new(batch_node.clone(), p_time, o_time, GraphName::DefaultGraph))?;
154 self.store.insert(&Quad::new(batch_node.clone(), p_method, o_method, GraphName::DefaultGraph))?;
155
156 GraphName::NamedNode(batch_node)
157 } else {
158 GraphName::DefaultGraph
159 };
160
161 for (s, p, o) in batch_triples {
162 let subject_uri = self.ensure_uri(&s);
163 let predicate_uri = self.ensure_uri(&p);
164 let object_uri = self.ensure_uri(&o);
165
166 self.get_or_create_id(&subject_uri);
168 self.get_or_create_id(&predicate_uri);
169 self.get_or_create_id(&object_uri);
170
171 let subject = Subject::NamedNode(NamedNode::new_unchecked(&subject_uri));
172 let predicate = NamedNode::new_unchecked(&predicate_uri);
173 let object = Term::NamedNode(NamedNode::new_unchecked(&object_uri));
174
175 let quad = Quad::new(subject, predicate, object, graph_name.clone());
176 if self.store.insert(&quad)? {
177 if let Some(ref vs) = self.vector_store {
179 let content = format!("{} {} {}", s, p, o);
181 let key = format!("{}|{}|{}", subject_uri, predicate_uri, object_uri);
187
188 let metadata = serde_json::json!({
190 "uri": subject_uri,
191 "predicate": predicate_uri,
192 "object": object_uri,
193 "type": "triple"
194 });
195
196 if let Err(e) = vs.add(&key, &content, metadata).await {
197 self.store.remove(&quad)?;
199 return Err(anyhow::anyhow!(
200 "Vector store insertion failed, rolled back graph change: {}",
201 e
202 ));
203 }
204 }
205 added += 1;
206 }
207 }
208 }
209
210 Ok((added, 0))
211 }
212
213 pub async fn hybrid_search(
215 &self,
216 query: &str,
217 vector_k: usize,
218 graph_depth: u32,
219 ) -> Result<Vec<(String, f32)>> {
220 let mut results = Vec::new();
221
222 if let Some(ref vs) = self.vector_store {
224 let vector_results = vs.search(query, vector_k).await?;
225
226 for result in vector_results {
227 let uri = result.uri.clone();
229 results.push((uri.clone(), result.score));
230
231 if graph_depth > 0 {
233 let expanded = self.expand_graph(&uri, graph_depth)?;
234 for expanded_uri in expanded {
235 results.push((expanded_uri, result.score * 0.8));
237 }
238 }
239 }
240 }
241
242 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
244 results.dedup_by(|a, b| a.0 == b.0);
245
246 Ok(results)
247 }
248
249 fn expand_graph(&self, start_uri: &str, depth: u32) -> Result<Vec<String>> {
251 let mut expanded = Vec::new();
252
253 if depth == 0 {
254 return Ok(expanded);
255 }
256
257 let subject = NamedNodeRef::new(start_uri).ok();
259
260 if let Some(subj) = subject {
261 for quad in self
262 .store
263 .quads_for_pattern(Some(subj.into()), None, None, None)
264 {
265 if let Ok(q) = quad {
266 expanded.push(q.object.to_string());
267
268 if depth > 1 {
270 let nested = self.expand_graph(&q.object.to_string(), depth - 1)?;
271 expanded.extend(nested);
272 }
273 }
274 }
275 }
276
277 Ok(expanded)
278 }
279
280 pub fn query_sparql(&self, query: &str) -> Result<String> {
281 use oxigraph::sparql::QueryResults;
282
283 let results = self.store.query(query)?;
284
285 match results {
286 QueryResults::Solutions(solutions) => {
287 let mut results_array = Vec::new();
288 for solution in solutions {
289 let sol = solution?;
290 let mut mapping = serde_json::Map::new();
291 for (variable, value) in sol.iter() {
292 mapping.insert(
293 variable.to_string(),
294 serde_json::to_value(value.to_string()).unwrap(),
295 );
296 }
297 results_array.push(serde_json::Value::Object(mapping));
298 }
299 Ok(serde_json::to_string(&results_array)?)
300 }
301 _ => Ok("[]".to_string()),
302 }
303 }
304
305 pub fn get_degree(&self, uri: &str) -> usize {
306 let node = NamedNodeRef::new(uri).ok();
307 if let Some(n) = node {
308 let outgoing = self.store.quads_for_pattern(Some(n.into()), None, None, None).count();
309 let incoming = self.store.quads_for_pattern(None, None, Some(n.into()), None).count();
310 outgoing + incoming
311 } else {
312 0
313 }
314 }
315
316 pub fn ensure_uri(&self, s: &str) -> String {
317 if s.starts_with("http") || s.starts_with("urn:") {
318 s.to_string()
319 } else {
320 format!("http://synapse.os/{}", s)
321 }
322 }
323}