1use anyhow::Result;
2use oxigraph::model::*;
3use oxigraph::store::Store;
4use std::collections::HashMap;
5use std::path::PathBuf;
6use std::sync::{Arc, RwLock};
7use crate::vector_store::VectorStore;
8use serde::{Deserialize, Serialize};
9
10#[derive(Serialize, Deserialize, Default)]
12struct UriMappings {
13 uri_to_id: HashMap<String, u32>,
14 next_id: u32,
15}
16
17pub struct SynapseStore {
18 pub store: Store,
19 pub namespace: String,
20 pub storage_path: PathBuf,
21 pub id_to_uri: RwLock<HashMap<u32, String>>,
23 pub uri_to_id: RwLock<HashMap<String, u32>>,
24 pub next_id: std::sync::atomic::AtomicU32,
25 pub vector_store: Option<Arc<VectorStore>>,
27}
28
29impl SynapseStore {
30 pub fn open(namespace: &str, storage_path: &str) -> Result<Self> {
31 let path = PathBuf::from(storage_path).join(namespace);
32 std::fs::create_dir_all(&path)?;
33 let store = Store::open(&path)?;
34
35 let mappings_path = path.join("uri_mappings.json");
37 let (uri_to_id, id_to_uri, next_id) = if mappings_path.exists() {
38 let content = std::fs::read_to_string(&mappings_path)?;
39 let mappings: UriMappings = serde_json::from_str(&content)?;
40 let id_to_uri: HashMap<u32, String> = mappings.uri_to_id
41 .iter()
42 .map(|(uri, &id)| (id, uri.clone()))
43 .collect();
44 (mappings.uri_to_id, id_to_uri, mappings.next_id)
45 } else {
46 (HashMap::new(), HashMap::new(), 1)
47 };
48
49 let vector_store = VectorStore::new(namespace)
51 .ok()
52 .map(Arc::new);
53
54 Ok(Self {
55 store,
56 namespace: namespace.to_string(),
57 storage_path: path,
58 id_to_uri: RwLock::new(id_to_uri),
59 uri_to_id: RwLock::new(uri_to_id),
60 next_id: std::sync::atomic::AtomicU32::new(next_id),
61 vector_store,
62 })
63 }
64
65 fn save_mappings(&self) -> Result<()> {
67 let uri_map = self.uri_to_id.read().unwrap();
68 let mappings = UriMappings {
69 uri_to_id: uri_map.clone(),
70 next_id: self.next_id.load(std::sync::atomic::Ordering::Relaxed),
71 };
72 let content = serde_json::to_string_pretty(&mappings)?;
73 std::fs::write(self.storage_path.join("uri_mappings.json"), content)?;
74 Ok(())
75 }
76
77 pub fn get_or_create_id(&self, uri: &str) -> u32 {
78 {
79 let map = self.uri_to_id.read().unwrap();
80 if let Some(&id) = map.get(uri) {
81 return id;
82 }
83 }
84
85 let mut uri_map = self.uri_to_id.write().unwrap();
86 let mut id_map = self.id_to_uri.write().unwrap();
87
88 if let Some(&id) = uri_map.get(uri) {
89 return id;
90 }
91
92 let id = self.next_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
93 uri_map.insert(uri.to_string(), id);
94 id_map.insert(id, uri.to_string());
95
96 drop(uri_map);
98 drop(id_map);
99 let _ = self.save_mappings();
100
101 id
102 }
103
104 pub fn get_uri(&self, id: u32) -> Option<String> {
105 self.id_to_uri.read().unwrap().get(&id).cloned()
106 }
107
108 pub async fn ingest_triples(&self, triples: Vec<(String, String, String)>) -> Result<(u32, u32)> {
109 let mut added = 0;
110
111 for (s, p, o) in triples {
112 let subject_uri = self.ensure_uri(&s);
113 let predicate_uri = self.ensure_uri(&p);
114 let object_uri = self.ensure_uri(&o);
115
116 let subject = Subject::NamedNode(NamedNode::new_unchecked(&subject_uri));
117 let predicate = NamedNode::new_unchecked(&predicate_uri);
118 let object = Term::NamedNode(NamedNode::new_unchecked(&object_uri));
119
120 let quad = Quad::new(subject, predicate, object, GraphName::DefaultGraph);
121 if self.store.insert(&quad)? {
122 added += 1;
123
124 if let Some(ref vs) = self.vector_store {
126 let content = format!("{} {} {}", s, p, o);
128 let _ = vs.add(&subject_uri, &content).await;
129 }
130 }
131 }
132
133 Ok((added, 0))
134 }
135
136 pub async fn hybrid_search(
138 &self,
139 query: &str,
140 vector_k: usize,
141 graph_depth: u32,
142 ) -> Result<Vec<(String, f32)>> {
143 let mut results = Vec::new();
144
145 if let Some(ref vs) = self.vector_store {
147 let vector_results = vs.search(query, vector_k).await?;
148
149 for result in vector_results {
150 results.push((result.uri.clone(), result.score));
151
152 if graph_depth > 0 {
154 let expanded = self.expand_graph(&result.uri, graph_depth)?;
155 for uri in expanded {
156 results.push((uri, result.score * 0.8));
158 }
159 }
160 }
161 }
162
163 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
165 results.dedup_by(|a, b| a.0 == b.0);
166
167 Ok(results)
168 }
169
170 fn expand_graph(&self, start_uri: &str, depth: u32) -> Result<Vec<String>> {
172 let mut expanded = Vec::new();
173
174 if depth == 0 {
175 return Ok(expanded);
176 }
177
178 let subject = NamedNodeRef::new(start_uri).ok();
180
181 if let Some(subj) = subject {
182 for quad in self.store.quads_for_pattern(
183 Some(subj.into()),
184 None,
185 None,
186 None,
187 ) {
188 if let Ok(q) = quad {
189 expanded.push(q.object.to_string());
190
191 if depth > 1 {
193 let nested = self.expand_graph(&q.object.to_string(), depth - 1)?;
194 expanded.extend(nested);
195 }
196 }
197 }
198 }
199
200 Ok(expanded)
201 }
202
203 pub fn query_sparql(&self, query: &str) -> Result<String> {
204 use oxigraph::sparql::QueryResults;
205
206 let results = self.store.query(query)?;
207
208 match results {
209 QueryResults::Solutions(solutions) => {
210 let mut results_array = Vec::new();
211 for solution in solutions {
212 let sol = solution?;
213 let mut mapping = serde_json::Map::new();
214 for (variable, value) in sol.iter() {
215 mapping.insert(variable.to_string(), serde_json::to_value(value.to_string()).unwrap());
216 }
217 results_array.push(serde_json::Value::Object(mapping));
218 }
219 Ok(serde_json::to_string(&results_array)?)
220 }
221 _ => Ok("[]".to_string()),
222 }
223 }
224
225 fn ensure_uri(&self, s: &str) -> String {
226 if s.starts_with("http") {
227 s.to_string()
228 } else {
229 format!("http://synapse.os/{}", s)
230 }
231 }
232}