ruvector_graph/storage.rs

#[cfg(feature = "storage")]
use crate::edge::Edge;
#[cfg(feature = "storage")]
use crate::hyperedge::{Hyperedge, HyperedgeId};
#[cfg(feature = "storage")]
use crate::node::Node;
#[cfg(feature = "storage")]
use crate::types::{EdgeId, NodeId};
#[cfg(feature = "storage")]
use anyhow::Result;
#[cfg(feature = "storage")]
use bincode::config;
#[cfg(feature = "storage")]
use once_cell::sync::Lazy;
#[cfg(feature = "storage")]
use parking_lot::Mutex;
#[cfg(feature = "storage")]
use redb::{Database, ReadableTable, TableDefinition};
#[cfg(feature = "storage")]
use std::collections::HashMap;
#[cfg(feature = "storage")]
use std::path::{Path, PathBuf};
#[cfg(feature = "storage")]
use std::sync::Arc;

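// redb table definitions. Graph elements are keyed by their string id and stored as
// bincode-encoded bytes; metadata is stored as plain string key/value pairs.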
#[cfg(feature = "storage")]
const NODES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("nodes");
#[cfg(feature = "storage")]
const EDGES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("edges");
#[cfg(feature = "storage")]
const HYPEREDGES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("hyperedges");
#[cfg(feature = "storage")]
const METADATA_TABLE: TableDefinition<&str, &str> = TableDefinition::new("metadata");

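// Process-wide pool of open databases, keyed by absolute path, so that repeated
// `GraphStorage::new` calls on the same file share a single redb `Database` handle.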
#[cfg(feature = "storage")]
static DB_POOL: Lazy<Mutex<HashMap<PathBuf, Arc<Database>>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));

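/// Persistent graph storage backed by a redb key-value database.
///
/// Nodes, edges, and hyperedges are serialized with bincode and kept in separate
/// tables; a small string table holds arbitrary metadata. Illustrative usage sketch,
/// assuming the crate exposes this module as `ruvector_graph::storage` (module path
/// and file name below are hypothetical):
///
/// ```ignore
/// use ruvector_graph::node::NodeBuilder;
/// use ruvector_graph::storage::GraphStorage;
///
/// # fn main() -> anyhow::Result<()> {
/// let storage = GraphStorage::new("data/graph.db")?; // hypothetical path
/// let node = NodeBuilder::new().label("Person").build();
/// storage.insert_node(&node)?;
/// assert!(storage.get_node(&node.id)?.is_some());
/// # Ok(())
/// # }
/// ```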
#[cfg(feature = "storage")]
pub struct GraphStorage {
    db: Arc<Database>,
}

#[cfg(feature = "storage")]
impl GraphStorage {
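    /// Opens (or creates) the database at `path`.
    ///
    /// Missing parent directories are created, relative paths are resolved against
    /// the current working directory, and relative paths containing `..` that would
    /// escape the working directory are rejected. Handles are cached in `DB_POOL`,
    /// so opening the same path twice returns a clone of the same `Database`; the
    /// four tables are created up front on first open.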
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path_ref = path.as_ref();

        // Make sure the parent directory exists before redb tries to create the file.
        if let Some(parent) = path_ref.parent() {
            if !parent.as_os_str().is_empty() && !parent.exists() {
                std::fs::create_dir_all(parent)?;
            }
        }

        // Key the connection pool by an absolute path so the same file maps to one handle.
        let path_buf = if path_ref.is_absolute() {
            path_ref.to_path_buf()
        } else {
            std::env::current_dir()?.join(path_ref)
        };

        // Reject relative paths whose `..` components would climb out of the working directory.
        let path_str = path_ref.to_string_lossy();
        if path_str.contains("..") && !path_ref.is_absolute() {
            if let Ok(cwd) = std::env::current_dir() {
                let mut normalized = cwd.clone();
                for component in path_ref.components() {
                    match component {
                        std::path::Component::ParentDir => {
                            if !normalized.pop() || !normalized.starts_with(&cwd) {
                                anyhow::bail!("Path traversal attempt detected");
                            }
                        }
                        std::path::Component::Normal(c) => normalized.push(c),
                        _ => {}
                    }
                }
            }
        }

        // Reuse an existing handle for this path, or create the database and its tables.
        let db = {
            let mut pool = DB_POOL.lock();

            if let Some(existing_db) = pool.get(&path_buf) {
                Arc::clone(existing_db)
            } else {
                let new_db = Arc::new(Database::create(&path_buf)?);

                // Opening a table inside a write transaction creates it if it does not exist.
                let write_txn = new_db.begin_write()?;
                {
                    let _ = write_txn.open_table(NODES_TABLE)?;
                    let _ = write_txn.open_table(EDGES_TABLE)?;
                    let _ = write_txn.open_table(HYPEREDGES_TABLE)?;
                    let _ = write_txn.open_table(METADATA_TABLE)?;
                }
                write_txn.commit()?;

                pool.insert(path_buf, Arc::clone(&new_db));
                new_db
            }
        };

        Ok(Self { db })
    }

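    /// Inserts a node (or replaces an existing node with the same id) in its own
    /// write transaction and returns the node's id.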
    pub fn insert_node(&self, node: &Node) -> Result<NodeId> {
        let write_txn = self.db.begin_write()?;
        {
            let mut table = write_txn.open_table(NODES_TABLE)?;

            let node_data = bincode::encode_to_vec(node, config::standard())?;
            table.insert(node.id.as_str(), node_data.as_slice())?;
        }
        write_txn.commit()?;

        Ok(node.id.clone())
    }

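    /// Inserts many nodes in a single write transaction; either all nodes are
    /// committed or, if any step fails, none are.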
    pub fn insert_nodes_batch(&self, nodes: &[Node]) -> Result<Vec<NodeId>> {
        let write_txn = self.db.begin_write()?;
        let mut ids = Vec::with_capacity(nodes.len());

        {
            let mut table = write_txn.open_table(NODES_TABLE)?;

            for node in nodes {
                let node_data = bincode::encode_to_vec(node, config::standard())?;
                table.insert(node.id.as_str(), node_data.as_slice())?;
                ids.push(node.id.clone());
            }
        }

        write_txn.commit()?;
        Ok(ids)
    }

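    /// Looks up a node by id, returning `Ok(None)` if it is not stored.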
    pub fn get_node(&self, id: &str) -> Result<Option<Node>> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(NODES_TABLE)?;

        let Some(node_data) = table.get(id)? else {
            return Ok(None);
        };

        let (node, _): (Node, usize) =
            bincode::decode_from_slice(node_data.value(), config::standard())?;
        Ok(Some(node))
    }

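    /// Removes a node by id, returning `true` if a node was actually deleted.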
    pub fn delete_node(&self, id: &str) -> Result<bool> {
        let write_txn = self.db.begin_write()?;
        let deleted;
        {
            let mut table = write_txn.open_table(NODES_TABLE)?;
            let result = table.remove(id)?;
            deleted = result.is_some();
        }
        write_txn.commit()?;
        Ok(deleted)
    }

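    /// Returns the ids of all stored nodes by scanning the node table.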
    pub fn all_node_ids(&self) -> Result<Vec<NodeId>> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(NODES_TABLE)?;

        let mut ids = Vec::new();
        let iter = table.iter()?;
        for item in iter {
            let (key, _) = item?;
            ids.push(key.value().to_string());
        }

        Ok(ids)
    }

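    /// Inserts an edge in its own write transaction; the edge operations mirror the
    /// node operations above but use `EDGES_TABLE`.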
    pub fn insert_edge(&self, edge: &Edge) -> Result<EdgeId> {
        let write_txn = self.db.begin_write()?;
        {
            let mut table = write_txn.open_table(EDGES_TABLE)?;

            let edge_data = bincode::encode_to_vec(edge, config::standard())?;
            table.insert(edge.id.as_str(), edge_data.as_slice())?;
        }
        write_txn.commit()?;

        Ok(edge.id.clone())
    }

    pub fn insert_edges_batch(&self, edges: &[Edge]) -> Result<Vec<EdgeId>> {
        let write_txn = self.db.begin_write()?;
        let mut ids = Vec::with_capacity(edges.len());

        {
            let mut table = write_txn.open_table(EDGES_TABLE)?;

            for edge in edges {
                let edge_data = bincode::encode_to_vec(edge, config::standard())?;
                table.insert(edge.id.as_str(), edge_data.as_slice())?;
                ids.push(edge.id.clone());
            }
        }

        write_txn.commit()?;
        Ok(ids)
    }

    pub fn get_edge(&self, id: &str) -> Result<Option<Edge>> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(EDGES_TABLE)?;

        let Some(edge_data) = table.get(id)? else {
            return Ok(None);
        };

        let (edge, _): (Edge, usize) =
            bincode::decode_from_slice(edge_data.value(), config::standard())?;
        Ok(Some(edge))
    }

    pub fn delete_edge(&self, id: &str) -> Result<bool> {
        let write_txn = self.db.begin_write()?;
        let deleted;
        {
            let mut table = write_txn.open_table(EDGES_TABLE)?;
            let result = table.remove(id)?;
            deleted = result.is_some();
        }
        write_txn.commit()?;
        Ok(deleted)
    }

    pub fn all_edge_ids(&self) -> Result<Vec<EdgeId>> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(EDGES_TABLE)?;

        let mut ids = Vec::new();
        let iter = table.iter()?;
        for item in iter {
            let (key, _) = item?;
            ids.push(key.value().to_string());
        }

        Ok(ids)
    }

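    /// Inserts a hyperedge; the hyperedge operations follow the same pattern as the
    /// node and edge operations but use `HYPEREDGES_TABLE`.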
    pub fn insert_hyperedge(&self, hyperedge: &Hyperedge) -> Result<HyperedgeId> {
        let write_txn = self.db.begin_write()?;
        {
            let mut table = write_txn.open_table(HYPEREDGES_TABLE)?;

            let hyperedge_data = bincode::encode_to_vec(hyperedge, config::standard())?;
            table.insert(hyperedge.id.as_str(), hyperedge_data.as_slice())?;
        }
        write_txn.commit()?;

        Ok(hyperedge.id.clone())
    }

    pub fn insert_hyperedges_batch(&self, hyperedges: &[Hyperedge]) -> Result<Vec<HyperedgeId>> {
        let write_txn = self.db.begin_write()?;
        let mut ids = Vec::with_capacity(hyperedges.len());

        {
            let mut table = write_txn.open_table(HYPEREDGES_TABLE)?;

            for hyperedge in hyperedges {
                let hyperedge_data = bincode::encode_to_vec(hyperedge, config::standard())?;
                table.insert(hyperedge.id.as_str(), hyperedge_data.as_slice())?;
                ids.push(hyperedge.id.clone());
            }
        }

        write_txn.commit()?;
        Ok(ids)
    }

    pub fn get_hyperedge(&self, id: &str) -> Result<Option<Hyperedge>> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(HYPEREDGES_TABLE)?;

        let Some(hyperedge_data) = table.get(id)? else {
            return Ok(None);
        };

        let (hyperedge, _): (Hyperedge, usize) =
            bincode::decode_from_slice(hyperedge_data.value(), config::standard())?;
        Ok(Some(hyperedge))
    }

    pub fn delete_hyperedge(&self, id: &str) -> Result<bool> {
        let write_txn = self.db.begin_write()?;
        let deleted;
        {
            let mut table = write_txn.open_table(HYPEREDGES_TABLE)?;
            let result = table.remove(id)?;
            deleted = result.is_some();
        }
        write_txn.commit()?;
        Ok(deleted)
    }

    pub fn all_hyperedge_ids(&self) -> Result<Vec<HyperedgeId>> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(HYPEREDGES_TABLE)?;

        let mut ids = Vec::new();
        let iter = table.iter()?;
        for item in iter {
            let (key, _) = item?;
            ids.push(key.value().to_string());
        }

        Ok(ids)
    }

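    /// Stores an arbitrary string value under `key` in the metadata table.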
    pub fn set_metadata(&self, key: &str, value: &str) -> Result<()> {
        let write_txn = self.db.begin_write()?;
        {
            let mut table = write_txn.open_table(METADATA_TABLE)?;
            table.insert(key, value)?;
        }
        write_txn.commit()?;
        Ok(())
    }

    pub fn get_metadata(&self, key: &str) -> Result<Option<String>> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(METADATA_TABLE)?;

        let value = table.get(key)?.map(|v| v.value().to_string());
        Ok(value)
    }

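    /// Counts stored nodes by iterating the node table; the edge and hyperedge
    /// counters below do the same for their tables.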
    pub fn node_count(&self) -> Result<usize> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(NODES_TABLE)?;
        Ok(table.iter()?.count())
    }

    pub fn edge_count(&self) -> Result<usize> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(EDGES_TABLE)?;
        Ok(table.iter()?.count())
    }

    pub fn hyperedge_count(&self) -> Result<usize> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(HYPEREDGES_TABLE)?;
        Ok(table.iter()?.count())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::edge::EdgeBuilder;
    use crate::hyperedge::HyperedgeBuilder;
    use crate::node::NodeBuilder;
    use tempfile::tempdir;

    #[test]
    fn test_node_storage() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let node = NodeBuilder::new()
            .label("Person")
            .property("name", "Alice")
            .build();

        let id = storage.insert_node(&node)?;
        assert_eq!(id, node.id);

        let retrieved = storage.get_node(&id)?;
        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.id, node.id);
        assert!(retrieved.has_label("Person"));

        Ok(())
    }

    #[test]
    fn test_edge_storage() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let edge = EdgeBuilder::new("n1".to_string(), "n2".to_string(), "KNOWS")
            .property("since", 2020i64)
            .build();

        let id = storage.insert_edge(&edge)?;
        assert_eq!(id, edge.id);

        let retrieved = storage.get_edge(&id)?;
        assert!(retrieved.is_some());

        Ok(())
    }

    #[test]
    fn test_batch_insert() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let nodes = vec![
            NodeBuilder::new().label("Person").build(),
            NodeBuilder::new().label("Person").build(),
        ];

        let ids = storage.insert_nodes_batch(&nodes)?;
        assert_eq!(ids.len(), 2);
        assert_eq!(storage.node_count()?, 2);

        Ok(())
    }

    #[test]
    fn test_hyperedge_storage() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let hyperedge = HyperedgeBuilder::new(
            vec!["n1".to_string(), "n2".to_string(), "n3".to_string()],
            "MEETING",
        )
        .description("Team meeting")
        .build();

        let id = storage.insert_hyperedge(&hyperedge)?;
        assert_eq!(id, hyperedge.id);

        let retrieved = storage.get_hyperedge(&id)?;
        assert!(retrieved.is_some());

        Ok(())
    }
}