ruvector_graph/
storage.rs

//! Persistent storage layer with redb and memory-mapped vectors.
//!
//! Provides ACID-compliant storage for graph nodes, edges, and hyperedges.
4
5#[cfg(feature = "storage")]
6use crate::edge::Edge;
7#[cfg(feature = "storage")]
8use crate::hyperedge::{Hyperedge, HyperedgeId};
9#[cfg(feature = "storage")]
10use crate::node::Node;
11#[cfg(feature = "storage")]
12use crate::types::{EdgeId, NodeId};
13#[cfg(feature = "storage")]
14use anyhow::Result;
15#[cfg(feature = "storage")]
16use bincode::config;
17#[cfg(feature = "storage")]
18use once_cell::sync::Lazy;
19#[cfg(feature = "storage")]
20use parking_lot::Mutex;
21#[cfg(feature = "storage")]
22use redb::{Database, ReadableTable, TableDefinition};
23#[cfg(feature = "storage")]
24use std::collections::HashMap;
25#[cfg(feature = "storage")]
26use std::path::{Path, PathBuf};
27#[cfg(feature = "storage")]
28use std::sync::Arc;
29
#[cfg(feature = "storage")]
// Table definitions
//
// The three entity tables map a string ID to the bincode-encoded bytes of the
// entity (see the `insert_*` / `get_*` methods on `GraphStorage`); the
// metadata table maps a string key directly to a string value.
const NODES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("nodes");
#[cfg(feature = "storage")]
const EDGES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("edges");
#[cfg(feature = "storage")]
const HYPEREDGES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("hyperedges");
#[cfg(feature = "storage")]
const METADATA_TABLE: TableDefinition<&str, &str> = TableDefinition::new("metadata");
39
#[cfg(feature = "storage")]
// Global database connection pool to allow multiple GraphStorage instances
// to share the same underlying database file.
//
// The map is keyed by the database path computed in `GraphStorage::new` and is
// created lazily on first access. Entries are inserted by `GraphStorage::new`;
// nothing in this file removes them, so a pooled `Arc<Database>` is kept alive
// by the pool for as long as the process runs.
static DB_POOL: Lazy<Mutex<HashMap<PathBuf, Arc<Database>>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));
45
#[cfg(feature = "storage")]
/// Storage backend for graph database.
///
/// A thin handle around a shared redb `Database`. Handles opened on the same
/// path through [`GraphStorage::new`] share one underlying `Database` via the
/// global pool, so every method works through `&self` and opens its own read
/// or write transaction.
pub struct GraphStorage {
    /// Shared database handle; also referenced by the global pool.
    db: Arc<Database>,
}
51
52#[cfg(feature = "storage")]
53impl GraphStorage {
54    /// Create or open a graph storage at the given path
55    ///
56    /// Uses a global connection pool to allow multiple GraphStorage
57    /// instances to share the same underlying database file
58    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
59        let path_buf = path
60            .as_ref()
61            .canonicalize()
62            .unwrap_or_else(|_| path.as_ref().to_path_buf());
63
64        // Check if we already have a Database instance for this path
65        let db = {
66            let mut pool = DB_POOL.lock();
67
68            if let Some(existing_db) = pool.get(&path_buf) {
69                // Reuse existing database connection
70                Arc::clone(existing_db)
71            } else {
72                // Create new database and add to pool
73                let new_db = Arc::new(Database::create(&path_buf)?);
74
75                // Initialize tables
76                let write_txn = new_db.begin_write()?;
77                {
78                    let _ = write_txn.open_table(NODES_TABLE)?;
79                    let _ = write_txn.open_table(EDGES_TABLE)?;
80                    let _ = write_txn.open_table(HYPEREDGES_TABLE)?;
81                    let _ = write_txn.open_table(METADATA_TABLE)?;
82                }
83                write_txn.commit()?;
84
85                pool.insert(path_buf, Arc::clone(&new_db));
86                new_db
87            }
88        };
89
90        Ok(Self { db })
91    }
92
93    // Node operations
94
95    /// Insert a node
96    pub fn insert_node(&self, node: &Node) -> Result<NodeId> {
97        let write_txn = self.db.begin_write()?;
98        {
99            let mut table = write_txn.open_table(NODES_TABLE)?;
100
101            // Serialize node data
102            let node_data = bincode::encode_to_vec(node, config::standard())?;
103            table.insert(node.id.as_str(), node_data.as_slice())?;
104        }
105        write_txn.commit()?;
106
107        Ok(node.id.clone())
108    }
109
110    /// Insert multiple nodes in a batch
111    pub fn insert_nodes_batch(&self, nodes: &[Node]) -> Result<Vec<NodeId>> {
112        let write_txn = self.db.begin_write()?;
113        let mut ids = Vec::with_capacity(nodes.len());
114
115        {
116            let mut table = write_txn.open_table(NODES_TABLE)?;
117
118            for node in nodes {
119                let node_data = bincode::encode_to_vec(node, config::standard())?;
120                table.insert(node.id.as_str(), node_data.as_slice())?;
121                ids.push(node.id.clone());
122            }
123        }
124
125        write_txn.commit()?;
126        Ok(ids)
127    }
128
129    /// Get a node by ID
130    pub fn get_node(&self, id: &str) -> Result<Option<Node>> {
131        let read_txn = self.db.begin_read()?;
132        let table = read_txn.open_table(NODES_TABLE)?;
133
134        let Some(node_data) = table.get(id)? else {
135            return Ok(None);
136        };
137
138        let (node, _): (Node, usize) =
139            bincode::decode_from_slice(node_data.value(), config::standard())?;
140        Ok(Some(node))
141    }
142
143    /// Delete a node by ID
144    pub fn delete_node(&self, id: &str) -> Result<bool> {
145        let write_txn = self.db.begin_write()?;
146        let deleted;
147        {
148            let mut table = write_txn.open_table(NODES_TABLE)?;
149            let result = table.remove(id)?;
150            deleted = result.is_some();
151        }
152        write_txn.commit()?;
153        Ok(deleted)
154    }
155
156    /// Get all node IDs
157    pub fn all_node_ids(&self) -> Result<Vec<NodeId>> {
158        let read_txn = self.db.begin_read()?;
159        let table = read_txn.open_table(NODES_TABLE)?;
160
161        let mut ids = Vec::new();
162        let iter = table.iter()?;
163        for item in iter {
164            let (key, _) = item?;
165            ids.push(key.value().to_string());
166        }
167
168        Ok(ids)
169    }
170
171    // Edge operations
172
173    /// Insert an edge
174    pub fn insert_edge(&self, edge: &Edge) -> Result<EdgeId> {
175        let write_txn = self.db.begin_write()?;
176        {
177            let mut table = write_txn.open_table(EDGES_TABLE)?;
178
179            // Serialize edge data
180            let edge_data = bincode::encode_to_vec(edge, config::standard())?;
181            table.insert(edge.id.as_str(), edge_data.as_slice())?;
182        }
183        write_txn.commit()?;
184
185        Ok(edge.id.clone())
186    }
187
188    /// Insert multiple edges in a batch
189    pub fn insert_edges_batch(&self, edges: &[Edge]) -> Result<Vec<EdgeId>> {
190        let write_txn = self.db.begin_write()?;
191        let mut ids = Vec::with_capacity(edges.len());
192
193        {
194            let mut table = write_txn.open_table(EDGES_TABLE)?;
195
196            for edge in edges {
197                let edge_data = bincode::encode_to_vec(edge, config::standard())?;
198                table.insert(edge.id.as_str(), edge_data.as_slice())?;
199                ids.push(edge.id.clone());
200            }
201        }
202
203        write_txn.commit()?;
204        Ok(ids)
205    }
206
207    /// Get an edge by ID
208    pub fn get_edge(&self, id: &str) -> Result<Option<Edge>> {
209        let read_txn = self.db.begin_read()?;
210        let table = read_txn.open_table(EDGES_TABLE)?;
211
212        let Some(edge_data) = table.get(id)? else {
213            return Ok(None);
214        };
215
216        let (edge, _): (Edge, usize) =
217            bincode::decode_from_slice(edge_data.value(), config::standard())?;
218        Ok(Some(edge))
219    }
220
221    /// Delete an edge by ID
222    pub fn delete_edge(&self, id: &str) -> Result<bool> {
223        let write_txn = self.db.begin_write()?;
224        let deleted;
225        {
226            let mut table = write_txn.open_table(EDGES_TABLE)?;
227            let result = table.remove(id)?;
228            deleted = result.is_some();
229        }
230        write_txn.commit()?;
231        Ok(deleted)
232    }
233
234    /// Get all edge IDs
235    pub fn all_edge_ids(&self) -> Result<Vec<EdgeId>> {
236        let read_txn = self.db.begin_read()?;
237        let table = read_txn.open_table(EDGES_TABLE)?;
238
239        let mut ids = Vec::new();
240        let iter = table.iter()?;
241        for item in iter {
242            let (key, _) = item?;
243            ids.push(key.value().to_string());
244        }
245
246        Ok(ids)
247    }
248
249    // Hyperedge operations
250
251    /// Insert a hyperedge
252    pub fn insert_hyperedge(&self, hyperedge: &Hyperedge) -> Result<HyperedgeId> {
253        let write_txn = self.db.begin_write()?;
254        {
255            let mut table = write_txn.open_table(HYPEREDGES_TABLE)?;
256
257            // Serialize hyperedge data
258            let hyperedge_data = bincode::encode_to_vec(hyperedge, config::standard())?;
259            table.insert(hyperedge.id.as_str(), hyperedge_data.as_slice())?;
260        }
261        write_txn.commit()?;
262
263        Ok(hyperedge.id.clone())
264    }
265
266    /// Insert multiple hyperedges in a batch
267    pub fn insert_hyperedges_batch(&self, hyperedges: &[Hyperedge]) -> Result<Vec<HyperedgeId>> {
268        let write_txn = self.db.begin_write()?;
269        let mut ids = Vec::with_capacity(hyperedges.len());
270
271        {
272            let mut table = write_txn.open_table(HYPEREDGES_TABLE)?;
273
274            for hyperedge in hyperedges {
275                let hyperedge_data = bincode::encode_to_vec(hyperedge, config::standard())?;
276                table.insert(hyperedge.id.as_str(), hyperedge_data.as_slice())?;
277                ids.push(hyperedge.id.clone());
278            }
279        }
280
281        write_txn.commit()?;
282        Ok(ids)
283    }
284
285    /// Get a hyperedge by ID
286    pub fn get_hyperedge(&self, id: &str) -> Result<Option<Hyperedge>> {
287        let read_txn = self.db.begin_read()?;
288        let table = read_txn.open_table(HYPEREDGES_TABLE)?;
289
290        let Some(hyperedge_data) = table.get(id)? else {
291            return Ok(None);
292        };
293
294        let (hyperedge, _): (Hyperedge, usize) =
295            bincode::decode_from_slice(hyperedge_data.value(), config::standard())?;
296        Ok(Some(hyperedge))
297    }
298
299    /// Delete a hyperedge by ID
300    pub fn delete_hyperedge(&self, id: &str) -> Result<bool> {
301        let write_txn = self.db.begin_write()?;
302        let deleted;
303        {
304            let mut table = write_txn.open_table(HYPEREDGES_TABLE)?;
305            let result = table.remove(id)?;
306            deleted = result.is_some();
307        }
308        write_txn.commit()?;
309        Ok(deleted)
310    }
311
312    /// Get all hyperedge IDs
313    pub fn all_hyperedge_ids(&self) -> Result<Vec<HyperedgeId>> {
314        let read_txn = self.db.begin_read()?;
315        let table = read_txn.open_table(HYPEREDGES_TABLE)?;
316
317        let mut ids = Vec::new();
318        let iter = table.iter()?;
319        for item in iter {
320            let (key, _) = item?;
321            ids.push(key.value().to_string());
322        }
323
324        Ok(ids)
325    }
326
327    // Metadata operations
328
329    /// Set metadata
330    pub fn set_metadata(&self, key: &str, value: &str) -> Result<()> {
331        let write_txn = self.db.begin_write()?;
332        {
333            let mut table = write_txn.open_table(METADATA_TABLE)?;
334            table.insert(key, value)?;
335        }
336        write_txn.commit()?;
337        Ok(())
338    }
339
340    /// Get metadata
341    pub fn get_metadata(&self, key: &str) -> Result<Option<String>> {
342        let read_txn = self.db.begin_read()?;
343        let table = read_txn.open_table(METADATA_TABLE)?;
344
345        let value = table.get(key)?.map(|v| v.value().to_string());
346        Ok(value)
347    }
348
349    // Statistics
350
351    /// Get the number of nodes
352    pub fn node_count(&self) -> Result<usize> {
353        let read_txn = self.db.begin_read()?;
354        let table = read_txn.open_table(NODES_TABLE)?;
355        Ok(table.iter()?.count())
356    }
357
358    /// Get the number of edges
359    pub fn edge_count(&self) -> Result<usize> {
360        let read_txn = self.db.begin_read()?;
361        let table = read_txn.open_table(EDGES_TABLE)?;
362        Ok(table.iter()?.count())
363    }
364
365    /// Get the number of hyperedges
366    pub fn hyperedge_count(&self) -> Result<usize> {
367        let read_txn = self.db.begin_read()?;
368        let table = read_txn.open_table(HYPEREDGES_TABLE)?;
369        Ok(table.iter()?.count())
370    }
371}
372
// Everything these tests touch (`GraphStorage`, `Result`, ...) only exists when
// the `storage` feature is enabled, so the module is gated on the feature too;
// otherwise `cargo test` without the feature would fail to compile.
#[cfg(all(test, feature = "storage"))]
mod tests {
    use super::*;
    use crate::edge::EdgeBuilder;
    use crate::hyperedge::HyperedgeBuilder;
    use crate::node::NodeBuilder;
    use tempfile::tempdir;

    /// A node written with `insert_node` can be read back with `get_node`.
    #[test]
    fn test_node_storage() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let node = NodeBuilder::new()
            .label("Person")
            .property("name", "Alice")
            .build();

        let id = storage.insert_node(&node)?;
        assert_eq!(id, node.id);

        let retrieved = storage
            .get_node(&id)?
            .expect("inserted node should be retrievable");
        assert_eq!(retrieved.id, node.id);
        assert!(retrieved.has_label("Person"));

        Ok(())
    }

    /// An edge written with `insert_edge` can be read back with `get_edge`.
    #[test]
    fn test_edge_storage() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let edge = EdgeBuilder::new("n1".to_string(), "n2".to_string(), "KNOWS")
            .property("since", 2020i64)
            .build();

        let id = storage.insert_edge(&edge)?;
        assert_eq!(id, edge.id);

        let retrieved = storage.get_edge(&id)?;
        assert!(retrieved.is_some());

        Ok(())
    }

    /// A batch insert returns one ID per node and is reflected in `node_count`.
    #[test]
    fn test_batch_insert() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let nodes = vec![
            NodeBuilder::new().label("Person").build(),
            NodeBuilder::new().label("Person").build(),
        ];

        let ids = storage.insert_nodes_batch(&nodes)?;
        assert_eq!(ids.len(), 2);
        assert_eq!(storage.node_count()?, 2);

        Ok(())
    }

    /// A hyperedge written with `insert_hyperedge` can be read back.
    #[test]
    fn test_hyperedge_storage() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let hyperedge = HyperedgeBuilder::new(
            vec!["n1".to_string(), "n2".to_string(), "n3".to_string()],
            "MEETING",
        )
        .description("Team meeting")
        .build();

        let id = storage.insert_hyperedge(&hyperedge)?;
        assert_eq!(id, hyperedge.id);

        let retrieved = storage.get_hyperedge(&id)?;
        assert!(retrieved.is_some());

        Ok(())
    }
}