Skip to main content

synapse_core/
store.rs

1use crate::vector_store::VectorStore;
2use anyhow::Result;
3use oxigraph::model::*;
4use oxigraph::store::Store;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::path::PathBuf;
8use std::sync::{Arc, RwLock};
9use uuid::Uuid;
10
/// Persisted URI mappings.
///
/// Serialized as pretty JSON to `uri_mappings.json` in the namespace
/// directory and read back by `SynapseStore::open`, which rebuilds the
/// reverse (ID -> URI) map from `uri_to_id`.
#[derive(Serialize, Deserialize, Default)]
struct UriMappings {
    /// URI -> numeric ID handed out to gRPC clients.
    uri_to_id: HashMap<String, u32>,
    /// The next ID to allocate at the time this snapshot was taken.
    next_id: u32,
}
17
/// Provenance attached to a group of ingested triples.
///
/// `SynapseStore::ingest_triples` groups triples by this value. It records each
/// distinct provenance on a fresh `urn:batch:<uuid>` node, using
/// `prov:wasDerivedFrom` (source), `prov:generatedAtTime` (timestamp) and
/// `prov:wasGeneratedBy` (method). All three are stored as plain string
/// literals, and the timestamp format is not validated.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Provenance {
    /// Where the data came from (stored as `prov:wasDerivedFrom`).
    pub source: String,
    /// When it was generated (stored as `prov:generatedAtTime`).
    pub timestamp: String,
    /// How it was generated (stored as `prov:wasGeneratedBy`).
    pub method: String,
}
24
25pub struct IngestTriple {
26    pub subject: String,
27    pub predicate: String,
28    pub object: String,
29    pub provenance: Option<Provenance>,
30}
31
/// A namespaced Oxigraph store with an optional vector index for hybrid
/// search and a persisted numeric-ID <-> URI table for gRPC clients.
pub struct SynapseStore {
    /// Underlying Oxigraph quad store.
    pub store: Store,
    /// Namespace this store was opened for.
    pub namespace: String,
    /// Directory holding this namespace's data (`<storage_path>/<namespace>`
    /// as built by `SynapseStore::open`).
    pub storage_path: PathBuf,
    // Mapping for gRPC compatibility (ID <-> URI)
    /// Numeric ID -> URI (reverse of `uri_to_id`).
    pub id_to_uri: RwLock<HashMap<u32, String>>,
    /// URI -> numeric ID.
    pub uri_to_id: RwLock<HashMap<String, u32>>,
    /// Next numeric ID to allocate; IDs start at 1.
    pub next_id: std::sync::atomic::AtomicU32,
    // Vector store for hybrid search
    /// `None` if the vector store could not be initialised in `open`.
    pub vector_store: Option<Arc<VectorStore>>,
}
43
44impl SynapseStore {
45    pub fn open(namespace: &str, storage_path: &str) -> Result<Self> {
46        let path = PathBuf::from(storage_path).join(namespace);
47        std::fs::create_dir_all(&path)?;
48        let store = Store::open(&path)?;
49
50        // Load persisted URI mappings if they exist
51        let mappings_path = path.join("uri_mappings.json");
52        let (uri_to_id, id_to_uri, next_id) = if mappings_path.exists() {
53            let content = std::fs::read_to_string(&mappings_path)?;
54            let mappings: UriMappings = serde_json::from_str(&content)?;
55            let id_to_uri: HashMap<u32, String> = mappings
56                .uri_to_id
57                .iter()
58                .map(|(uri, &id)| (id, uri.clone()))
59                .collect();
60            (mappings.uri_to_id, id_to_uri, mappings.next_id)
61        } else {
62            (HashMap::new(), HashMap::new(), 1)
63        };
64
65        // Initialize vector store (optional, can fail gracefully)
66        let vector_store = VectorStore::new(namespace).ok().map(Arc::new);
67
68        Ok(Self {
69            store,
70            namespace: namespace.to_string(),
71            storage_path: path,
72            id_to_uri: RwLock::new(id_to_uri),
73            uri_to_id: RwLock::new(uri_to_id),
74            next_id: std::sync::atomic::AtomicU32::new(next_id),
75            vector_store,
76        })
77    }
78
79    /// Save URI mappings to disk
80    fn save_mappings(&self, mappings: UriMappings) -> Result<()> {
81        let content = serde_json::to_string_pretty(&mappings)?;
82        std::fs::write(self.storage_path.join("uri_mappings.json"), content)?;
83        Ok(())
84    }
85
86    pub fn get_or_create_id(&self, uri: &str) -> u32 {
87        {
88            let map = self.uri_to_id.read().unwrap();
89            if let Some(&id) = map.get(uri) {
90                return id;
91            }
92        }
93
94        let mut uri_map = self.uri_to_id.write().unwrap();
95        let mut id_map = self.id_to_uri.write().unwrap();
96
97        if let Some(&id) = uri_map.get(uri) {
98            return id;
99        }
100
101        let id = self
102            .next_id
103            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
104        uri_map.insert(uri.to_string(), id);
105        id_map.insert(id, uri.to_string());
106
107        // Prepare mappings for persistence
108        let mappings = UriMappings {
109            uri_to_id: uri_map.clone(),
110            next_id: self.next_id.load(std::sync::atomic::Ordering::Relaxed),
111        };
112
113        // Persist mappings (best effort, don't block on error)
114        drop(uri_map);
115        drop(id_map);
116        let _ = self.save_mappings(mappings);
117
118        id
119    }
120
121    pub fn get_uri(&self, id: u32) -> Option<String> {
122        self.id_to_uri.read().unwrap().get(&id).cloned()
123    }
124
125    pub async fn ingest_triples(
126        &self,
127        triples: Vec<IngestTriple>,
128    ) -> Result<(u32, u32)> {
129        let mut added = 0;
130
131        // Group by provenance to optimize batch insertion into named graphs
132        let mut batches: HashMap<Option<Provenance>, Vec<(String, String, String)>> = HashMap::new();
133
134        for t in triples {
135            batches.entry(t.provenance).or_default().push((t.subject, t.predicate, t.object));
136        }
137
138        for (prov, batch_triples) in batches {
139             let graph_name = if let Some(p) = &prov {
140                 let uuid = Uuid::new_v4();
141                 let uri = format!("urn:batch:{}", uuid);
142
143                 let batch_node = NamedNode::new_unchecked(&uri);
144                 let p_derived = NamedNode::new_unchecked("http://www.w3.org/ns/prov#wasDerivedFrom");
145                 let p_time = NamedNode::new_unchecked("http://www.w3.org/ns/prov#generatedAtTime");
146                 let p_method = NamedNode::new_unchecked("http://www.w3.org/ns/prov#wasGeneratedBy");
147
148                 let o_source = Literal::new_simple_literal(&p.source);
149                 let o_time = Literal::new_simple_literal(&p.timestamp);
150                 let o_method = Literal::new_simple_literal(&p.method);
151
152                 self.store.insert(&Quad::new(batch_node.clone(), p_derived, o_source, GraphName::DefaultGraph))?;
153                 self.store.insert(&Quad::new(batch_node.clone(), p_time, o_time, GraphName::DefaultGraph))?;
154                 self.store.insert(&Quad::new(batch_node.clone(), p_method, o_method, GraphName::DefaultGraph))?;
155
156                 GraphName::NamedNode(batch_node)
157             } else {
158                 GraphName::DefaultGraph
159             };
160
161            for (s, p, o) in batch_triples {
162                let subject_uri = self.ensure_uri(&s);
163                let predicate_uri = self.ensure_uri(&p);
164                let object_uri = self.ensure_uri(&o);
165
166                // Register URIs in the ID mapping (for gRPC compatibility)
167                self.get_or_create_id(&subject_uri);
168                self.get_or_create_id(&predicate_uri);
169                self.get_or_create_id(&object_uri);
170
171                let subject = Subject::NamedNode(NamedNode::new_unchecked(&subject_uri));
172                let predicate = NamedNode::new_unchecked(&predicate_uri);
173                let object = Term::NamedNode(NamedNode::new_unchecked(&object_uri));
174
175                let quad = Quad::new(subject, predicate, object, graph_name.clone());
176                if self.store.insert(&quad)? {
177                    // Also index in vector store if available
178                    if let Some(ref vs) = self.vector_store {
179                        // Create searchable content from triple
180                        let content = format!("{} {} {}", s, p, o);
181                        // Use a deterministic hash/key for the triple to allow multiple triples per subject
182                        // We use the content itself as key or a hash of it.
183                        // Ideally we should use a hash, but for simplicity let's use the formatted content string as key prefix?
184                        // Actually, just using a unique ID is fine, but we want idempotency.
185                        // format!("{}|{}|{}", s, p, o)
186                        let key = format!("{}|{}|{}", subject_uri, predicate_uri, object_uri);
187
188                        // Pass metadata including the subject URI for graph expansion later
189                        let metadata = serde_json::json!({
190                            "uri": subject_uri,
191                            "predicate": predicate_uri,
192                            "object": object_uri,
193                            "type": "triple"
194                        });
195
196                        if let Err(e) = vs.add(&key, &content, metadata).await {
197                            // Rollback graph insertion to ensure consistency
198                            self.store.remove(&quad)?;
199                            return Err(anyhow::anyhow!(
200                                "Vector store insertion failed, rolled back graph change: {}",
201                                e
202                            ));
203                        }
204                    }
205                    added += 1;
206                }
207            }
208        }
209
210        Ok((added, 0))
211    }
212
213    /// Hybrid search: vector similarity + graph expansion
214    pub async fn hybrid_search(
215        &self,
216        query: &str,
217        vector_k: usize,
218        graph_depth: u32,
219    ) -> Result<Vec<(String, f32)>> {
220        let mut results = Vec::new();
221
222        // Step 1: Vector search
223        if let Some(ref vs) = self.vector_store {
224            let vector_results = vs.search(query, vector_k).await?;
225
226            for result in vector_results {
227                // Use the URI from metadata/result (which maps to Subject URI for triples)
228                let uri = result.uri.clone();
229                results.push((uri.clone(), result.score));
230
231                // Step 2: Graph expansion (if depth > 0)
232                if graph_depth > 0 {
233                    let expanded = self.expand_graph(&uri, graph_depth)?;
234                    for expanded_uri in expanded {
235                        // Add with slightly lower score
236                        results.push((expanded_uri, result.score * 0.8));
237                    }
238                }
239            }
240        }
241
242        // Remove duplicates and sort by score
243        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
244        results.dedup_by(|a, b| a.0 == b.0);
245
246        Ok(results)
247    }
248
249    /// Expand graph from a starting URI
250    fn expand_graph(&self, start_uri: &str, depth: u32) -> Result<Vec<String>> {
251        let mut expanded = Vec::new();
252
253        if depth == 0 {
254            return Ok(expanded);
255        }
256
257        // Query for all triples where start_uri is subject or object
258        let subject = NamedNodeRef::new(start_uri).ok();
259
260        if let Some(subj) = subject {
261            for quad in self
262                .store
263                .quads_for_pattern(Some(subj.into()), None, None, None)
264            {
265                if let Ok(q) = quad {
266                    expanded.push(q.object.to_string());
267
268                    // Recursive expansion (simplified, depth-1)
269                    if depth > 1 {
270                        let nested = self.expand_graph(&q.object.to_string(), depth - 1)?;
271                        expanded.extend(nested);
272                    }
273                }
274            }
275        }
276
277        Ok(expanded)
278    }
279
280    pub fn query_sparql(&self, query: &str) -> Result<String> {
281        use oxigraph::sparql::QueryResults;
282
283        let results = self.store.query(query)?;
284
285        match results {
286            QueryResults::Solutions(solutions) => {
287                let mut results_array = Vec::new();
288                for solution in solutions {
289                    let sol = solution?;
290                    let mut mapping = serde_json::Map::new();
291                    for (variable, value) in sol.iter() {
292                        mapping.insert(
293                            variable.to_string(),
294                            serde_json::to_value(value.to_string()).unwrap(),
295                        );
296                    }
297                    results_array.push(serde_json::Value::Object(mapping));
298                }
299                Ok(serde_json::to_string(&results_array)?)
300            }
301            _ => Ok("[]".to_string()),
302        }
303    }
304
305    pub fn get_degree(&self, uri: &str) -> usize {
306        let node = NamedNodeRef::new(uri).ok();
307        if let Some(n) = node {
308             let outgoing = self.store.quads_for_pattern(Some(n.into()), None, None, None).count();
309             let incoming = self.store.quads_for_pattern(None, None, Some(n.into()), None).count();
310             outgoing + incoming
311        } else {
312            0
313        }
314    }
315
316    pub fn ensure_uri(&self, s: &str) -> String {
317        if s.starts_with("http") || s.starts_with("urn:") {
318            s.to_string()
319        } else {
320            format!("http://synapse.os/{}", s)
321        }
322    }
323}