ruvector_graph/
storage.rs

1//! Persistent storage layer with redb and memory-mapped vectors
2//!
3//! Provides ACID-compliant storage for graph nodes, edges, and hyperedges
4
5#[cfg(feature = "storage")]
6use crate::edge::Edge;
7#[cfg(feature = "storage")]
8use crate::hyperedge::{Hyperedge, HyperedgeId};
9#[cfg(feature = "storage")]
10use crate::node::Node;
11#[cfg(feature = "storage")]
12use crate::types::{EdgeId, NodeId};
13#[cfg(feature = "storage")]
14use anyhow::Result;
15#[cfg(feature = "storage")]
16use bincode::config;
17#[cfg(feature = "storage")]
18use once_cell::sync::Lazy;
19#[cfg(feature = "storage")]
20use parking_lot::Mutex;
21#[cfg(feature = "storage")]
22use redb::{Database, ReadableTable, TableDefinition};
23#[cfg(feature = "storage")]
24use std::collections::HashMap;
25#[cfg(feature = "storage")]
26use std::path::{Path, PathBuf};
27#[cfg(feature = "storage")]
28use std::sync::Arc;
29
// Table definitions.
//
// The node, edge and hyperedge tables map a string ID to a byte slice; the
// metadata table maps a string key to a string value.
#[cfg(feature = "storage")]
const NODES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("nodes");
#[cfg(feature = "storage")]
const EDGES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("edges");
#[cfg(feature = "storage")]
const HYPEREDGES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("hyperedges");
#[cfg(feature = "storage")]
const METADATA_TABLE: TableDefinition<&str, &str> = TableDefinition::new("metadata");

// Global database connection pool, keyed by database path, so that multiple
// GraphStorage instances can share the same underlying database file.
// The map starts out empty and is filled in by `GraphStorage::new`.
#[cfg(feature = "storage")]
static DB_POOL: Lazy<Mutex<HashMap<PathBuf, Arc<Database>>>> = Lazy::new(Mutex::default);
45
/// Storage backend for graph database
///
/// A thin handle around a shared [`Database`]; every operation opens its own
/// read or write transaction on that database.
#[cfg(feature = "storage")]
pub struct GraphStorage {
    /// Shared database handle, obtained from the global connection pool in
    /// [`GraphStorage::new`].
    db: Arc<Database>,
}
51
52#[cfg(feature = "storage")]
53impl GraphStorage {
54    /// Create or open a graph storage at the given path
55    ///
56    /// Uses a global connection pool to allow multiple GraphStorage
57    /// instances to share the same underlying database file
58    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
59        let path_ref = path.as_ref();
60
61        // Create parent directories if they don't exist
62        if let Some(parent) = path_ref.parent() {
63            if !parent.as_os_str().is_empty() && !parent.exists() {
64                std::fs::create_dir_all(parent)?;
65            }
66        }
67
68        // Convert to absolute path
69        let path_buf = if path_ref.is_absolute() {
70            path_ref.to_path_buf()
71        } else {
72            std::env::current_dir()?.join(path_ref)
73        };
74
75        // SECURITY: Check for path traversal attempts
76        let path_str = path_ref.to_string_lossy();
77        if path_str.contains("..") && !path_ref.is_absolute() {
78            if let Ok(cwd) = std::env::current_dir() {
79                let mut normalized = cwd.clone();
80                for component in path_ref.components() {
81                    match component {
82                        std::path::Component::ParentDir => {
83                            if !normalized.pop() || !normalized.starts_with(&cwd) {
84                                anyhow::bail!("Path traversal attempt detected");
85                            }
86                        }
87                        std::path::Component::Normal(c) => normalized.push(c),
88                        _ => {}
89                    }
90                }
91            }
92        }
93
94        // Check if we already have a Database instance for this path
95        let db = {
96            let mut pool = DB_POOL.lock();
97
98            if let Some(existing_db) = pool.get(&path_buf) {
99                // Reuse existing database connection
100                Arc::clone(existing_db)
101            } else {
102                // Create new database and add to pool
103                let new_db = Arc::new(Database::create(&path_buf)?);
104
105                // Initialize tables
106                let write_txn = new_db.begin_write()?;
107                {
108                    let _ = write_txn.open_table(NODES_TABLE)?;
109                    let _ = write_txn.open_table(EDGES_TABLE)?;
110                    let _ = write_txn.open_table(HYPEREDGES_TABLE)?;
111                    let _ = write_txn.open_table(METADATA_TABLE)?;
112                }
113                write_txn.commit()?;
114
115                pool.insert(path_buf, Arc::clone(&new_db));
116                new_db
117            }
118        };
119
120        Ok(Self { db })
121    }
122
123    // Node operations
124
125    /// Insert a node
126    pub fn insert_node(&self, node: &Node) -> Result<NodeId> {
127        let write_txn = self.db.begin_write()?;
128        {
129            let mut table = write_txn.open_table(NODES_TABLE)?;
130
131            // Serialize node data
132            let node_data = bincode::encode_to_vec(node, config::standard())?;
133            table.insert(node.id.as_str(), node_data.as_slice())?;
134        }
135        write_txn.commit()?;
136
137        Ok(node.id.clone())
138    }
139
140    /// Insert multiple nodes in a batch
141    pub fn insert_nodes_batch(&self, nodes: &[Node]) -> Result<Vec<NodeId>> {
142        let write_txn = self.db.begin_write()?;
143        let mut ids = Vec::with_capacity(nodes.len());
144
145        {
146            let mut table = write_txn.open_table(NODES_TABLE)?;
147
148            for node in nodes {
149                let node_data = bincode::encode_to_vec(node, config::standard())?;
150                table.insert(node.id.as_str(), node_data.as_slice())?;
151                ids.push(node.id.clone());
152            }
153        }
154
155        write_txn.commit()?;
156        Ok(ids)
157    }
158
159    /// Get a node by ID
160    pub fn get_node(&self, id: &str) -> Result<Option<Node>> {
161        let read_txn = self.db.begin_read()?;
162        let table = read_txn.open_table(NODES_TABLE)?;
163
164        let Some(node_data) = table.get(id)? else {
165            return Ok(None);
166        };
167
168        let (node, _): (Node, usize) =
169            bincode::decode_from_slice(node_data.value(), config::standard())?;
170        Ok(Some(node))
171    }
172
173    /// Delete a node by ID
174    pub fn delete_node(&self, id: &str) -> Result<bool> {
175        let write_txn = self.db.begin_write()?;
176        let deleted;
177        {
178            let mut table = write_txn.open_table(NODES_TABLE)?;
179            let result = table.remove(id)?;
180            deleted = result.is_some();
181        }
182        write_txn.commit()?;
183        Ok(deleted)
184    }
185
186    /// Get all node IDs
187    pub fn all_node_ids(&self) -> Result<Vec<NodeId>> {
188        let read_txn = self.db.begin_read()?;
189        let table = read_txn.open_table(NODES_TABLE)?;
190
191        let mut ids = Vec::new();
192        let iter = table.iter()?;
193        for item in iter {
194            let (key, _) = item?;
195            ids.push(key.value().to_string());
196        }
197
198        Ok(ids)
199    }
200
201    // Edge operations
202
203    /// Insert an edge
204    pub fn insert_edge(&self, edge: &Edge) -> Result<EdgeId> {
205        let write_txn = self.db.begin_write()?;
206        {
207            let mut table = write_txn.open_table(EDGES_TABLE)?;
208
209            // Serialize edge data
210            let edge_data = bincode::encode_to_vec(edge, config::standard())?;
211            table.insert(edge.id.as_str(), edge_data.as_slice())?;
212        }
213        write_txn.commit()?;
214
215        Ok(edge.id.clone())
216    }
217
218    /// Insert multiple edges in a batch
219    pub fn insert_edges_batch(&self, edges: &[Edge]) -> Result<Vec<EdgeId>> {
220        let write_txn = self.db.begin_write()?;
221        let mut ids = Vec::with_capacity(edges.len());
222
223        {
224            let mut table = write_txn.open_table(EDGES_TABLE)?;
225
226            for edge in edges {
227                let edge_data = bincode::encode_to_vec(edge, config::standard())?;
228                table.insert(edge.id.as_str(), edge_data.as_slice())?;
229                ids.push(edge.id.clone());
230            }
231        }
232
233        write_txn.commit()?;
234        Ok(ids)
235    }
236
237    /// Get an edge by ID
238    pub fn get_edge(&self, id: &str) -> Result<Option<Edge>> {
239        let read_txn = self.db.begin_read()?;
240        let table = read_txn.open_table(EDGES_TABLE)?;
241
242        let Some(edge_data) = table.get(id)? else {
243            return Ok(None);
244        };
245
246        let (edge, _): (Edge, usize) =
247            bincode::decode_from_slice(edge_data.value(), config::standard())?;
248        Ok(Some(edge))
249    }
250
251    /// Delete an edge by ID
252    pub fn delete_edge(&self, id: &str) -> Result<bool> {
253        let write_txn = self.db.begin_write()?;
254        let deleted;
255        {
256            let mut table = write_txn.open_table(EDGES_TABLE)?;
257            let result = table.remove(id)?;
258            deleted = result.is_some();
259        }
260        write_txn.commit()?;
261        Ok(deleted)
262    }
263
264    /// Get all edge IDs
265    pub fn all_edge_ids(&self) -> Result<Vec<EdgeId>> {
266        let read_txn = self.db.begin_read()?;
267        let table = read_txn.open_table(EDGES_TABLE)?;
268
269        let mut ids = Vec::new();
270        let iter = table.iter()?;
271        for item in iter {
272            let (key, _) = item?;
273            ids.push(key.value().to_string());
274        }
275
276        Ok(ids)
277    }
278
279    // Hyperedge operations
280
281    /// Insert a hyperedge
282    pub fn insert_hyperedge(&self, hyperedge: &Hyperedge) -> Result<HyperedgeId> {
283        let write_txn = self.db.begin_write()?;
284        {
285            let mut table = write_txn.open_table(HYPEREDGES_TABLE)?;
286
287            // Serialize hyperedge data
288            let hyperedge_data = bincode::encode_to_vec(hyperedge, config::standard())?;
289            table.insert(hyperedge.id.as_str(), hyperedge_data.as_slice())?;
290        }
291        write_txn.commit()?;
292
293        Ok(hyperedge.id.clone())
294    }
295
296    /// Insert multiple hyperedges in a batch
297    pub fn insert_hyperedges_batch(&self, hyperedges: &[Hyperedge]) -> Result<Vec<HyperedgeId>> {
298        let write_txn = self.db.begin_write()?;
299        let mut ids = Vec::with_capacity(hyperedges.len());
300
301        {
302            let mut table = write_txn.open_table(HYPEREDGES_TABLE)?;
303
304            for hyperedge in hyperedges {
305                let hyperedge_data = bincode::encode_to_vec(hyperedge, config::standard())?;
306                table.insert(hyperedge.id.as_str(), hyperedge_data.as_slice())?;
307                ids.push(hyperedge.id.clone());
308            }
309        }
310
311        write_txn.commit()?;
312        Ok(ids)
313    }
314
315    /// Get a hyperedge by ID
316    pub fn get_hyperedge(&self, id: &str) -> Result<Option<Hyperedge>> {
317        let read_txn = self.db.begin_read()?;
318        let table = read_txn.open_table(HYPEREDGES_TABLE)?;
319
320        let Some(hyperedge_data) = table.get(id)? else {
321            return Ok(None);
322        };
323
324        let (hyperedge, _): (Hyperedge, usize) =
325            bincode::decode_from_slice(hyperedge_data.value(), config::standard())?;
326        Ok(Some(hyperedge))
327    }
328
329    /// Delete a hyperedge by ID
330    pub fn delete_hyperedge(&self, id: &str) -> Result<bool> {
331        let write_txn = self.db.begin_write()?;
332        let deleted;
333        {
334            let mut table = write_txn.open_table(HYPEREDGES_TABLE)?;
335            let result = table.remove(id)?;
336            deleted = result.is_some();
337        }
338        write_txn.commit()?;
339        Ok(deleted)
340    }
341
342    /// Get all hyperedge IDs
343    pub fn all_hyperedge_ids(&self) -> Result<Vec<HyperedgeId>> {
344        let read_txn = self.db.begin_read()?;
345        let table = read_txn.open_table(HYPEREDGES_TABLE)?;
346
347        let mut ids = Vec::new();
348        let iter = table.iter()?;
349        for item in iter {
350            let (key, _) = item?;
351            ids.push(key.value().to_string());
352        }
353
354        Ok(ids)
355    }
356
357    // Metadata operations
358
359    /// Set metadata
360    pub fn set_metadata(&self, key: &str, value: &str) -> Result<()> {
361        let write_txn = self.db.begin_write()?;
362        {
363            let mut table = write_txn.open_table(METADATA_TABLE)?;
364            table.insert(key, value)?;
365        }
366        write_txn.commit()?;
367        Ok(())
368    }
369
370    /// Get metadata
371    pub fn get_metadata(&self, key: &str) -> Result<Option<String>> {
372        let read_txn = self.db.begin_read()?;
373        let table = read_txn.open_table(METADATA_TABLE)?;
374
375        let value = table.get(key)?.map(|v| v.value().to_string());
376        Ok(value)
377    }
378
379    // Statistics
380
381    /// Get the number of nodes
382    pub fn node_count(&self) -> Result<usize> {
383        let read_txn = self.db.begin_read()?;
384        let table = read_txn.open_table(NODES_TABLE)?;
385        Ok(table.iter()?.count())
386    }
387
388    /// Get the number of edges
389    pub fn edge_count(&self) -> Result<usize> {
390        let read_txn = self.db.begin_read()?;
391        let table = read_txn.open_table(EDGES_TABLE)?;
392        Ok(table.iter()?.count())
393    }
394
395    /// Get the number of hyperedges
396    pub fn hyperedge_count(&self) -> Result<usize> {
397        let read_txn = self.db.begin_read()?;
398        let table = read_txn.open_table(HYPEREDGES_TABLE)?;
399        Ok(table.iter()?.count())
400    }
401}
402
// Everything these tests use from the parent module (`GraphStorage`, `Result`)
// only exists when the `storage` feature is enabled, so the test module must
// be gated on the feature as well; otherwise a test build without the feature
// fails to compile.
#[cfg(all(test, feature = "storage"))]
mod tests {
    use super::*;
    use crate::edge::EdgeBuilder;
    use crate::hyperedge::HyperedgeBuilder;
    use crate::node::NodeBuilder;
    use tempfile::tempdir;

    /// A node written with `insert_node` can be read back with `get_node`.
    #[test]
    fn test_node_storage() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let node = NodeBuilder::new()
            .label("Person")
            .property("name", "Alice")
            .build();

        let id = storage.insert_node(&node)?;
        assert_eq!(id, node.id);

        let retrieved = storage
            .get_node(&id)?
            .expect("inserted node should be retrievable");
        assert_eq!(retrieved.id, node.id);
        assert!(retrieved.has_label("Person"));

        Ok(())
    }

    /// An edge written with `insert_edge` can be read back with `get_edge`.
    #[test]
    fn test_edge_storage() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let edge = EdgeBuilder::new("n1".to_string(), "n2".to_string(), "KNOWS")
            .property("since", 2020i64)
            .build();

        let id = storage.insert_edge(&edge)?;
        assert_eq!(id, edge.id);

        let retrieved = storage
            .get_edge(&id)?
            .expect("inserted edge should be retrievable");
        assert_eq!(retrieved.id, edge.id);

        Ok(())
    }

    /// A batch insert stores every node and reports one ID per node.
    #[test]
    fn test_batch_insert() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let nodes = vec![
            NodeBuilder::new().label("Person").build(),
            NodeBuilder::new().label("Person").build(),
        ];

        let ids = storage.insert_nodes_batch(&nodes)?;
        assert_eq!(ids.len(), 2);
        assert_eq!(ids[0], nodes[0].id);
        assert_eq!(ids[1], nodes[1].id);
        assert_eq!(storage.node_count()?, 2);

        Ok(())
    }

    /// A hyperedge written with `insert_hyperedge` can be read back.
    #[test]
    fn test_hyperedge_storage() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let hyperedge = HyperedgeBuilder::new(
            vec!["n1".to_string(), "n2".to_string(), "n3".to_string()],
            "MEETING",
        )
        .description("Team meeting")
        .build();

        let id = storage.insert_hyperedge(&hyperedge)?;
        assert_eq!(id, hyperedge.id);

        let retrieved = storage
            .get_hyperedge(&id)?
            .expect("inserted hyperedge should be retrievable");
        assert_eq!(retrieved.id, hyperedge.id);

        Ok(())
    }

    /// Deleting reports whether an entry existed and makes it unreadable.
    #[test]
    fn test_delete_node() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let node = NodeBuilder::new().label("Person").build();
        let id = storage.insert_node(&node)?;

        assert!(storage.delete_node(&id)?);
        assert!(storage.get_node(&id)?.is_none());
        // A second delete finds nothing to remove.
        assert!(!storage.delete_node(&id)?);
        assert_eq!(storage.node_count()?, 0);

        Ok(())
    }

    /// Metadata values round-trip; missing keys read back as `None`.
    #[test]
    fn test_metadata_roundtrip() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        assert_eq!(storage.get_metadata("version")?, None);

        storage.set_metadata("version", "1")?;
        assert_eq!(storage.get_metadata("version")?, Some("1".to_string()));

        Ok(())
    }
}