ruvector_graph/
storage.rs1#[cfg(feature = "storage")]
6use crate::edge::Edge;
7#[cfg(feature = "storage")]
8use crate::hyperedge::{Hyperedge, HyperedgeId};
9#[cfg(feature = "storage")]
10use crate::node::Node;
11#[cfg(feature = "storage")]
12use crate::types::{EdgeId, NodeId};
13#[cfg(feature = "storage")]
14use anyhow::Result;
15#[cfg(feature = "storage")]
16use bincode::config;
17#[cfg(feature = "storage")]
18use once_cell::sync::Lazy;
19#[cfg(feature = "storage")]
20use parking_lot::Mutex;
21#[cfg(feature = "storage")]
22use redb::{Database, ReadableTable, TableDefinition};
23#[cfg(feature = "storage")]
24use std::collections::HashMap;
25#[cfg(feature = "storage")]
26use std::path::{Path, PathBuf};
27#[cfg(feature = "storage")]
28use std::sync::Arc;
29
/// redb table holding bincode-encoded nodes, keyed by node id.
#[cfg(feature = "storage")]
const NODES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("nodes");

/// redb table holding bincode-encoded edges, keyed by edge id.
#[cfg(feature = "storage")]
const EDGES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("edges");

/// redb table holding bincode-encoded hyperedges, keyed by hyperedge id.
#[cfg(feature = "storage")]
const HYPEREDGES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("hyperedges");

/// redb table holding free-form string key/value metadata.
#[cfg(feature = "storage")]
const METADATA_TABLE: TableDefinition<&str, &str> = TableDefinition::new("metadata");
39
/// Process-wide cache of open databases, keyed by (canonicalized) file path.
///
/// `GraphStorage::new` consults this map so that several `GraphStorage` values pointing at
/// the same file share one `Database` handle instead of opening the file repeatedly.
/// NOTE(review): nothing in this file removes entries, so a database stays open for the rest
/// of the process once it has been opened — confirm that is intended.
#[cfg(feature = "storage")]
static DB_POOL: Lazy<Mutex<HashMap<PathBuf, Arc<Database>>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));
45
/// Persistent store for graph nodes, edges, hyperedges and string metadata, backed by redb.
///
/// Entities are serialized with bincode (standard config) and stored under their string id.
#[cfg(feature = "storage")]
pub struct GraphStorage {
    /// Database handle; handles opened on the same file share this `Arc` via `DB_POOL`.
    db: Arc<Database>,
}
51
#[cfg(feature = "storage")]
impl GraphStorage {
    /// Opens the graph database at `path`, creating the file and its tables if needed.
    ///
    /// Every `GraphStorage` opened on the same file in this process shares a single
    /// [`Database`] handle cached in `DB_POOL`. The pool lock is held while a new database is
    /// created and initialised, so concurrent callers cannot create the same file twice.
    ///
    /// # Errors
    ///
    /// Returns an error if the database cannot be created/opened or if creating the
    /// node, edge, hyperedge or metadata tables fails.
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        let key = Self::pool_key(path.as_ref());

        let mut pool = DB_POOL.lock();
        if let Some(existing) = pool.get(&key) {
            return Ok(Self {
                db: Arc::clone(existing),
            });
        }

        let db = Arc::new(Database::create(&key)?);

        // Create all tables up front, presumably so the read-only transactions used by the
        // getters below can open them even on a freshly created database.
        let write_txn = db.begin_write()?;
        {
            let _ = write_txn.open_table(NODES_TABLE)?;
            let _ = write_txn.open_table(EDGES_TABLE)?;
            let _ = write_txn.open_table(HYPEREDGES_TABLE)?;
            let _ = write_txn.open_table(METADATA_TABLE)?;
        }
        write_txn.commit()?;

        // Only cache the handle once initialisation succeeded.
        pool.insert(key, Arc::clone(&db));
        Ok(Self { db })
    }

    /// Computes the `DB_POOL` key for `path`.
    ///
    /// `canonicalize` fails for a file that does not exist yet. Falling back to the raw path in
    /// that case would give the same file different keys before and after its creation (e.g.
    /// for relative paths or symlinked directories), so the parent directory is canonicalized
    /// instead and the file name re-attached. The raw path is used only as a last resort.
    fn pool_key(path: &Path) -> PathBuf {
        if let Ok(canonical) = path.canonicalize() {
            return canonical;
        }

        // `Path::parent` yields an empty path for bare file names such as "graph.db".
        let parent = match path.parent() {
            Some(dir) if !dir.as_os_str().is_empty() => dir,
            _ => Path::new("."),
        };

        match (path.file_name(), parent.canonicalize()) {
            (Some(name), Ok(dir)) => dir.join(name),
            _ => path.to_path_buf(),
        }
    }

    /// Stores `node` under its id, overwriting any existing node with that id.
    ///
    /// Returns the id of the stored node.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding fails or the write transaction cannot be committed.
    pub fn insert_node(&self, node: &Node) -> Result<NodeId> {
        // Encode before starting the write transaction so it is held as briefly as possible.
        let node_data = bincode::encode_to_vec(node, config::standard())?;

        let write_txn = self.db.begin_write()?;
        {
            let mut table = write_txn.open_table(NODES_TABLE)?;
            table.insert(node.id.as_str(), node_data.as_slice())?;
        }
        write_txn.commit()?;

        Ok(node.id.clone())
    }

    /// Stores all `nodes` in a single write transaction.
    ///
    /// Either every node is committed or, on error, none are. Returns the ids in input order.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding any node fails or the transaction cannot be committed.
    pub fn insert_nodes_batch(&self, nodes: &[Node]) -> Result<Vec<NodeId>> {
        let write_txn = self.db.begin_write()?;
        let mut ids = Vec::with_capacity(nodes.len());

        {
            let mut table = write_txn.open_table(NODES_TABLE)?;
            for node in nodes {
                let node_data = bincode::encode_to_vec(node, config::standard())?;
                table.insert(node.id.as_str(), node_data.as_slice())?;
                ids.push(node.id.clone());
            }
        }

        write_txn.commit()?;
        Ok(ids)
    }

    /// Loads the node stored under `id`, or `None` if there is none.
    ///
    /// # Errors
    ///
    /// Returns an error on storage failure or if the stored bytes cannot be decoded.
    pub fn get_node(&self, id: &str) -> Result<Option<Node>> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(NODES_TABLE)?;

        let Some(node_data) = table.get(id)? else {
            return Ok(None);
        };

        let (node, _): (Node, usize) =
            bincode::decode_from_slice(node_data.value(), config::standard())?;
        Ok(Some(node))
    }

    /// Removes the node stored under `id`.
    ///
    /// Returns `true` if a node was removed and `false` if none existed. Only the node record
    /// is removed; edges and hyperedges referencing it are left untouched.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.
    pub fn delete_node(&self, id: &str) -> Result<bool> {
        let write_txn = self.db.begin_write()?;
        let deleted = {
            let mut table = write_txn.open_table(NODES_TABLE)?;
            let removed = table.remove(id)?;
            removed.is_some()
        };
        write_txn.commit()?;
        Ok(deleted)
    }

    /// Returns the ids of all stored nodes in key order.
    ///
    /// # Errors
    ///
    /// Returns an error if reading any entry fails.
    pub fn all_node_ids(&self) -> Result<Vec<NodeId>> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(NODES_TABLE)?;

        let ids = table
            .iter()?
            .map(|item| item.map(|(key, _)| key.value().to_string()))
            .collect::<std::result::Result<Vec<_>, _>>()?;
        Ok(ids)
    }

    /// Stores `edge` under its id, overwriting any existing edge with that id.
    ///
    /// Returns the id of the stored edge.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding fails or the write transaction cannot be committed.
    pub fn insert_edge(&self, edge: &Edge) -> Result<EdgeId> {
        let edge_data = bincode::encode_to_vec(edge, config::standard())?;

        let write_txn = self.db.begin_write()?;
        {
            let mut table = write_txn.open_table(EDGES_TABLE)?;
            table.insert(edge.id.as_str(), edge_data.as_slice())?;
        }
        write_txn.commit()?;

        Ok(edge.id.clone())
    }

    /// Stores all `edges` in a single write transaction.
    ///
    /// Either every edge is committed or, on error, none are. Returns the ids in input order.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding any edge fails or the transaction cannot be committed.
    pub fn insert_edges_batch(&self, edges: &[Edge]) -> Result<Vec<EdgeId>> {
        let write_txn = self.db.begin_write()?;
        let mut ids = Vec::with_capacity(edges.len());

        {
            let mut table = write_txn.open_table(EDGES_TABLE)?;
            for edge in edges {
                let edge_data = bincode::encode_to_vec(edge, config::standard())?;
                table.insert(edge.id.as_str(), edge_data.as_slice())?;
                ids.push(edge.id.clone());
            }
        }

        write_txn.commit()?;
        Ok(ids)
    }

    /// Loads the edge stored under `id`, or `None` if there is none.
    ///
    /// # Errors
    ///
    /// Returns an error on storage failure or if the stored bytes cannot be decoded.
    pub fn get_edge(&self, id: &str) -> Result<Option<Edge>> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(EDGES_TABLE)?;

        let Some(edge_data) = table.get(id)? else {
            return Ok(None);
        };

        let (edge, _): (Edge, usize) =
            bincode::decode_from_slice(edge_data.value(), config::standard())?;
        Ok(Some(edge))
    }

    /// Removes the edge stored under `id`.
    ///
    /// Returns `true` if an edge was removed and `false` if none existed.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.
    pub fn delete_edge(&self, id: &str) -> Result<bool> {
        let write_txn = self.db.begin_write()?;
        let deleted = {
            let mut table = write_txn.open_table(EDGES_TABLE)?;
            let removed = table.remove(id)?;
            removed.is_some()
        };
        write_txn.commit()?;
        Ok(deleted)
    }

    /// Returns the ids of all stored edges in key order.
    ///
    /// # Errors
    ///
    /// Returns an error if reading any entry fails.
    pub fn all_edge_ids(&self) -> Result<Vec<EdgeId>> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(EDGES_TABLE)?;

        let ids = table
            .iter()?
            .map(|item| item.map(|(key, _)| key.value().to_string()))
            .collect::<std::result::Result<Vec<_>, _>>()?;
        Ok(ids)
    }

    /// Stores `hyperedge` under its id, overwriting any existing hyperedge with that id.
    ///
    /// Returns the id of the stored hyperedge.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding fails or the write transaction cannot be committed.
    pub fn insert_hyperedge(&self, hyperedge: &Hyperedge) -> Result<HyperedgeId> {
        let hyperedge_data = bincode::encode_to_vec(hyperedge, config::standard())?;

        let write_txn = self.db.begin_write()?;
        {
            let mut table = write_txn.open_table(HYPEREDGES_TABLE)?;
            table.insert(hyperedge.id.as_str(), hyperedge_data.as_slice())?;
        }
        write_txn.commit()?;

        Ok(hyperedge.id.clone())
    }

    /// Stores all `hyperedges` in a single write transaction.
    ///
    /// Either every hyperedge is committed or, on error, none are. Returns the ids in input
    /// order.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding any hyperedge fails or the transaction cannot be committed.
    pub fn insert_hyperedges_batch(&self, hyperedges: &[Hyperedge]) -> Result<Vec<HyperedgeId>> {
        let write_txn = self.db.begin_write()?;
        let mut ids = Vec::with_capacity(hyperedges.len());

        {
            let mut table = write_txn.open_table(HYPEREDGES_TABLE)?;
            for hyperedge in hyperedges {
                let hyperedge_data = bincode::encode_to_vec(hyperedge, config::standard())?;
                table.insert(hyperedge.id.as_str(), hyperedge_data.as_slice())?;
                ids.push(hyperedge.id.clone());
            }
        }

        write_txn.commit()?;
        Ok(ids)
    }

    /// Loads the hyperedge stored under `id`, or `None` if there is none.
    ///
    /// # Errors
    ///
    /// Returns an error on storage failure or if the stored bytes cannot be decoded.
    pub fn get_hyperedge(&self, id: &str) -> Result<Option<Hyperedge>> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(HYPEREDGES_TABLE)?;

        let Some(hyperedge_data) = table.get(id)? else {
            return Ok(None);
        };

        let (hyperedge, _): (Hyperedge, usize) =
            bincode::decode_from_slice(hyperedge_data.value(), config::standard())?;
        Ok(Some(hyperedge))
    }

    /// Removes the hyperedge stored under `id`.
    ///
    /// Returns `true` if a hyperedge was removed and `false` if none existed.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.
    pub fn delete_hyperedge(&self, id: &str) -> Result<bool> {
        let write_txn = self.db.begin_write()?;
        let deleted = {
            let mut table = write_txn.open_table(HYPEREDGES_TABLE)?;
            let removed = table.remove(id)?;
            removed.is_some()
        };
        write_txn.commit()?;
        Ok(deleted)
    }

    /// Returns the ids of all stored hyperedges in key order.
    ///
    /// # Errors
    ///
    /// Returns an error if reading any entry fails.
    pub fn all_hyperedge_ids(&self) -> Result<Vec<HyperedgeId>> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(HYPEREDGES_TABLE)?;

        let ids = table
            .iter()?
            .map(|item| item.map(|(key, _)| key.value().to_string()))
            .collect::<std::result::Result<Vec<_>, _>>()?;
        Ok(ids)
    }

    /// Sets metadata entry `key` to `value`, overwriting any previous value.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.
    pub fn set_metadata(&self, key: &str, value: &str) -> Result<()> {
        let write_txn = self.db.begin_write()?;
        {
            let mut table = write_txn.open_table(METADATA_TABLE)?;
            table.insert(key, value)?;
        }
        write_txn.commit()?;
        Ok(())
    }

    /// Returns the metadata value stored under `key`, or `None` if it is unset.
    ///
    /// # Errors
    ///
    /// Returns an error if the read transaction fails.
    pub fn get_metadata(&self, key: &str) -> Result<Option<String>> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(METADATA_TABLE)?;

        let value = table.get(key)?.map(|v| v.value().to_string());
        Ok(value)
    }

    /// Returns the number of stored nodes.
    ///
    /// This walks the whole table (O(n)).
    ///
    /// # Errors
    ///
    /// Returns an error if reading any entry fails; failed entries are not counted as nodes.
    pub fn node_count(&self) -> Result<usize> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(NODES_TABLE)?;

        let count = table
            .iter()?
            .try_fold(0usize, |n, item| item.map(|_| n + 1))?;
        Ok(count)
    }

    /// Returns the number of stored edges.
    ///
    /// This walks the whole table (O(n)).
    ///
    /// # Errors
    ///
    /// Returns an error if reading any entry fails; failed entries are not counted as edges.
    pub fn edge_count(&self) -> Result<usize> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(EDGES_TABLE)?;

        let count = table
            .iter()?
            .try_fold(0usize, |n, item| item.map(|_| n + 1))?;
        Ok(count)
    }

    /// Returns the number of stored hyperedges.
    ///
    /// This walks the whole table (O(n)).
    ///
    /// # Errors
    ///
    /// Returns an error if reading any entry fails; failed entries are not counted.
    pub fn hyperedge_count(&self) -> Result<usize> {
        let read_txn = self.db.begin_read()?;
        let table = read_txn.open_table(HYPEREDGES_TABLE)?;

        let count = table
            .iter()?
            .try_fold(0usize, |n, item| item.map(|_| n + 1))?;
        Ok(count)
    }
}
372
// `GraphStorage` and the `anyhow::Result` import only exist with the `storage` feature, so the
// tests must be gated on it too; otherwise `cargo test` without the feature fails to compile.
#[cfg(all(test, feature = "storage"))]
mod tests {
    use super::*;
    use crate::edge::EdgeBuilder;
    use crate::hyperedge::HyperedgeBuilder;
    use crate::node::NodeBuilder;
    use tempfile::tempdir;

    #[test]
    fn test_node_storage() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let node = NodeBuilder::new()
            .label("Person")
            .property("name", "Alice")
            .build();

        let id = storage.insert_node(&node)?;
        assert_eq!(id, node.id);

        let retrieved = storage
            .get_node(&id)?
            .expect("node should be readable right after insert");
        assert_eq!(retrieved.id, node.id);
        assert!(retrieved.has_label("Person"));

        Ok(())
    }

    #[test]
    fn test_node_delete() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let node = NodeBuilder::new().label("Person").build();
        let id = storage.insert_node(&node)?;

        assert!(storage.delete_node(&id)?);
        assert!(storage.get_node(&id)?.is_none());
        // Deleting again reports that nothing was removed.
        assert!(!storage.delete_node(&id)?);
        assert_eq!(storage.node_count()?, 0);

        Ok(())
    }

    #[test]
    fn test_edge_storage() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let edge = EdgeBuilder::new("n1".to_string(), "n2".to_string(), "KNOWS")
            .property("since", 2020i64)
            .build();

        let id = storage.insert_edge(&edge)?;
        assert_eq!(id, edge.id);

        let retrieved = storage
            .get_edge(&id)?
            .expect("edge should be readable right after insert");
        assert_eq!(retrieved.id, edge.id);

        assert!(storage.delete_edge(&id)?);
        assert!(storage.get_edge(&id)?.is_none());

        Ok(())
    }

    #[test]
    fn test_batch_insert() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let nodes = vec![
            NodeBuilder::new().label("Person").build(),
            NodeBuilder::new().label("Person").build(),
        ];

        let mut ids = storage.insert_nodes_batch(&nodes)?;
        assert_eq!(ids.len(), 2);
        assert_eq!(storage.node_count()?, 2);

        let mut stored = storage.all_node_ids()?;
        ids.sort();
        stored.sort();
        assert_eq!(stored, ids);

        Ok(())
    }

    #[test]
    fn test_hyperedge_storage() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        let hyperedge = HyperedgeBuilder::new(
            vec!["n1".to_string(), "n2".to_string(), "n3".to_string()],
            "MEETING",
        )
        .description("Team meeting")
        .build();

        let id = storage.insert_hyperedge(&hyperedge)?;
        assert_eq!(id, hyperedge.id);

        let retrieved = storage
            .get_hyperedge(&id)?
            .expect("hyperedge should be readable right after insert");
        assert_eq!(retrieved.id, hyperedge.id);
        assert_eq!(storage.hyperedge_count()?, 1);

        Ok(())
    }

    #[test]
    fn test_metadata() -> Result<()> {
        let dir = tempdir()?;
        let storage = GraphStorage::new(dir.path().join("test.db"))?;

        assert_eq!(storage.get_metadata("version")?, None);

        storage.set_metadata("version", "1")?;
        assert_eq!(storage.get_metadata("version")?.as_deref(), Some("1"));

        // Setting again overwrites the previous value.
        storage.set_metadata("version", "2")?;
        assert_eq!(storage.get_metadata("version")?.as_deref(), Some("2"));

        Ok(())
    }
}