Skip to main content

ruvector_graph/
graph.rs

1//! Graph database implementation with concurrent access and indexing
2
3use crate::edge::Edge;
4use crate::error::Result;
5use crate::hyperedge::{Hyperedge, HyperedgeId};
6use crate::index::{AdjacencyIndex, EdgeTypeIndex, HyperedgeNodeIndex, LabelIndex, PropertyIndex};
7use crate::node::Node;
8#[cfg(feature = "storage")]
9use crate::storage::GraphStorage;
10use crate::types::{EdgeId, NodeId, PropertyValue};
11use dashmap::DashMap;
12#[cfg(feature = "storage")]
13use std::path::Path;
14use std::sync::Arc;
15
16/// High-performance graph database with concurrent access
17pub struct GraphDB {
18    /// In-memory node storage (DashMap for lock-free concurrent reads)
19    nodes: Arc<DashMap<NodeId, Node>>,
20    /// In-memory edge storage
21    edges: Arc<DashMap<EdgeId, Edge>>,
22    /// In-memory hyperedge storage
23    hyperedges: Arc<DashMap<HyperedgeId, Hyperedge>>,
24    /// Label index for fast label-based lookups
25    label_index: LabelIndex,
26    /// Property index for fast property-based lookups
27    property_index: PropertyIndex,
28    /// Edge type index
29    edge_type_index: EdgeTypeIndex,
30    /// Adjacency index for neighbor lookups
31    adjacency_index: AdjacencyIndex,
32    /// Hyperedge node index
33    hyperedge_node_index: HyperedgeNodeIndex,
34    /// Optional persistent storage
35    #[cfg(feature = "storage")]
36    storage: Option<GraphStorage>,
37}
38
39impl GraphDB {
40    /// Create a new in-memory graph database
41    pub fn new() -> Self {
42        Self {
43            nodes: Arc::new(DashMap::new()),
44            edges: Arc::new(DashMap::new()),
45            hyperedges: Arc::new(DashMap::new()),
46            label_index: LabelIndex::new(),
47            property_index: PropertyIndex::new(),
48            edge_type_index: EdgeTypeIndex::new(),
49            adjacency_index: AdjacencyIndex::new(),
50            hyperedge_node_index: HyperedgeNodeIndex::new(),
51            #[cfg(feature = "storage")]
52            storage: None,
53        }
54    }
55
56    /// Create a new graph database with persistent storage
57    #[cfg(feature = "storage")]
58    pub fn with_storage<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
59        let storage = GraphStorage::new(path)?;
60
61        let mut db = Self::new();
62        db.storage = Some(storage);
63
64        // Load existing data from storage
65        db.load_from_storage()?;
66
67        Ok(db)
68    }
69
70    /// Load all data from storage into memory
71    #[cfg(feature = "storage")]
72    fn load_from_storage(&mut self) -> anyhow::Result<()> {
73        if let Some(storage) = &self.storage {
74            // Load nodes
75            for node_id in storage.all_node_ids()? {
76                if let Some(node) = storage.get_node(&node_id)? {
77                    self.nodes.insert(node_id.clone(), node.clone());
78                    self.label_index.add_node(&node);
79                    self.property_index.add_node(&node);
80                }
81            }
82
83            // Load edges
84            for edge_id in storage.all_edge_ids()? {
85                if let Some(edge) = storage.get_edge(&edge_id)? {
86                    self.edges.insert(edge_id.clone(), edge.clone());
87                    self.edge_type_index.add_edge(&edge);
88                    self.adjacency_index.add_edge(&edge);
89                }
90            }
91
92            // Load hyperedges
93            for hyperedge_id in storage.all_hyperedge_ids()? {
94                if let Some(hyperedge) = storage.get_hyperedge(&hyperedge_id)? {
95                    self.hyperedges
96                        .insert(hyperedge_id.clone(), hyperedge.clone());
97                    self.hyperedge_node_index.add_hyperedge(&hyperedge);
98                }
99            }
100        }
101        Ok(())
102    }
103
104    // Node operations
105
106    /// Create a node
107    pub fn create_node(&self, node: Node) -> Result<NodeId> {
108        let id = node.id.clone();
109
110        // Update indexes
111        self.label_index.add_node(&node);
112        self.property_index.add_node(&node);
113
114        // Insert into memory
115        self.nodes.insert(id.clone(), node.clone());
116
117        // Persist to storage if available
118        #[cfg(feature = "storage")]
119        if let Some(storage) = &self.storage {
120            storage.insert_node(&node)?;
121        }
122
123        Ok(id)
124    }
125
126    /// Get a node by ID
127    pub fn get_node(&self, id: impl AsRef<str>) -> Option<Node> {
128        self.nodes.get(id.as_ref()).map(|entry| entry.clone())
129    }
130
131    /// Borrow a node and apply `f` without cloning it.
132    ///
133    /// Hot-path accessor for scans that only need to read a node (e.g. vector
134    /// scoring). Avoids the full `Node` + embedding clone that `get_node`
135    /// incurs. Returns `None` if the node is absent.
136    pub fn with_node<R>(&self, id: &str, f: impl FnOnce(&Node) -> R) -> Option<R> {
137        self.nodes.get(id).map(|entry| f(entry.value()))
138    }
139
140    /// Node ids carrying `label`, straight from the label index (no node clones).
141    pub fn node_ids_by_label(&self, label: &str) -> Vec<NodeId> {
142        self.label_index.get_nodes_by_label(label)
143    }
144
145    /// Delete a node
146    pub fn delete_node(&self, id: impl AsRef<str>) -> Result<bool> {
147        if let Some((_, node)) = self.nodes.remove(id.as_ref()) {
148            // Update indexes
149            self.label_index.remove_node(&node);
150            self.property_index.remove_node(&node);
151
152            // Delete from storage if available
153            #[cfg(feature = "storage")]
154            if let Some(storage) = &self.storage {
155                storage.delete_node(id.as_ref())?;
156            }
157
158            Ok(true)
159        } else {
160            Ok(false)
161        }
162    }
163
164    /// Get nodes by label
165    pub fn get_nodes_by_label(&self, label: &str) -> Vec<Node> {
166        self.label_index
167            .get_nodes_by_label(label)
168            .into_iter()
169            .filter_map(|id| self.get_node(&id))
170            .collect()
171    }
172
173    /// Get nodes by property
174    pub fn get_nodes_by_property(&self, key: &str, value: &PropertyValue) -> Vec<Node> {
175        self.property_index
176            .get_nodes_by_property(key, value)
177            .into_iter()
178            .filter_map(|id| self.get_node(&id))
179            .collect()
180    }
181
182    // Edge operations
183
184    /// Create an edge
185    pub fn create_edge(&self, edge: Edge) -> Result<EdgeId> {
186        let id = edge.id.clone();
187
188        // Verify nodes exist
189        if !self.nodes.contains_key(&edge.from) || !self.nodes.contains_key(&edge.to) {
190            return Err(crate::error::GraphError::NodeNotFound(
191                "Source or target node not found".to_string(),
192            ));
193        }
194
195        // Update indexes
196        self.edge_type_index.add_edge(&edge);
197        self.adjacency_index.add_edge(&edge);
198
199        // Insert into memory
200        self.edges.insert(id.clone(), edge.clone());
201
202        // Persist to storage if available
203        #[cfg(feature = "storage")]
204        if let Some(storage) = &self.storage {
205            storage.insert_edge(&edge)?;
206        }
207
208        Ok(id)
209    }
210
211    /// Get an edge by ID
212    pub fn get_edge(&self, id: impl AsRef<str>) -> Option<Edge> {
213        self.edges.get(id.as_ref()).map(|entry| entry.clone())
214    }
215
216    /// Delete an edge
217    pub fn delete_edge(&self, id: impl AsRef<str>) -> Result<bool> {
218        if let Some((_, edge)) = self.edges.remove(id.as_ref()) {
219            // Update indexes
220            self.edge_type_index.remove_edge(&edge);
221            self.adjacency_index.remove_edge(&edge);
222
223            // Delete from storage if available
224            #[cfg(feature = "storage")]
225            if let Some(storage) = &self.storage {
226                storage.delete_edge(id.as_ref())?;
227            }
228
229            Ok(true)
230        } else {
231            Ok(false)
232        }
233    }
234
235    /// Delete multiple edges (batch)
236    pub fn delete_edges_batch(&self, ids: &[impl AsRef<str>]) -> Result<usize> {
237        let mut deleted = 0;
238        let mut edges_to_update = Vec::with_capacity(ids.len());
239
240        for id in ids {
241            let key: &str = id.as_ref();
242            if let Some((_, edge)) = self.edges.remove(key) {
243                edges_to_update.push(edge);
244                deleted += 1;
245            }
246        }
247
248        for edge in &edges_to_update {
249            self.edge_type_index.remove_edge(edge);
250            self.adjacency_index.remove_edge(edge);
251        }
252
253        #[cfg(feature = "storage")]
254        if let Some(storage) = &self.storage {
255            let str_ids = ids.iter().map(|id| id.as_ref()).collect::<Vec<_>>();
256            storage.delete_edges_batch(&str_ids)?;
257        }
258
259        Ok(deleted)
260    }
261
262    /// Get edges by type
263    pub fn get_edges_by_type(&self, edge_type: &str) -> Vec<Edge> {
264        self.edge_type_index
265            .get_edges_by_type(edge_type)
266            .into_iter()
267            .filter_map(|id| self.get_edge(&id))
268            .collect()
269    }
270
271    /// Get outgoing edges from a node
272    pub fn get_outgoing_edges(&self, node_id: &NodeId) -> Vec<Edge> {
273        self.adjacency_index
274            .get_outgoing_edges(node_id)
275            .into_iter()
276            .filter_map(|id| self.get_edge(&id))
277            .collect()
278    }
279
280    /// Get incoming edges to a node
281    pub fn get_incoming_edges(&self, node_id: &NodeId) -> Vec<Edge> {
282        self.adjacency_index
283            .get_incoming_edges(node_id)
284            .into_iter()
285            .filter_map(|id| self.get_edge(&id))
286            .collect()
287    }
288
289    /// Checks whether an edge exists from `from` → `to` with type `edge_type`.
290    /// Returns true if found, false otherwise.
291    ///
292    /// Fast path: avoids cloning `Edge` by reading fields through the `DashMap`
293    /// reference guard and short-circuits on first match.
294    pub fn has_edge(&self, from: &NodeId, to: &NodeId, edge_type: &str) -> bool {
295        self.adjacency_index
296            .get_outgoing_edges(from)
297            .into_iter()
298            .any(|id| {
299                self.edges
300                    .get(&id)
301                    .is_some_and(|e| e.to == *to && e.edge_type == edge_type)
302            })
303    }
304
305    /// Get outgoing edges for multiple nodes in one call (O(k×avg_degree) vs O(E) for full scan).
306    pub fn get_edges_for_nodes(&self, node_ids: &[NodeId]) -> Vec<Edge> {
307        let mut result = Vec::with_capacity(node_ids.len() * 4);
308        self.adjacency_index
309            .for_each_outgoing_edge(node_ids, |edge_id| {
310                if let Some(edge) = self.edges.get(edge_id.as_str()) {
311                    result.push(edge.clone());
312                }
313            });
314
315        result
316    }
317
318    // Hyperedge operations
319
320    /// Create a hyperedge
321    pub fn create_hyperedge(&self, hyperedge: Hyperedge) -> Result<HyperedgeId> {
322        let id = hyperedge.id.clone();
323
324        // Verify all nodes exist
325        for node_id in &hyperedge.nodes {
326            if !self.nodes.contains_key(node_id) {
327                return Err(crate::error::GraphError::NodeNotFound(format!(
328                    "Node {} not found",
329                    node_id
330                )));
331            }
332        }
333
334        // Update index
335        self.hyperedge_node_index.add_hyperedge(&hyperedge);
336
337        // Insert into memory
338        self.hyperedges.insert(id.clone(), hyperedge.clone());
339
340        // Persist to storage if available
341        #[cfg(feature = "storage")]
342        if let Some(storage) = &self.storage {
343            storage.insert_hyperedge(&hyperedge)?;
344        }
345
346        Ok(id)
347    }
348
349    /// Get a hyperedge by ID
350    pub fn get_hyperedge(&self, id: &HyperedgeId) -> Option<Hyperedge> {
351        self.hyperedges.get(id).map(|entry| entry.clone())
352    }
353
354    /// Get hyperedges containing a node
355    pub fn get_hyperedges_by_node(&self, node_id: &NodeId) -> Vec<Hyperedge> {
356        self.hyperedge_node_index
357            .get_hyperedges_by_node(node_id)
358            .into_iter()
359            .filter_map(|id| self.get_hyperedge(&id))
360            .collect()
361    }
362
363    /// Delete a hyperedge by ID
364    pub fn delete_hyperedge(&self, id: &HyperedgeId) -> Result<bool> {
365        if let Some((_, hyperedge)) = self.hyperedges.remove(id) {
366            self.hyperedge_node_index.remove_hyperedge(&hyperedge);
367
368            #[cfg(feature = "storage")]
369            if let Some(storage) = &self.storage {
370                storage.delete_hyperedge(id)?;
371            }
372
373            Ok(true)
374        } else {
375            Ok(false)
376        }
377    }
378
379    /// Delete all hyperedges that contain a given node
380    pub fn delete_hyperedges_by_node(&self, node_id: &NodeId) -> Result<usize> {
381        let ids: Vec<HyperedgeId> = self.hyperedge_node_index.get_hyperedges_by_node(node_id);
382        let mut deleted = 0;
383        for id in &ids {
384            if self.delete_hyperedge(id)? {
385                deleted += 1;
386            }
387        }
388        Ok(deleted)
389    }
390
391    // Statistics
392
393    /// Get the number of nodes
394    pub fn node_count(&self) -> usize {
395        self.nodes.len()
396    }
397
398    /// Get the number of edges
399    pub fn edge_count(&self) -> usize {
400        self.edges.len()
401    }
402
403    /// Get the number of hyperedges
404    pub fn hyperedge_count(&self) -> usize {
405        self.hyperedges.len()
406    }
407}
408
409impl Default for GraphDB {
410    fn default() -> Self {
411        Self::new()
412    }
413}
414
#[cfg(test)]
mod tests {
    use super::*;
    use crate::edge::EdgeBuilder;
    use crate::hyperedge::HyperedgeBuilder;
    use crate::node::NodeBuilder;

    #[test]
    fn test_graph_creation() {
        let db = GraphDB::new();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
    }

    #[test]
    fn test_node_operations() {
        let db = GraphDB::new();

        let node = NodeBuilder::new()
            .label("Person")
            .property("name", "Alice")
            .build();

        let id = db.create_node(node.clone()).unwrap();
        assert_eq!(db.node_count(), 1);

        let retrieved = db.get_node(&id);
        assert!(retrieved.is_some());

        let deleted = db.delete_node(&id).unwrap();
        assert!(deleted);
        assert_eq!(db.node_count(), 0);
    }

    #[test]
    fn test_edge_operations() {
        let db = GraphDB::new();

        let node1 = NodeBuilder::new().build();
        let node2 = NodeBuilder::new().build();

        let id1 = db.create_node(node1.clone()).unwrap();
        let id2 = db.create_node(node2.clone()).unwrap();

        let edge = EdgeBuilder::new(id1.clone(), id2.clone(), "KNOWS")
            .property("since", 2020i64)
            .build();

        let edge_id = db.create_edge(edge).unwrap();
        assert_eq!(db.edge_count(), 1);

        let retrieved = db.get_edge(&edge_id);
        assert!(retrieved.is_some());
    }

    #[test]
    fn test_label_index() {
        let db = GraphDB::new();

        let node1 = NodeBuilder::new().label("Person").build();
        let node2 = NodeBuilder::new().label("Person").build();
        let node3 = NodeBuilder::new().label("Organization").build();

        db.create_node(node1).unwrap();
        db.create_node(node2).unwrap();
        db.create_node(node3).unwrap();

        let people = db.get_nodes_by_label("Person");
        assert_eq!(people.len(), 2);

        let orgs = db.get_nodes_by_label("Organization");
        assert_eq!(orgs.len(), 1);
    }

    #[test]
    fn test_hyperedge_operations() {
        let db = GraphDB::new();

        let node1 = NodeBuilder::new().build();
        let node2 = NodeBuilder::new().build();
        let node3 = NodeBuilder::new().build();

        let id1 = db.create_node(node1).unwrap();
        let id2 = db.create_node(node2).unwrap();
        let id3 = db.create_node(node3).unwrap();

        let hyperedge =
            HyperedgeBuilder::new(vec![id1.clone(), id2.clone(), id3.clone()], "MEETING")
                .description("Team meeting")
                .build();

        let hedge_id = db.create_hyperedge(hyperedge).unwrap();
        assert_eq!(db.hyperedge_count(), 1);

        let hedges = db.get_hyperedges_by_node(&id1);
        assert_eq!(hedges.len(), 1);
        assert!(db.get_hyperedge(&hedge_id).is_some());
    }

    #[test]
    fn test_edge_requires_existing_endpoints() {
        let db = GraphDB::new();
        let id1 = db.create_node(NodeBuilder::new().build()).unwrap();
        // Built but never inserted, so it must be rejected as an endpoint.
        let ghost = NodeBuilder::new().build();

        let edge = EdgeBuilder::new(id1.clone(), ghost.id.clone(), "KNOWS").build();
        assert!(matches!(
            db.create_edge(edge),
            Err(crate::error::GraphError::NodeNotFound(_))
        ));
        assert_eq!(db.edge_count(), 0);
        assert!(db.get_outgoing_edges(&id1).is_empty());
    }

    #[test]
    fn test_delete_missing_returns_false() {
        let db = GraphDB::new();
        assert!(!db.delete_node("missing").unwrap());
        assert!(!db.delete_edge("missing").unwrap());
        assert!(db.get_node("missing").is_none());
        assert!(db.get_edge("missing").is_none());
    }

    #[test]
    fn test_adjacency_and_has_edge() {
        let db = GraphDB::new();
        let id1 = db.create_node(NodeBuilder::new().build()).unwrap();
        let id2 = db.create_node(NodeBuilder::new().build()).unwrap();
        db.create_edge(EdgeBuilder::new(id1.clone(), id2.clone(), "KNOWS").build())
            .unwrap();

        assert_eq!(db.get_outgoing_edges(&id1).len(), 1);
        assert_eq!(db.get_incoming_edges(&id2).len(), 1);
        assert!(db.get_outgoing_edges(&id2).is_empty());
        assert!(db.has_edge(&id1, &id2, "KNOWS"));
        assert!(!db.has_edge(&id1, &id2, "LIKES"));
        assert!(!db.has_edge(&id2, &id1, "KNOWS"));
        assert_eq!(db.get_edges_by_type("KNOWS").len(), 1);
    }

    #[test]
    fn test_delete_edges_batch_counts_only_existing() {
        let db = GraphDB::new();
        let id1 = db.create_node(NodeBuilder::new().build()).unwrap();
        let id2 = db.create_node(NodeBuilder::new().build()).unwrap();
        let edge_id = db
            .create_edge(EdgeBuilder::new(id1.clone(), id2.clone(), "KNOWS").build())
            .unwrap();

        let key: &str = edge_id.as_ref();
        assert_eq!(db.delete_edges_batch(&[key, "missing"]).unwrap(), 1);
        assert_eq!(db.edge_count(), 0);
        assert!(db.get_outgoing_edges(&id1).is_empty());
        assert!(!db.has_edge(&id1, &id2, "KNOWS"));
    }

    #[test]
    fn test_delete_hyperedges_by_node() {
        let db = GraphDB::new();
        let id1 = db.create_node(NodeBuilder::new().build()).unwrap();
        let id2 = db.create_node(NodeBuilder::new().build()).unwrap();
        let id3 = db.create_node(NodeBuilder::new().build()).unwrap();

        let hyperedge =
            HyperedgeBuilder::new(vec![id1.clone(), id2.clone(), id3.clone()], "MEETING")
                .description("Standup")
                .build();
        db.create_hyperedge(hyperedge).unwrap();

        assert_eq!(db.delete_hyperedges_by_node(&id2).unwrap(), 1);
        assert_eq!(db.hyperedge_count(), 0);
        assert!(db.get_hyperedges_by_node(&id1).is_empty());
    }
}