Skip to main content

synapse_core/
store.rs

1use crate::vector_store::VectorStore;
2use anyhow::Result;
3use oxigraph::model::*;
4use oxigraph::store::Store;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::path::PathBuf;
8use std::sync::{Arc, RwLock};
9
10/// Persisted URI mappings
11#[derive(Serialize, Deserialize, Default)]
12struct UriMappings {
13    uri_to_id: HashMap<String, u32>,
14    next_id: u32,
15}
16
17pub struct SynapseStore {
18    pub store: Store,
19    pub namespace: String,
20    pub storage_path: PathBuf,
21    // Mapping for gRPC compatibility (ID <-> URI)
22    pub id_to_uri: RwLock<HashMap<u32, String>>,
23    pub uri_to_id: RwLock<HashMap<String, u32>>,
24    pub next_id: std::sync::atomic::AtomicU32,
25    // Vector store for hybrid search
26    pub vector_store: Option<Arc<VectorStore>>,
27}
28
29impl SynapseStore {
30    pub fn open(namespace: &str, storage_path: &str) -> Result<Self> {
31        let path = PathBuf::from(storage_path).join(namespace);
32        std::fs::create_dir_all(&path)?;
33        let store = Store::open(&path)?;
34
35        // Load persisted URI mappings if they exist
36        let mappings_path = path.join("uri_mappings.json");
37        let (uri_to_id, id_to_uri, next_id) = if mappings_path.exists() {
38            let content = std::fs::read_to_string(&mappings_path)?;
39            let mappings: UriMappings = serde_json::from_str(&content)?;
40            let id_to_uri: HashMap<u32, String> = mappings
41                .uri_to_id
42                .iter()
43                .map(|(uri, &id)| (id, uri.clone()))
44                .collect();
45            (mappings.uri_to_id, id_to_uri, mappings.next_id)
46        } else {
47            (HashMap::new(), HashMap::new(), 1)
48        };
49
50        // Initialize vector store (optional, can fail gracefully)
51        let vector_store = VectorStore::new(namespace).ok().map(Arc::new);
52
53        Ok(Self {
54            store,
55            namespace: namespace.to_string(),
56            storage_path: path,
57            id_to_uri: RwLock::new(id_to_uri),
58            uri_to_id: RwLock::new(uri_to_id),
59            next_id: std::sync::atomic::AtomicU32::new(next_id),
60            vector_store,
61        })
62    }
63
64    /// Save URI mappings to disk
65    fn save_mappings(&self, mappings: UriMappings) -> Result<()> {
66        let content = serde_json::to_string_pretty(&mappings)?;
67        std::fs::write(self.storage_path.join("uri_mappings.json"), content)?;
68        Ok(())
69    }
70
71    pub fn get_or_create_id(&self, uri: &str) -> u32 {
72        {
73            let map = self.uri_to_id.read().unwrap();
74            if let Some(&id) = map.get(uri) {
75                return id;
76            }
77        }
78
79        let mut uri_map = self.uri_to_id.write().unwrap();
80        let mut id_map = self.id_to_uri.write().unwrap();
81
82        if let Some(&id) = uri_map.get(uri) {
83            return id;
84        }
85
86        let id = self
87            .next_id
88            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
89        uri_map.insert(uri.to_string(), id);
90        id_map.insert(id, uri.to_string());
91
92        // Prepare mappings for persistence
93        let mappings = UriMappings {
94            uri_to_id: uri_map.clone(),
95            next_id: self.next_id.load(std::sync::atomic::Ordering::Relaxed),
96        };
97
98        // Persist mappings (best effort, don't block on error)
99        drop(uri_map);
100        drop(id_map);
101        let _ = self.save_mappings(mappings);
102
103        id
104    }
105
106    pub fn get_uri(&self, id: u32) -> Option<String> {
107        self.id_to_uri.read().unwrap().get(&id).cloned()
108    }
109
110    pub async fn ingest_triples(
111        &self,
112        triples: Vec<(String, String, String)>,
113    ) -> Result<(u32, u32)> {
114        let mut added = 0;
115
116        for (s, p, o) in triples {
117            let subject_uri = self.ensure_uri(&s);
118            let predicate_uri = self.ensure_uri(&p);
119            let object_uri = self.ensure_uri(&o);
120
121            let subject = Subject::NamedNode(NamedNode::new_unchecked(&subject_uri));
122            let predicate = NamedNode::new_unchecked(&predicate_uri);
123            let object = Term::NamedNode(NamedNode::new_unchecked(&object_uri));
124
125            let quad = Quad::new(subject, predicate, object, GraphName::DefaultGraph);
126            if self.store.insert(&quad)? {
127                // Also index in vector store if available
128                if let Some(ref vs) = self.vector_store {
129                    // Create searchable content from triple
130                    let content = format!("{} {} {}", s, p, o);
131                    if let Err(e) = vs.add(&subject_uri, &content).await {
132                        // Rollback graph insertion to ensure consistency
133                        self.store.remove(&quad)?;
134                        return Err(anyhow::anyhow!(
135                            "Vector store insertion failed, rolled back graph change: {}",
136                            e
137                        ));
138                    }
139                }
140                added += 1;
141            }
142        }
143
144        Ok((added, 0))
145    }
146
147    /// Hybrid search: vector similarity + graph expansion
148    pub async fn hybrid_search(
149        &self,
150        query: &str,
151        vector_k: usize,
152        graph_depth: u32,
153    ) -> Result<Vec<(String, f32)>> {
154        let mut results = Vec::new();
155
156        // Step 1: Vector search
157        if let Some(ref vs) = self.vector_store {
158            let vector_results = vs.search(query, vector_k).await?;
159
160            for result in vector_results {
161                results.push((result.uri.clone(), result.score));
162
163                // Step 2: Graph expansion (if depth > 0)
164                if graph_depth > 0 {
165                    let expanded = self.expand_graph(&result.uri, graph_depth)?;
166                    for uri in expanded {
167                        // Add with slightly lower score
168                        results.push((uri, result.score * 0.8));
169                    }
170                }
171            }
172        }
173
174        // Remove duplicates and sort by score
175        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
176        results.dedup_by(|a, b| a.0 == b.0);
177
178        Ok(results)
179    }
180
181    /// Expand graph from a starting URI
182    fn expand_graph(&self, start_uri: &str, depth: u32) -> Result<Vec<String>> {
183        let mut expanded = Vec::new();
184
185        if depth == 0 {
186            return Ok(expanded);
187        }
188
189        // Query for all triples where start_uri is subject or object
190        let subject = NamedNodeRef::new(start_uri).ok();
191
192        if let Some(subj) = subject {
193            for quad in self
194                .store
195                .quads_for_pattern(Some(subj.into()), None, None, None)
196            {
197                if let Ok(q) = quad {
198                    expanded.push(q.object.to_string());
199
200                    // Recursive expansion (simplified, depth-1)
201                    if depth > 1 {
202                        let nested = self.expand_graph(&q.object.to_string(), depth - 1)?;
203                        expanded.extend(nested);
204                    }
205                }
206            }
207        }
208
209        Ok(expanded)
210    }
211
212    pub fn query_sparql(&self, query: &str) -> Result<String> {
213        use oxigraph::sparql::QueryResults;
214
215        let results = self.store.query(query)?;
216
217        match results {
218            QueryResults::Solutions(solutions) => {
219                let mut results_array = Vec::new();
220                for solution in solutions {
221                    let sol = solution?;
222                    let mut mapping = serde_json::Map::new();
223                    for (variable, value) in sol.iter() {
224                        mapping.insert(
225                            variable.to_string(),
226                            serde_json::to_value(value.to_string()).unwrap(),
227                        );
228                    }
229                    results_array.push(serde_json::Value::Object(mapping));
230                }
231                Ok(serde_json::to_string(&results_array)?)
232            }
233            _ => Ok("[]".to_string()),
234        }
235    }
236
237    fn ensure_uri(&self, s: &str) -> String {
238        if s.starts_with("http") {
239            s.to_string()
240        } else {
241            format!("http://synapse.os/{}", s)
242        }
243    }
244}