Skip to main content

ruvector_graph/
graph.rs

1//! Graph database implementation with concurrent access and indexing
2
3use crate::edge::Edge;
4use crate::error::Result;
5use crate::hyperedge::{Hyperedge, HyperedgeId};
6use crate::index::{AdjacencyIndex, EdgeTypeIndex, HyperedgeNodeIndex, LabelIndex, PropertyIndex};
7use crate::node::Node;
8#[cfg(feature = "storage")]
9use crate::storage::GraphStorage;
10use crate::types::{EdgeId, NodeId, PropertyValue};
11use dashmap::DashMap;
12#[cfg(feature = "storage")]
13use std::path::Path;
14use std::sync::Arc;
15
/// Graph database holding nodes, edges, and hyperedges with concurrent access.
///
/// The primary stores are `DashMap`s shared behind `Arc`; secondary indexes map
/// labels, properties, edge types, adjacency, and hyperedge membership back to
/// ids in those stores. With the `storage` feature enabled, an optional
/// persistent backend can be attached.
pub struct GraphDB {
    /// In-memory node storage, keyed by node id (`DashMap` concurrent map)
    nodes: Arc<DashMap<NodeId, Node>>,
    /// In-memory edge storage, keyed by edge id
    edges: Arc<DashMap<EdgeId, Edge>>,
    /// In-memory hyperedge storage, keyed by hyperedge id
    hyperedges: Arc<DashMap<HyperedgeId, Hyperedge>>,
    /// Label index for fast label-based node lookups
    label_index: LabelIndex,
    /// Property index for fast property-based node lookups
    property_index: PropertyIndex,
    /// Edge type index for type-based edge lookups
    edge_type_index: EdgeTypeIndex,
    /// Adjacency index for incoming/outgoing edge lookups per node
    adjacency_index: AdjacencyIndex,
    /// Hyperedge node index for finding the hyperedges that contain a node
    hyperedge_node_index: HyperedgeNodeIndex,
    /// Optional persistent storage; `None` for a purely in-memory database
    #[cfg(feature = "storage")]
    storage: Option<GraphStorage>,
}
38
39impl GraphDB {
40    /// Create a new in-memory graph database
41    pub fn new() -> Self {
42        Self {
43            nodes: Arc::new(DashMap::new()),
44            edges: Arc::new(DashMap::new()),
45            hyperedges: Arc::new(DashMap::new()),
46            label_index: LabelIndex::new(),
47            property_index: PropertyIndex::new(),
48            edge_type_index: EdgeTypeIndex::new(),
49            adjacency_index: AdjacencyIndex::new(),
50            hyperedge_node_index: HyperedgeNodeIndex::new(),
51            #[cfg(feature = "storage")]
52            storage: None,
53        }
54    }
55
56    /// Create a new graph database with persistent storage
57    #[cfg(feature = "storage")]
58    pub fn with_storage<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
59        let storage = GraphStorage::new(path)?;
60
61        let mut db = Self::new();
62        db.storage = Some(storage);
63
64        // Load existing data from storage
65        db.load_from_storage()?;
66
67        Ok(db)
68    }
69
70    /// Load all data from storage into memory
71    #[cfg(feature = "storage")]
72    fn load_from_storage(&mut self) -> anyhow::Result<()> {
73        if let Some(storage) = &self.storage {
74            // Load nodes
75            for node_id in storage.all_node_ids()? {
76                if let Some(node) = storage.get_node(&node_id)? {
77                    self.nodes.insert(node_id.clone(), node.clone());
78                    self.label_index.add_node(&node);
79                    self.property_index.add_node(&node);
80                }
81            }
82
83            // Load edges
84            for edge_id in storage.all_edge_ids()? {
85                if let Some(edge) = storage.get_edge(&edge_id)? {
86                    self.edges.insert(edge_id.clone(), edge.clone());
87                    self.edge_type_index.add_edge(&edge);
88                    self.adjacency_index.add_edge(&edge);
89                }
90            }
91
92            // Load hyperedges
93            for hyperedge_id in storage.all_hyperedge_ids()? {
94                if let Some(hyperedge) = storage.get_hyperedge(&hyperedge_id)? {
95                    self.hyperedges
96                        .insert(hyperedge_id.clone(), hyperedge.clone());
97                    self.hyperedge_node_index.add_hyperedge(&hyperedge);
98                }
99            }
100        }
101        Ok(())
102    }
103
104    // Node operations
105
106    /// Create a node
107    pub fn create_node(&self, node: Node) -> Result<NodeId> {
108        let id = node.id.clone();
109
110        // Update indexes
111        self.label_index.add_node(&node);
112        self.property_index.add_node(&node);
113
114        // Insert into memory
115        self.nodes.insert(id.clone(), node.clone());
116
117        // Persist to storage if available
118        #[cfg(feature = "storage")]
119        if let Some(storage) = &self.storage {
120            storage.insert_node(&node)?;
121        }
122
123        Ok(id)
124    }
125
126    /// Get a node by ID
127    pub fn get_node(&self, id: impl AsRef<str>) -> Option<Node> {
128        self.nodes.get(id.as_ref()).map(|entry| entry.clone())
129    }
130
131    /// Delete a node
132    pub fn delete_node(&self, id: impl AsRef<str>) -> Result<bool> {
133        if let Some((_, node)) = self.nodes.remove(id.as_ref()) {
134            // Update indexes
135            self.label_index.remove_node(&node);
136            self.property_index.remove_node(&node);
137
138            // Delete from storage if available
139            #[cfg(feature = "storage")]
140            if let Some(storage) = &self.storage {
141                storage.delete_node(id.as_ref())?;
142            }
143
144            Ok(true)
145        } else {
146            Ok(false)
147        }
148    }
149
150    /// Get nodes by label
151    pub fn get_nodes_by_label(&self, label: &str) -> Vec<Node> {
152        self.label_index
153            .get_nodes_by_label(label)
154            .into_iter()
155            .filter_map(|id| self.get_node(&id))
156            .collect()
157    }
158
159    /// Get nodes by property
160    pub fn get_nodes_by_property(&self, key: &str, value: &PropertyValue) -> Vec<Node> {
161        self.property_index
162            .get_nodes_by_property(key, value)
163            .into_iter()
164            .filter_map(|id| self.get_node(&id))
165            .collect()
166    }
167
168    // Edge operations
169
170    /// Create an edge
171    pub fn create_edge(&self, edge: Edge) -> Result<EdgeId> {
172        let id = edge.id.clone();
173
174        // Verify nodes exist
175        if !self.nodes.contains_key(&edge.from) || !self.nodes.contains_key(&edge.to) {
176            return Err(crate::error::GraphError::NodeNotFound(
177                "Source or target node not found".to_string(),
178            ));
179        }
180
181        // Update indexes
182        self.edge_type_index.add_edge(&edge);
183        self.adjacency_index.add_edge(&edge);
184
185        // Insert into memory
186        self.edges.insert(id.clone(), edge.clone());
187
188        // Persist to storage if available
189        #[cfg(feature = "storage")]
190        if let Some(storage) = &self.storage {
191            storage.insert_edge(&edge)?;
192        }
193
194        Ok(id)
195    }
196
197    /// Get an edge by ID
198    pub fn get_edge(&self, id: impl AsRef<str>) -> Option<Edge> {
199        self.edges.get(id.as_ref()).map(|entry| entry.clone())
200    }
201
202    /// Delete an edge
203    pub fn delete_edge(&self, id: impl AsRef<str>) -> Result<bool> {
204        if let Some((_, edge)) = self.edges.remove(id.as_ref()) {
205            // Update indexes
206            self.edge_type_index.remove_edge(&edge);
207            self.adjacency_index.remove_edge(&edge);
208
209            // Delete from storage if available
210            #[cfg(feature = "storage")]
211            if let Some(storage) = &self.storage {
212                storage.delete_edge(id.as_ref())?;
213            }
214
215            Ok(true)
216        } else {
217            Ok(false)
218        }
219    }
220
221    /// Delete multiple edges (batch)
222    pub fn delete_edges_batch(&self, ids: &[impl AsRef<str>]) -> Result<usize> {
223        let mut deleted = 0;
224        let mut edges_to_update = Vec::with_capacity(ids.len());
225
226        for id in ids {
227            let key: &str = id.as_ref();
228            if let Some((_, edge)) = self.edges.remove(key) {
229                edges_to_update.push(edge);
230                deleted += 1;
231            }
232        }
233
234        for edge in &edges_to_update {
235            self.edge_type_index.remove_edge(edge);
236            self.adjacency_index.remove_edge(edge);
237        }
238
239        #[cfg(feature = "storage")]
240        if let Some(storage) = &self.storage {
241            let str_ids = ids.iter().map(|id| id.as_ref()).collect::<Vec<_>>();
242            storage.delete_edges_batch(&str_ids)?;
243        }
244
245        Ok(deleted)
246    }
247
248    /// Get edges by type
249    pub fn get_edges_by_type(&self, edge_type: &str) -> Vec<Edge> {
250        self.edge_type_index
251            .get_edges_by_type(edge_type)
252            .into_iter()
253            .filter_map(|id| self.get_edge(&id))
254            .collect()
255    }
256
257    /// Get outgoing edges from a node
258    pub fn get_outgoing_edges(&self, node_id: &NodeId) -> Vec<Edge> {
259        self.adjacency_index
260            .get_outgoing_edges(node_id)
261            .into_iter()
262            .filter_map(|id| self.get_edge(&id))
263            .collect()
264    }
265
266    /// Get incoming edges to a node
267    pub fn get_incoming_edges(&self, node_id: &NodeId) -> Vec<Edge> {
268        self.adjacency_index
269            .get_incoming_edges(node_id)
270            .into_iter()
271            .filter_map(|id| self.get_edge(&id))
272            .collect()
273    }
274
275    /// Checks whether an edge exists from `from` → `to` with type `edge_type`.
276    /// Returns true if found, false otherwise.
277    ///
278    /// Fast path: avoids cloning `Edge` by reading fields through the `DashMap`
279    /// reference guard and short-circuits on first match.
280    pub fn has_edge(&self, from: &NodeId, to: &NodeId, edge_type: &str) -> bool {
281        self.adjacency_index
282            .get_outgoing_edges(from)
283            .into_iter()
284            .any(|id| {
285                self.edges
286                    .get(&id)
287                    .is_some_and(|e| e.to == *to && e.edge_type == edge_type)
288            })
289    }
290
291    /// Get outgoing edges for multiple nodes in one call (O(k×avg_degree) vs O(E) for full scan).
292    pub fn get_edges_for_nodes(&self, node_ids: &[NodeId]) -> Vec<Edge> {
293        let mut result = Vec::with_capacity(node_ids.len() * 4);
294        self.adjacency_index.for_each_outgoing_edge(node_ids, |edge_id| {
295            if let Some(edge) = self.edges.get(edge_id.as_str()) {
296                result.push(edge.clone());
297            }
298        });
299
300        result
301    }
302
303    // Hyperedge operations
304
305    /// Create a hyperedge
306    pub fn create_hyperedge(&self, hyperedge: Hyperedge) -> Result<HyperedgeId> {
307        let id = hyperedge.id.clone();
308
309        // Verify all nodes exist
310        for node_id in &hyperedge.nodes {
311            if !self.nodes.contains_key(node_id) {
312                return Err(crate::error::GraphError::NodeNotFound(format!(
313                    "Node {} not found",
314                    node_id
315                )));
316            }
317        }
318
319        // Update index
320        self.hyperedge_node_index.add_hyperedge(&hyperedge);
321
322        // Insert into memory
323        self.hyperedges.insert(id.clone(), hyperedge.clone());
324
325        // Persist to storage if available
326        #[cfg(feature = "storage")]
327        if let Some(storage) = &self.storage {
328            storage.insert_hyperedge(&hyperedge)?;
329        }
330
331        Ok(id)
332    }
333
334    /// Get a hyperedge by ID
335    pub fn get_hyperedge(&self, id: &HyperedgeId) -> Option<Hyperedge> {
336        self.hyperedges.get(id).map(|entry| entry.clone())
337    }
338
339    /// Get hyperedges containing a node
340    pub fn get_hyperedges_by_node(&self, node_id: &NodeId) -> Vec<Hyperedge> {
341        self.hyperedge_node_index
342            .get_hyperedges_by_node(node_id)
343            .into_iter()
344            .filter_map(|id| self.get_hyperedge(&id))
345            .collect()
346    }
347
348    // Statistics
349
350    /// Get the number of nodes
351    pub fn node_count(&self) -> usize {
352        self.nodes.len()
353    }
354
355    /// Get the number of edges
356    pub fn edge_count(&self) -> usize {
357        self.edges.len()
358    }
359
360    /// Get the number of hyperedges
361    pub fn hyperedge_count(&self) -> usize {
362        self.hyperedges.len()
363    }
364}
365
366impl Default for GraphDB {
367    fn default() -> Self {
368        Self::new()
369    }
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use crate::edge::EdgeBuilder;
    use crate::hyperedge::HyperedgeBuilder;
    use crate::node::NodeBuilder;

    /// A fresh database holds no nodes, edges, or hyperedges.
    #[test]
    fn test_graph_creation() {
        let db = GraphDB::new();
        assert_eq!(db.node_count(), 0);
        assert_eq!(db.edge_count(), 0);
        assert_eq!(db.hyperedge_count(), 0);
    }

    /// Create, fetch, and delete a node; a second delete reports nothing removed.
    #[test]
    fn test_node_operations() {
        let db = GraphDB::new();

        let node = NodeBuilder::new()
            .label("Person")
            .property("name", "Alice")
            .build();

        let id = db.create_node(node).unwrap();
        assert_eq!(db.node_count(), 1);

        let retrieved = db.get_node(&id);
        assert!(retrieved.is_some());

        let deleted = db.delete_node(&id).unwrap();
        assert!(deleted);
        assert_eq!(db.node_count(), 0);
        assert!(db.get_node(&id).is_none());

        // Deleting an id that is no longer present is not an error.
        assert!(!db.delete_node(&id).unwrap());
    }

    /// Create, fetch, probe, and delete an edge between two existing nodes.
    #[test]
    fn test_edge_operations() {
        let db = GraphDB::new();

        let id1 = db.create_node(NodeBuilder::new().build()).unwrap();
        let id2 = db.create_node(NodeBuilder::new().build()).unwrap();

        let edge = EdgeBuilder::new(id1.clone(), id2.clone(), "KNOWS")
            .property("since", 2020i64)
            .build();

        let edge_id = db.create_edge(edge).unwrap();
        assert_eq!(db.edge_count(), 1);

        let retrieved = db.get_edge(&edge_id);
        assert!(retrieved.is_some());

        // The edge is directed: it is visible from id1 to id2 only.
        assert!(db.has_edge(&id1, &id2, "KNOWS"));
        assert!(!db.has_edge(&id2, &id1, "KNOWS"));

        assert!(db.delete_edge(&edge_id).unwrap());
        assert_eq!(db.edge_count(), 0);
        assert!(db.get_edge(&edge_id).is_none());
    }

    /// Label lookups return exactly the nodes carrying that label.
    #[test]
    fn test_label_index() {
        let db = GraphDB::new();

        let node1 = NodeBuilder::new().label("Person").build();
        let node2 = NodeBuilder::new().label("Person").build();
        let node3 = NodeBuilder::new().label("Organization").build();

        db.create_node(node1).unwrap();
        db.create_node(node2).unwrap();
        db.create_node(node3).unwrap();

        let people = db.get_nodes_by_label("Person");
        assert_eq!(people.len(), 2);

        let orgs = db.get_nodes_by_label("Organization");
        assert_eq!(orgs.len(), 1);
    }

    /// A hyperedge is retrievable by id and through each of its member nodes.
    #[test]
    fn test_hyperedge_operations() {
        let db = GraphDB::new();

        let id1 = db.create_node(NodeBuilder::new().build()).unwrap();
        let id2 = db.create_node(NodeBuilder::new().build()).unwrap();
        let id3 = db.create_node(NodeBuilder::new().build()).unwrap();

        let hyperedge =
            HyperedgeBuilder::new(vec![id1.clone(), id2.clone(), id3.clone()], "MEETING")
                .description("Team meeting")
                .build();

        let hedge_id = db.create_hyperedge(hyperedge).unwrap();
        assert_eq!(db.hyperedge_count(), 1);
        assert!(db.get_hyperedge(&hedge_id).is_some());

        for member in [&id1, &id2, &id3] {
            let hedges = db.get_hyperedges_by_node(member);
            assert_eq!(hedges.len(), 1);
        }
    }
}