// synapse_core/store.rs

use crate::persistence::{load_bincode, save_bincode};
use crate::vector_store::VectorStore;
use anyhow::Result;
use oxigraph::model::*;
use oxigraph::store::Store;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use uuid::Uuid;

/// How many newly allocated URI ids may accumulate before the URI <-> id
/// mappings are automatically written back to disk.
const DEFAULT_MAPPING_SAVE_THRESHOLD: usize = 1_000;

15/// Persisted URI mappings
16#[derive(Serialize, Deserialize, Default)]
17struct UriMappings {
18    uri_to_id: HashMap<String, u32>,
19    next_id: u32,
20}
21
22#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct Provenance {
24    pub source: String,
25    pub timestamp: String,
26    pub method: String,
27}
28
29pub struct IngestTriple {
30    pub subject: String,
31    pub predicate: String,
32    pub object: String,
33    pub provenance: Option<Provenance>,
34}
35
36pub struct SynapseStore {
37    pub store: Store,
38    pub namespace: String,
39    pub storage_path: PathBuf,
40    // Mapping for gRPC compatibility (ID <-> URI)
41    pub id_to_uri: RwLock<HashMap<u32, String>>,
42    pub uri_to_id: RwLock<HashMap<String, u32>>,
43    pub next_id: std::sync::atomic::AtomicU32,
44    // Vector store for hybrid search
45    pub vector_store: Option<Arc<VectorStore>>,
46    // Persistence state
47    dirty_count: AtomicUsize,
48    save_threshold: usize,
49}
50
51impl SynapseStore {
52    pub fn open(namespace: &str, storage_path: &str) -> Result<Self> {
53        let path = PathBuf::from(storage_path).join(namespace);
54        std::fs::create_dir_all(&path)?;
55        let store = Store::open(&path)?;
56
57        // Load persisted URI mappings if they exist
58        let mappings_path_bin = path.join("uri_mappings.bin");
59        let mappings_path_json = path.join("uri_mappings.json");
60
61        let (uri_to_id, id_to_uri, next_id) = if mappings_path_bin.exists() {
62            let mappings: UriMappings = load_bincode(&mappings_path_bin)?;
63            let id_to_uri: HashMap<u32, String> = mappings
64                .uri_to_id
65                .iter()
66                .map(|(uri, &id)| (id, uri.clone()))
67                .collect();
68            (mappings.uri_to_id, id_to_uri, mappings.next_id)
69        } else if mappings_path_json.exists() {
70            let content = std::fs::read_to_string(&mappings_path_json)?;
71            let mappings: UriMappings = serde_json::from_str(&content)?;
72            let id_to_uri: HashMap<u32, String> = mappings
73                .uri_to_id
74                .iter()
75                .map(|(uri, &id)| (id, uri.clone()))
76                .collect();
77            (mappings.uri_to_id, id_to_uri, mappings.next_id)
78        } else {
79            (HashMap::new(), HashMap::new(), 1)
80        };
81
82        // Initialize vector store (optional, can fail gracefully)
83        let vector_store = match VectorStore::new(namespace) {
84            Ok(vs) => Some(Arc::new(vs)),
85            Err(e) => {
86                eprintln!("WARNING: Failed to initialize vector store for namespace '{}': {}", namespace, e);
87                None
88            }
89        };
90
91        Ok(Self {
92            store,
93            namespace: namespace.to_string(),
94            storage_path: path,
95            id_to_uri: RwLock::new(id_to_uri),
96            uri_to_id: RwLock::new(uri_to_id),
97            next_id: std::sync::atomic::AtomicU32::new(next_id),
98            vector_store,
99            dirty_count: AtomicUsize::new(0),
100            save_threshold: DEFAULT_MAPPING_SAVE_THRESHOLD,
101        })
102    }
103
104    /// Save URI mappings to disk
105    fn save_mappings(&self) -> Result<()> {
106        let mappings = UriMappings {
107            uri_to_id: self.uri_to_id.read().unwrap().clone(),
108            next_id: self.next_id.load(std::sync::atomic::Ordering::Relaxed),
109        };
110        // Capture the count before saving? No, we just care that we saved the current state.
111        // But if new items are added during save, the dirty count will increment.
112        // We need to subtract what we think we saved.
113        // Since we save the *entire* map, we effectively save *all* dirty items up to that point.
114        // So we can read the dirty count, save, then subtract.
115        let current_dirty = self.dirty_count.load(Ordering::Relaxed);
116
117        save_bincode(&self.storage_path.join("uri_mappings.bin"), &mappings)?;
118
119        if current_dirty > 0 {
120            let _ = self.dirty_count.fetch_sub(current_dirty, Ordering::Relaxed);
121        }
122        Ok(())
123    }
124
125    /// Force save all data to disk
126    pub fn flush(&self) -> Result<()> {
127        self.save_mappings()?;
128        if let Some(ref vs) = self.vector_store {
129            vs.flush()?;
130        }
131        Ok(())
132    }
133
134    pub fn get_or_create_id(&self, uri: &str) -> u32 {
135        {
136            let map = self.uri_to_id.read().unwrap();
137            if let Some(&id) = map.get(uri) {
138                return id;
139            }
140        }
141
142        let mut uri_map = self.uri_to_id.write().unwrap();
143        let mut id_map = self.id_to_uri.write().unwrap();
144
145        if let Some(&id) = uri_map.get(uri) {
146            return id;
147        }
148
149        let id = self
150            .next_id
151            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
152        uri_map.insert(uri.to_string(), id);
153        id_map.insert(id, uri.to_string());
154
155        drop(uri_map);
156        drop(id_map);
157
158        // Check if we need to auto-save mappings
159        let count = self.dirty_count.fetch_add(1, Ordering::Relaxed);
160        if count + 1 >= self.save_threshold {
161            let _ = self.save_mappings();
162        }
163
164        id
165    }
166
167    pub fn get_uri(&self, id: u32) -> Option<String> {
168        self.id_to_uri.read().unwrap().get(&id).cloned()
169    }
170
171    pub async fn ingest_triples(&self, triples: Vec<IngestTriple>) -> Result<(u32, u32)> {
172        let mut added = 0;
173
174        // Group by provenance to optimize batch insertion into named graphs
175        let mut batches: HashMap<Option<Provenance>, Vec<(String, String, String)>> =
176            HashMap::new();
177
178        for t in triples {
179            batches
180                .entry(t.provenance)
181                .or_default()
182                .push((t.subject, t.predicate, t.object));
183        }
184
185        for (prov, batch_triples) in batches {
186            let graph_name = if let Some(p) = &prov {
187                let uuid = Uuid::new_v4();
188                let uri = format!("urn:batch:{}", uuid);
189
190                let batch_node = NamedNode::new_unchecked(&uri);
191                let p_derived =
192                    NamedNode::new_unchecked("http://www.w3.org/ns/prov#wasDerivedFrom");
193                let p_time = NamedNode::new_unchecked("http://www.w3.org/ns/prov#generatedAtTime");
194                let p_method = NamedNode::new_unchecked("http://www.w3.org/ns/prov#wasGeneratedBy");
195
196                let o_source = Literal::new_simple_literal(&p.source);
197                let o_time = Literal::new_simple_literal(&p.timestamp);
198                let o_method = Literal::new_simple_literal(&p.method);
199
200                self.store.insert(&Quad::new(
201                    batch_node.clone(),
202                    p_derived,
203                    o_source,
204                    GraphName::DefaultGraph,
205                ))?;
206                self.store.insert(&Quad::new(
207                    batch_node.clone(),
208                    p_time,
209                    o_time,
210                    GraphName::DefaultGraph,
211                ))?;
212                self.store.insert(&Quad::new(
213                    batch_node.clone(),
214                    p_method,
215                    o_method,
216                    GraphName::DefaultGraph,
217                ))?;
218
219                // If source is "mcp", put triples in default graph for easier querying
220                if p.source == "mcp" {
221                    GraphName::DefaultGraph
222                } else {
223                    GraphName::NamedNode(batch_node)
224                }
225            } else {
226                GraphName::DefaultGraph
227            };
228
229            for (s, p, o) in batch_triples {
230                let subject_uri = self.ensure_uri(&s);
231                let predicate_uri = self.ensure_uri(&p);
232                let object_uri = self.ensure_uri(&o);
233
234                // Register URIs in the ID mapping (for gRPC compatibility)
235                self.get_or_create_id(&subject_uri);
236                self.get_or_create_id(&predicate_uri);
237                self.get_or_create_id(&object_uri);
238
239                let subject = Subject::NamedNode(NamedNode::new_unchecked(&subject_uri));
240                let predicate = NamedNode::new_unchecked(&predicate_uri);
241                let object = Term::NamedNode(NamedNode::new_unchecked(&object_uri));
242
243                let quad = Quad::new(subject, predicate, object, graph_name.clone());
244                let inserted = self.store.insert(&quad)?;
245                
246                // Also index in vector store if available
247                if let Some(ref vs) = self.vector_store {
248                    // We check if it's already in the vector store by key
249                    let key = format!("{}|{}|{}", subject_uri, predicate_uri, object_uri);
250                    if vs.get_id(&key).is_none() {
251                        // Create searchable content from triple
252                        let content = format!("{} {} {}", s, p, o);
253                        // Pass metadata including the subject URI for graph expansion later
254                        let metadata = serde_json::json!({
255                            "uri": subject_uri,
256                            "predicate": predicate_uri,
257                            "object": object_uri,
258                            "type": "triple"
259                        });
260
261                        if let Err(e) = vs.add(&key, &content, metadata).await {
262                            // If we just inserted it into the graph but vector failed, 
263                            // we technically have an inconsistency, but for now we just log.
264                            eprintln!("Vector store insertion failed for {}: {}", key, e);
265                        }
266                    }
267                }
268
269                if inserted {
270                    added += 1;
271                }
272            }
273        }
274
275        Ok((added, 0))
276    }
277
278    /// Hybrid search: vector similarity + graph expansion
279    pub async fn hybrid_search(
280        &self,
281        query: &str,
282        vector_k: usize,
283        graph_depth: u32,
284    ) -> Result<Vec<(String, f32)>> {
285        let mut results = Vec::new();
286
287        // Step 1: Vector search
288        if let Some(ref vs) = self.vector_store {
289            let vector_results = vs.search(query, vector_k).await?;
290
291            for result in vector_results {
292                // Use the URI from metadata/result (which maps to Subject URI for triples)
293                let uri = result.uri.clone();
294                results.push((uri.clone(), result.score));
295
296                // Step 2: Graph expansion (if depth > 0)
297                if graph_depth > 0 {
298                    let expanded = self.expand_graph(&uri, graph_depth)?;
299                    for expanded_uri in expanded {
300                        // Add with slightly lower score
301                        results.push((expanded_uri, result.score * 0.8));
302                    }
303                }
304            }
305        }
306
307        // Remove duplicates and sort by score
308        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
309        results.dedup_by(|a, b| a.0 == b.0);
310
311        Ok(results)
312    }
313
314    /// Expand graph from a starting URI
315    fn expand_graph(&self, start_uri: &str, depth: u32) -> Result<Vec<String>> {
316        let mut expanded = Vec::new();
317
318        if depth == 0 {
319            return Ok(expanded);
320        }
321
322        // Query for all triples where start_uri is subject or object
323        let subject = NamedNodeRef::new(start_uri).ok();
324
325        if let Some(subj) = subject {
326            for q in self
327                .store
328                .quads_for_pattern(Some(subj.into()), None, None, None)
329                .flatten()
330            {
331                expanded.push(q.object.to_string());
332
333                // Recursive expansion (simplified, depth-1)
334                if depth > 1 {
335                    let nested = self.expand_graph(&q.object.to_string(), depth - 1)?;
336                    expanded.extend(nested);
337                }
338            }
339        }
340
341        Ok(expanded)
342    }
343
344    pub fn query_sparql(&self, query: &str) -> Result<String> {
345        use oxigraph::sparql::QueryResults;
346
347        let results = self.store.query(query)?;
348
349        match results {
350            QueryResults::Solutions(solutions) => {
351                let mut results_array = Vec::new();
352                for solution in solutions {
353                    let sol = solution?;
354                    let mut mapping = serde_json::Map::new();
355                    for (variable, value) in sol.iter() {
356                        mapping.insert(
357                            variable.to_string(),
358                            serde_json::to_value(value.to_string()).unwrap(),
359                        );
360                    }
361                    results_array.push(serde_json::Value::Object(mapping));
362                }
363                Ok(serde_json::to_string(&results_array)?)
364            }
365            _ => Ok("[]".to_string()),
366        }
367    }
368
369    pub fn get_degree(&self, uri: &str) -> usize {
370        let node = NamedNodeRef::new(uri).ok();
371        if let Some(n) = node {
372            let outgoing = self
373                .store
374                .quads_for_pattern(Some(n.into()), None, None, None)
375                .count();
376            let incoming = self
377                .store
378                .quads_for_pattern(None, None, Some(n.into()), None)
379                .count();
380            outgoing + incoming
381        } else {
382            0
383        }
384    }
385
386    pub fn ensure_uri(&self, s: &str) -> String {
387        let clean = s.trim_start_matches('<').trim_end_matches('>');
388        if clean.starts_with("http") || clean.starts_with("urn:") {
389            clean.to_string()
390        } else {
391            format!("http://synapse.os/{}", clean)
392        }
393    }
394}