// synapse_core/store.rs

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

use anyhow::Result;
use oxigraph::model::*;
use oxigraph::store::Store;
use serde::{Deserialize, Serialize};

use crate::vector_store::VectorStore;

10/// Persisted URI mappings
11#[derive(Serialize, Deserialize, Default)]
12struct UriMappings {
13    uri_to_id: HashMap<String, u32>,
14    next_id: u32,
15}
16
17pub struct SynapseStore {
18    pub store: Store,
19    pub namespace: String,
20    pub storage_path: PathBuf,
21    // Mapping for gRPC compatibility (ID <-> URI)
22    pub id_to_uri: RwLock<HashMap<u32, String>>,
23    pub uri_to_id: RwLock<HashMap<String, u32>>,
24    pub next_id: std::sync::atomic::AtomicU32,
25    // Vector store for hybrid search
26    pub vector_store: Option<Arc<VectorStore>>,
27}
28
29impl SynapseStore {
30    pub fn open(namespace: &str, storage_path: &str) -> Result<Self> {
31        let path = PathBuf::from(storage_path).join(namespace);
32        std::fs::create_dir_all(&path)?;
33        let store = Store::open(&path)?;
34        
35        // Load persisted URI mappings if they exist
36        let mappings_path = path.join("uri_mappings.json");
37        let (uri_to_id, id_to_uri, next_id) = if mappings_path.exists() {
38            let content = std::fs::read_to_string(&mappings_path)?;
39            let mappings: UriMappings = serde_json::from_str(&content)?;
40            let id_to_uri: HashMap<u32, String> = mappings.uri_to_id
41                .iter()
42                .map(|(uri, &id)| (id, uri.clone()))
43                .collect();
44            (mappings.uri_to_id, id_to_uri, mappings.next_id)
45        } else {
46            (HashMap::new(), HashMap::new(), 1)
47        };
48        
49        // Initialize vector store (optional, can fail gracefully)
50        let vector_store = VectorStore::new(namespace)
51            .ok()
52            .map(Arc::new);
53        
54        Ok(Self {
55            store,
56            namespace: namespace.to_string(),
57            storage_path: path,
58            id_to_uri: RwLock::new(id_to_uri),
59            uri_to_id: RwLock::new(uri_to_id),
60            next_id: std::sync::atomic::AtomicU32::new(next_id),
61            vector_store,
62        })
63    }
64
65    /// Save URI mappings to disk
66    fn save_mappings(&self) -> Result<()> {
67        let uri_map = self.uri_to_id.read().unwrap();
68        let mappings = UriMappings {
69            uri_to_id: uri_map.clone(),
70            next_id: self.next_id.load(std::sync::atomic::Ordering::Relaxed),
71        };
72        let content = serde_json::to_string_pretty(&mappings)?;
73        std::fs::write(self.storage_path.join("uri_mappings.json"), content)?;
74        Ok(())
75    }
76
77    pub fn get_or_create_id(&self, uri: &str) -> u32 {
78        {
79            let map = self.uri_to_id.read().unwrap();
80            if let Some(&id) = map.get(uri) {
81                return id;
82            }
83        }
84        
85        let mut uri_map = self.uri_to_id.write().unwrap();
86        let mut id_map = self.id_to_uri.write().unwrap();
87        
88        if let Some(&id) = uri_map.get(uri) {
89            return id;
90        }
91        
92        let id = self.next_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
93        uri_map.insert(uri.to_string(), id);
94        id_map.insert(id, uri.to_string());
95        
96        // Persist mappings (best effort, don't block on error)
97        drop(uri_map);
98        drop(id_map);
99        let _ = self.save_mappings();
100        
101        id
102    }
103
104    pub fn get_uri(&self, id: u32) -> Option<String> {
105        self.id_to_uri.read().unwrap().get(&id).cloned()
106    }
107
108    pub async fn ingest_triples(&self, triples: Vec<(String, String, String)>) -> Result<(u32, u32)> {
109        let mut added = 0;
110        
111        for (s, p, o) in triples {
112            let subject_uri = self.ensure_uri(&s);
113            let predicate_uri = self.ensure_uri(&p);
114            let object_uri = self.ensure_uri(&o);
115            
116            let subject = Subject::NamedNode(NamedNode::new_unchecked(&subject_uri));
117            let predicate = NamedNode::new_unchecked(&predicate_uri);
118            let object = Term::NamedNode(NamedNode::new_unchecked(&object_uri));
119            
120            let quad = Quad::new(subject, predicate, object, GraphName::DefaultGraph);
121            if self.store.insert(&quad)? {
122                added += 1;
123                
124                // Also index in vector store if available
125                if let Some(ref vs) = self.vector_store {
126                    // Create searchable content from triple
127                    let content = format!("{} {} {}", s, p, o);
128                    let _ = vs.add(&subject_uri, &content).await;
129                }
130            }
131        }
132
133        Ok((added, 0))
134    }
135
136    /// Hybrid search: vector similarity + graph expansion
137    pub async fn hybrid_search(
138        &self,
139        query: &str,
140        vector_k: usize,
141        graph_depth: u32,
142    ) -> Result<Vec<(String, f32)>> {
143        let mut results = Vec::new();
144        
145        // Step 1: Vector search
146        if let Some(ref vs) = self.vector_store {
147            let vector_results = vs.search(query, vector_k).await?;
148            
149            for result in vector_results {
150                results.push((result.uri.clone(), result.score));
151                
152                // Step 2: Graph expansion (if depth > 0)
153                if graph_depth > 0 {
154                    let expanded = self.expand_graph(&result.uri, graph_depth)?;
155                    for uri in expanded {
156                        // Add with slightly lower score
157                        results.push((uri, result.score * 0.8));
158                    }
159                }
160            }
161        }
162        
163        // Remove duplicates and sort by score
164        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
165        results.dedup_by(|a, b| a.0 == b.0);
166        
167        Ok(results)
168    }
169
170    /// Expand graph from a starting URI
171    fn expand_graph(&self, start_uri: &str, depth: u32) -> Result<Vec<String>> {
172        let mut expanded = Vec::new();
173        
174        if depth == 0 {
175            return Ok(expanded);
176        }
177        
178        // Query for all triples where start_uri is subject or object
179        let subject = NamedNodeRef::new(start_uri).ok();
180        
181        if let Some(subj) = subject {
182            for quad in self.store.quads_for_pattern(
183                Some(subj.into()),
184                None,
185                None,
186                None,
187            ) {
188                if let Ok(q) = quad {
189                    expanded.push(q.object.to_string());
190                    
191                    // Recursive expansion (simplified, depth-1)
192                    if depth > 1 {
193                        let nested = self.expand_graph(&q.object.to_string(), depth - 1)?;
194                        expanded.extend(nested);
195                    }
196                }
197            }
198        }
199        
200        Ok(expanded)
201    }
202
203    pub fn query_sparql(&self, query: &str) -> Result<String> {
204        use oxigraph::sparql::QueryResults;
205        
206        let results = self.store.query(query)?;
207        
208        match results {
209            QueryResults::Solutions(solutions) => {
210                let mut results_array = Vec::new();
211                for solution in solutions {
212                    let sol = solution?;
213                    let mut mapping = serde_json::Map::new();
214                    for (variable, value) in sol.iter() {
215                        mapping.insert(variable.to_string(), serde_json::to_value(value.to_string()).unwrap());
216                    }
217                    results_array.push(serde_json::Value::Object(mapping));
218                }
219                Ok(serde_json::to_string(&results_array)?)
220            }
221            _ => Ok("[]".to_string()),
222        }
223    }
224
225    fn ensure_uri(&self, s: &str) -> String {
226        if s.starts_with("http") {
227            s.to_string()
228        } else {
229            format!("http://synapse.os/{}", s)
230        }
231    }
232}