use crate::vector_store::VectorStore;
use anyhow::Result;
use oxigraph::model::*;
use oxigraph::store::Store;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

10#[derive(Serialize, Deserialize, Default)]
12struct UriMappings {
13 uri_to_id: HashMap<String, u32>,
14 next_id: u32,
15}
16
17pub struct SynapseStore {
18 pub store: Store,
19 pub namespace: String,
20 pub storage_path: PathBuf,
21 pub id_to_uri: RwLock<HashMap<u32, String>>,
23 pub uri_to_id: RwLock<HashMap<String, u32>>,
24 pub next_id: std::sync::atomic::AtomicU32,
25 pub vector_store: Option<Arc<VectorStore>>,
27}
28
29impl SynapseStore {
30 pub fn open(namespace: &str, storage_path: &str) -> Result<Self> {
31 let path = PathBuf::from(storage_path).join(namespace);
32 std::fs::create_dir_all(&path)?;
33 let store = Store::open(&path)?;
34
35 let mappings_path = path.join("uri_mappings.json");
37 let (uri_to_id, id_to_uri, next_id) = if mappings_path.exists() {
38 let content = std::fs::read_to_string(&mappings_path)?;
39 let mappings: UriMappings = serde_json::from_str(&content)?;
40 let id_to_uri: HashMap<u32, String> = mappings
41 .uri_to_id
42 .iter()
43 .map(|(uri, &id)| (id, uri.clone()))
44 .collect();
45 (mappings.uri_to_id, id_to_uri, mappings.next_id)
46 } else {
47 (HashMap::new(), HashMap::new(), 1)
48 };
49
50 let vector_store = VectorStore::new(namespace).ok().map(Arc::new);
52
53 Ok(Self {
54 store,
55 namespace: namespace.to_string(),
56 storage_path: path,
57 id_to_uri: RwLock::new(id_to_uri),
58 uri_to_id: RwLock::new(uri_to_id),
59 next_id: std::sync::atomic::AtomicU32::new(next_id),
60 vector_store,
61 })
62 }
63
64 fn save_mappings(&self, mappings: UriMappings) -> Result<()> {
66 let content = serde_json::to_string_pretty(&mappings)?;
67 std::fs::write(self.storage_path.join("uri_mappings.json"), content)?;
68 Ok(())
69 }
70
71 pub fn get_or_create_id(&self, uri: &str) -> u32 {
72 {
73 let map = self.uri_to_id.read().unwrap();
74 if let Some(&id) = map.get(uri) {
75 return id;
76 }
77 }
78
79 let mut uri_map = self.uri_to_id.write().unwrap();
80 let mut id_map = self.id_to_uri.write().unwrap();
81
82 if let Some(&id) = uri_map.get(uri) {
83 return id;
84 }
85
86 let id = self
87 .next_id
88 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
89 uri_map.insert(uri.to_string(), id);
90 id_map.insert(id, uri.to_string());
91
92 let mappings = UriMappings {
94 uri_to_id: uri_map.clone(),
95 next_id: self.next_id.load(std::sync::atomic::Ordering::Relaxed),
96 };
97
98 drop(uri_map);
100 drop(id_map);
101 let _ = self.save_mappings(mappings);
102
103 id
104 }
105
106 pub fn get_uri(&self, id: u32) -> Option<String> {
107 self.id_to_uri.read().unwrap().get(&id).cloned()
108 }
109
110 pub async fn ingest_triples(
111 &self,
112 triples: Vec<(String, String, String)>,
113 ) -> Result<(u32, u32)> {
114 let mut added = 0;
115
116 for (s, p, o) in triples {
117 let subject_uri = self.ensure_uri(&s);
118 let predicate_uri = self.ensure_uri(&p);
119 let object_uri = self.ensure_uri(&o);
120
121 let subject = Subject::NamedNode(NamedNode::new_unchecked(&subject_uri));
122 let predicate = NamedNode::new_unchecked(&predicate_uri);
123 let object = Term::NamedNode(NamedNode::new_unchecked(&object_uri));
124
125 let quad = Quad::new(subject, predicate, object, GraphName::DefaultGraph);
126 if self.store.insert(&quad)? {
127 if let Some(ref vs) = self.vector_store {
129 let content = format!("{} {} {}", s, p, o);
131 if let Err(e) = vs.add(&subject_uri, &content).await {
132 self.store.remove(&quad)?;
134 return Err(anyhow::anyhow!(
135 "Vector store insertion failed, rolled back graph change: {}",
136 e
137 ));
138 }
139 }
140 added += 1;
141 }
142 }
143
144 Ok((added, 0))
145 }
146
147 pub async fn hybrid_search(
149 &self,
150 query: &str,
151 vector_k: usize,
152 graph_depth: u32,
153 ) -> Result<Vec<(String, f32)>> {
154 let mut results = Vec::new();
155
156 if let Some(ref vs) = self.vector_store {
158 let vector_results = vs.search(query, vector_k).await?;
159
160 for result in vector_results {
161 results.push((result.uri.clone(), result.score));
162
163 if graph_depth > 0 {
165 let expanded = self.expand_graph(&result.uri, graph_depth)?;
166 for uri in expanded {
167 results.push((uri, result.score * 0.8));
169 }
170 }
171 }
172 }
173
174 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
176 results.dedup_by(|a, b| a.0 == b.0);
177
178 Ok(results)
179 }
180
181 fn expand_graph(&self, start_uri: &str, depth: u32) -> Result<Vec<String>> {
183 let mut expanded = Vec::new();
184
185 if depth == 0 {
186 return Ok(expanded);
187 }
188
189 let subject = NamedNodeRef::new(start_uri).ok();
191
192 if let Some(subj) = subject {
193 for quad in self
194 .store
195 .quads_for_pattern(Some(subj.into()), None, None, None)
196 {
197 if let Ok(q) = quad {
198 expanded.push(q.object.to_string());
199
200 if depth > 1 {
202 let nested = self.expand_graph(&q.object.to_string(), depth - 1)?;
203 expanded.extend(nested);
204 }
205 }
206 }
207 }
208
209 Ok(expanded)
210 }
211
212 pub fn query_sparql(&self, query: &str) -> Result<String> {
213 use oxigraph::sparql::QueryResults;
214
215 let results = self.store.query(query)?;
216
217 match results {
218 QueryResults::Solutions(solutions) => {
219 let mut results_array = Vec::new();
220 for solution in solutions {
221 let sol = solution?;
222 let mut mapping = serde_json::Map::new();
223 for (variable, value) in sol.iter() {
224 mapping.insert(
225 variable.to_string(),
226 serde_json::to_value(value.to_string()).unwrap(),
227 );
228 }
229 results_array.push(serde_json::Value::Object(mapping));
230 }
231 Ok(serde_json::to_string(&results_array)?)
232 }
233 _ => Ok("[]".to_string()),
234 }
235 }
236
237 fn ensure_uri(&self, s: &str) -> String {
238 if s.starts_with("http") {
239 s.to_string()
240 } else {
241 format!("http://synapse.os/{}", s)
242 }
243 }
244}