1use crate::persistence::{load_bincode, save_bincode};
2use crate::vector_store::VectorStore;
3use anyhow::Result;
4use oxigraph::model::*;
5use oxigraph::store::Store;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::sync::{Arc, RwLock};
11use uuid::Uuid;
12
/// How many newly minted URI IDs may accumulate before the URI mapping table
/// is written back to disk automatically.
const DEFAULT_MAPPING_SAVE_THRESHOLD: usize = 1_000;
14
15#[derive(Serialize, Deserialize, Default)]
17struct UriMappings {
18 uri_to_id: HashMap<String, u32>,
19 next_id: u32,
20}
21
22#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct Provenance {
24 pub source: String,
25 pub timestamp: String,
26 pub method: String,
27}
28
29pub struct IngestTriple {
30 pub subject: String,
31 pub predicate: String,
32 pub object: String,
33 pub provenance: Option<Provenance>,
34}
35
36pub struct SynapseStore {
37 pub store: Store,
38 pub namespace: String,
39 pub storage_path: PathBuf,
40 pub id_to_uri: RwLock<HashMap<u32, String>>,
42 pub uri_to_id: RwLock<HashMap<String, u32>>,
43 pub next_id: std::sync::atomic::AtomicU32,
44 pub vector_store: Option<Arc<VectorStore>>,
46 dirty_count: AtomicUsize,
48 save_threshold: usize,
49}
50
51impl SynapseStore {
52 pub fn open(namespace: &str, storage_path: &str) -> Result<Self> {
53 let path = PathBuf::from(storage_path).join(namespace);
54 std::fs::create_dir_all(&path)?;
55 let store = Store::open(&path)?;
56
57 let mappings_path_bin = path.join("uri_mappings.bin");
59 let mappings_path_json = path.join("uri_mappings.json");
60
61 let (uri_to_id, id_to_uri, next_id) = if mappings_path_bin.exists() {
62 let mappings: UriMappings = load_bincode(&mappings_path_bin)?;
63 let id_to_uri: HashMap<u32, String> = mappings
64 .uri_to_id
65 .iter()
66 .map(|(uri, &id)| (id, uri.clone()))
67 .collect();
68 (mappings.uri_to_id, id_to_uri, mappings.next_id)
69 } else if mappings_path_json.exists() {
70 let content = std::fs::read_to_string(&mappings_path_json)?;
71 let mappings: UriMappings = serde_json::from_str(&content)?;
72 let id_to_uri: HashMap<u32, String> = mappings
73 .uri_to_id
74 .iter()
75 .map(|(uri, &id)| (id, uri.clone()))
76 .collect();
77 (mappings.uri_to_id, id_to_uri, mappings.next_id)
78 } else {
79 (HashMap::new(), HashMap::new(), 1)
80 };
81
82 let vector_store = match VectorStore::new(namespace) {
84 Ok(vs) => Some(Arc::new(vs)),
85 Err(e) => {
86 eprintln!("WARNING: Failed to initialize vector store for namespace '{}': {}", namespace, e);
87 None
88 }
89 };
90
91 Ok(Self {
92 store,
93 namespace: namespace.to_string(),
94 storage_path: path,
95 id_to_uri: RwLock::new(id_to_uri),
96 uri_to_id: RwLock::new(uri_to_id),
97 next_id: std::sync::atomic::AtomicU32::new(next_id),
98 vector_store,
99 dirty_count: AtomicUsize::new(0),
100 save_threshold: DEFAULT_MAPPING_SAVE_THRESHOLD,
101 })
102 }
103
104 fn save_mappings(&self) -> Result<()> {
106 let mappings = UriMappings {
107 uri_to_id: self.uri_to_id.read().unwrap().clone(),
108 next_id: self.next_id.load(std::sync::atomic::Ordering::Relaxed),
109 };
110 let current_dirty = self.dirty_count.load(Ordering::Relaxed);
116
117 save_bincode(&self.storage_path.join("uri_mappings.bin"), &mappings)?;
118
119 if current_dirty > 0 {
120 let _ = self.dirty_count.fetch_sub(current_dirty, Ordering::Relaxed);
121 }
122 Ok(())
123 }
124
125 pub fn flush(&self) -> Result<()> {
127 self.save_mappings()?;
128 if let Some(ref vs) = self.vector_store {
129 vs.flush()?;
130 }
131 Ok(())
132 }
133
134 pub fn get_or_create_id(&self, uri: &str) -> u32 {
135 {
136 let map = self.uri_to_id.read().unwrap();
137 if let Some(&id) = map.get(uri) {
138 return id;
139 }
140 }
141
142 let mut uri_map = self.uri_to_id.write().unwrap();
143 let mut id_map = self.id_to_uri.write().unwrap();
144
145 if let Some(&id) = uri_map.get(uri) {
146 return id;
147 }
148
149 let id = self
150 .next_id
151 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
152 uri_map.insert(uri.to_string(), id);
153 id_map.insert(id, uri.to_string());
154
155 drop(uri_map);
156 drop(id_map);
157
158 let count = self.dirty_count.fetch_add(1, Ordering::Relaxed);
160 if count + 1 >= self.save_threshold {
161 let _ = self.save_mappings();
162 }
163
164 id
165 }
166
167 pub fn get_uri(&self, id: u32) -> Option<String> {
168 self.id_to_uri.read().unwrap().get(&id).cloned()
169 }
170
171 pub async fn ingest_triples(&self, triples: Vec<IngestTriple>) -> Result<(u32, u32)> {
172 let mut added = 0;
173
174 let mut batches: HashMap<Option<Provenance>, Vec<(String, String, String)>> =
176 HashMap::new();
177
178 for t in triples {
179 batches
180 .entry(t.provenance)
181 .or_default()
182 .push((t.subject, t.predicate, t.object));
183 }
184
185 for (prov, batch_triples) in batches {
186 let graph_name = if let Some(p) = &prov {
187 let uuid = Uuid::new_v4();
188 let uri = format!("urn:batch:{}", uuid);
189
190 let batch_node = NamedNode::new_unchecked(&uri);
191 let p_derived =
192 NamedNode::new_unchecked("http://www.w3.org/ns/prov#wasDerivedFrom");
193 let p_time = NamedNode::new_unchecked("http://www.w3.org/ns/prov#generatedAtTime");
194 let p_method = NamedNode::new_unchecked("http://www.w3.org/ns/prov#wasGeneratedBy");
195
196 let o_source = Literal::new_simple_literal(&p.source);
197 let o_time = Literal::new_simple_literal(&p.timestamp);
198 let o_method = Literal::new_simple_literal(&p.method);
199
200 self.store.insert(&Quad::new(
201 batch_node.clone(),
202 p_derived,
203 o_source,
204 GraphName::DefaultGraph,
205 ))?;
206 self.store.insert(&Quad::new(
207 batch_node.clone(),
208 p_time,
209 o_time,
210 GraphName::DefaultGraph,
211 ))?;
212 self.store.insert(&Quad::new(
213 batch_node.clone(),
214 p_method,
215 o_method,
216 GraphName::DefaultGraph,
217 ))?;
218
219 if p.source == "mcp" {
221 GraphName::DefaultGraph
222 } else {
223 GraphName::NamedNode(batch_node)
224 }
225 } else {
226 GraphName::DefaultGraph
227 };
228
229 for (s, p, o) in batch_triples {
230 let subject_uri = self.ensure_uri(&s);
231 let predicate_uri = self.ensure_uri(&p);
232 let object_uri = self.ensure_uri(&o);
233
234 self.get_or_create_id(&subject_uri);
236 self.get_or_create_id(&predicate_uri);
237 self.get_or_create_id(&object_uri);
238
239 let subject = Subject::NamedNode(NamedNode::new_unchecked(&subject_uri));
240 let predicate = NamedNode::new_unchecked(&predicate_uri);
241 let object = Term::NamedNode(NamedNode::new_unchecked(&object_uri));
242
243 let quad = Quad::new(subject, predicate, object, graph_name.clone());
244 let inserted = self.store.insert(&quad)?;
245
246 if let Some(ref vs) = self.vector_store {
248 let key = format!("{}|{}|{}", subject_uri, predicate_uri, object_uri);
250 if vs.get_id(&key).is_none() {
251 let content = format!("{} {} {}", s, p, o);
253 let metadata = serde_json::json!({
255 "uri": subject_uri,
256 "predicate": predicate_uri,
257 "object": object_uri,
258 "type": "triple"
259 });
260
261 if let Err(e) = vs.add(&key, &content, metadata).await {
262 eprintln!("Vector store insertion failed for {}: {}", key, e);
265 }
266 }
267 }
268
269 if inserted {
270 added += 1;
271 }
272 }
273 }
274
275 Ok((added, 0))
276 }
277
278 pub async fn hybrid_search(
280 &self,
281 query: &str,
282 vector_k: usize,
283 graph_depth: u32,
284 ) -> Result<Vec<(String, f32)>> {
285 let mut results = Vec::new();
286
287 if let Some(ref vs) = self.vector_store {
289 let vector_results = vs.search(query, vector_k).await?;
290
291 for result in vector_results {
292 let uri = result.uri.clone();
294 results.push((uri.clone(), result.score));
295
296 if graph_depth > 0 {
298 let expanded = self.expand_graph(&uri, graph_depth)?;
299 for expanded_uri in expanded {
300 results.push((expanded_uri, result.score * 0.8));
302 }
303 }
304 }
305 }
306
307 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
309 results.dedup_by(|a, b| a.0 == b.0);
310
311 Ok(results)
312 }
313
314 fn expand_graph(&self, start_uri: &str, depth: u32) -> Result<Vec<String>> {
316 let mut expanded = Vec::new();
317
318 if depth == 0 {
319 return Ok(expanded);
320 }
321
322 let subject = NamedNodeRef::new(start_uri).ok();
324
325 if let Some(subj) = subject {
326 for q in self
327 .store
328 .quads_for_pattern(Some(subj.into()), None, None, None)
329 .flatten()
330 {
331 expanded.push(q.object.to_string());
332
333 if depth > 1 {
335 let nested = self.expand_graph(&q.object.to_string(), depth - 1)?;
336 expanded.extend(nested);
337 }
338 }
339 }
340
341 Ok(expanded)
342 }
343
344 pub fn query_sparql(&self, query: &str) -> Result<String> {
345 use oxigraph::sparql::QueryResults;
346
347 let results = self.store.query(query)?;
348
349 match results {
350 QueryResults::Solutions(solutions) => {
351 let mut results_array = Vec::new();
352 for solution in solutions {
353 let sol = solution?;
354 let mut mapping = serde_json::Map::new();
355 for (variable, value) in sol.iter() {
356 mapping.insert(
357 variable.to_string(),
358 serde_json::to_value(value.to_string()).unwrap(),
359 );
360 }
361 results_array.push(serde_json::Value::Object(mapping));
362 }
363 Ok(serde_json::to_string(&results_array)?)
364 }
365 _ => Ok("[]".to_string()),
366 }
367 }
368
369 pub fn get_degree(&self, uri: &str) -> usize {
370 let node = NamedNodeRef::new(uri).ok();
371 if let Some(n) = node {
372 let outgoing = self
373 .store
374 .quads_for_pattern(Some(n.into()), None, None, None)
375 .count();
376 let incoming = self
377 .store
378 .quads_for_pattern(None, None, Some(n.into()), None)
379 .count();
380 outgoing + incoming
381 } else {
382 0
383 }
384 }
385
386 pub fn ensure_uri(&self, s: &str) -> String {
387 let clean = s.trim_start_matches('<').trim_end_matches('>');
388 if clean.starts_with("http") || clean.starts_with("urn:") {
389 clean.to_string()
390 } else {
391 format!("http://synapse.os/{}", clean)
392 }
393 }
394}