Skip to main content

graphos_core/graph/lpg/
store.rs

1//! LPG graph store implementation.
2
3use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
4use crate::graph::Direction;
5use crate::index::adjacency::ChunkedAdjacency;
6use graphos_common::types::{EdgeId, EpochId, NodeId, PropertyKey, Value};
7use graphos_common::utils::hash::FxHashMap;
8use parking_lot::RwLock;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
/// Tunable options supplied when constructing an [`LpgStore`].
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// When `true`, the store also keeps backward (incoming-edge) adjacency lists.
    pub backward_edges: bool,
    /// Initial capacity requested for node storage.
    pub initial_node_capacity: usize,
    /// Initial capacity requested for edge storage.
    pub initial_edge_capacity: usize,
}
22
23impl Default for LpgStoreConfig {
24    fn default() -> Self {
25        Self {
26            backward_edges: true,
27            initial_node_capacity: 1024,
28            initial_edge_capacity: 4096,
29        }
30    }
31}
32
33/// The main LPG graph store.
34///
35/// This is the core storage for labeled property graphs, providing
36/// efficient node/edge storage and adjacency indexing.
37pub struct LpgStore {
38    /// Configuration.
39    config: LpgStoreConfig,
40
41    /// Node records indexed by NodeId.
42    nodes: RwLock<FxHashMap<NodeId, NodeRecord>>,
43
44    /// Edge records indexed by EdgeId.
45    edges: RwLock<FxHashMap<EdgeId, EdgeRecord>>,
46
47    /// Property storage for nodes.
48    node_properties: PropertyStorage<NodeId>,
49
50    /// Property storage for edges.
51    edge_properties: PropertyStorage<EdgeId>,
52
53    /// Label name to ID mapping.
54    label_to_id: RwLock<FxHashMap<Arc<str>, u8>>,
55
56    /// Label ID to name mapping.
57    id_to_label: RwLock<Vec<Arc<str>>>,
58
59    /// Edge type name to ID mapping.
60    edge_type_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
61
62    /// Edge type ID to name mapping.
63    id_to_edge_type: RwLock<Vec<Arc<str>>>,
64
65    /// Forward adjacency lists (outgoing edges).
66    forward_adj: ChunkedAdjacency,
67
68    /// Backward adjacency lists (incoming edges).
69    /// Only populated if config.backward_edges is true.
70    backward_adj: Option<ChunkedAdjacency>,
71
72    /// Label index: label_id -> set of node IDs.
73    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
74
75    /// Next node ID.
76    next_node_id: AtomicU64,
77
78    /// Next edge ID.
79    next_edge_id: AtomicU64,
80
81    /// Current epoch.
82    current_epoch: AtomicU64,
83}
84
85impl LpgStore {
86    /// Creates a new LPG store with default configuration.
87    #[must_use]
88    pub fn new() -> Self {
89        Self::with_config(LpgStoreConfig::default())
90    }
91
92    /// Creates a new LPG store with custom configuration.
93    #[must_use]
94    pub fn with_config(config: LpgStoreConfig) -> Self {
95        let backward_adj = if config.backward_edges {
96            Some(ChunkedAdjacency::new())
97        } else {
98            None
99        };
100
101        Self {
102            nodes: RwLock::new(FxHashMap::default()),
103            edges: RwLock::new(FxHashMap::default()),
104            node_properties: PropertyStorage::new(),
105            edge_properties: PropertyStorage::new(),
106            label_to_id: RwLock::new(FxHashMap::default()),
107            id_to_label: RwLock::new(Vec::new()),
108            edge_type_to_id: RwLock::new(FxHashMap::default()),
109            id_to_edge_type: RwLock::new(Vec::new()),
110            forward_adj: ChunkedAdjacency::new(),
111            backward_adj,
112            label_index: RwLock::new(Vec::new()),
113            next_node_id: AtomicU64::new(0),
114            next_edge_id: AtomicU64::new(0),
115            current_epoch: AtomicU64::new(0),
116            config,
117        }
118    }
119
120    /// Returns the current epoch.
121    #[must_use]
122    pub fn current_epoch(&self) -> EpochId {
123        EpochId::new(self.current_epoch.load(Ordering::Acquire))
124    }
125
126    /// Creates a new epoch.
127    pub fn new_epoch(&self) -> EpochId {
128        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
129        EpochId::new(id)
130    }
131
132    // === Node Operations ===
133
134    /// Creates a new node with the given labels.
135    pub fn create_node(&self, labels: &[&str]) -> NodeId {
136        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
137        let epoch = self.current_epoch();
138
139        let mut record = NodeRecord::new(id, epoch);
140
141        // Set label bits
142        for label in labels {
143            let label_id = self.get_or_create_label_id(*label);
144            record.set_label_bit(label_id);
145
146            // Update label index
147            let mut index = self.label_index.write();
148            while index.len() <= label_id as usize {
149                index.push(FxHashMap::default());
150            }
151            index[label_id as usize].insert(id, ());
152        }
153
154        self.nodes.write().insert(id, record);
155        id
156    }
157
158    /// Creates a new node with labels and properties.
159    pub fn create_node_with_props(
160        &self,
161        labels: &[&str],
162        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
163    ) -> NodeId {
164        let id = self.create_node(labels);
165
166        for (key, value) in properties {
167            self.node_properties.set(id, key.into(), value.into());
168        }
169
170        // Update props_count in record
171        let count = self.node_properties.get_all(id).len() as u16;
172        if let Some(record) = self.nodes.write().get_mut(&id) {
173            record.props_count = count;
174        }
175
176        id
177    }
178
179    /// Gets a node by ID.
180    #[must_use]
181    pub fn get_node(&self, id: NodeId) -> Option<Node> {
182        let nodes = self.nodes.read();
183        let record = nodes.get(&id)?;
184
185        if record.is_deleted() {
186            return None;
187        }
188
189        let mut node = Node::new(id);
190
191        // Get labels
192        let id_to_label = self.id_to_label.read();
193        for bit in record.label_bits_iter() {
194            if let Some(label) = id_to_label.get(bit as usize) {
195                node.labels.push(label.clone());
196            }
197        }
198
199        // Get properties
200        node.properties = self
201            .node_properties
202            .get_all(id)
203            .into_iter()
204            .collect();
205
206        Some(node)
207    }
208
209    /// Deletes a node and all its edges.
210    pub fn delete_node(&self, id: NodeId) -> bool {
211        let mut nodes = self.nodes.write();
212        if let Some(record) = nodes.get_mut(&id) {
213            if record.is_deleted() {
214                return false;
215            }
216
217            record.set_deleted(true);
218
219            // Remove from label index
220            let mut index = self.label_index.write();
221            for bit in record.label_bits_iter() {
222                if let Some(set) = index.get_mut(bit as usize) {
223                    set.remove(&id);
224                }
225            }
226
227            // Remove properties
228            drop(nodes); // Release lock before removing properties
229            self.node_properties.remove_all(id);
230
231            // TODO: Delete incident edges
232
233            true
234        } else {
235            false
236        }
237    }
238
239    /// Returns the number of nodes.
240    #[must_use]
241    pub fn node_count(&self) -> usize {
242        self.nodes
243            .read()
244            .values()
245            .filter(|r| !r.is_deleted())
246            .count()
247    }
248
249    // === Edge Operations ===
250
251    /// Creates a new edge.
252    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
253        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
254        let epoch = self.current_epoch();
255        let type_id = self.get_or_create_edge_type_id(edge_type);
256
257        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
258        self.edges.write().insert(id, record);
259
260        // Update adjacency
261        self.forward_adj.add_edge(src, dst, id);
262        if let Some(ref backward) = self.backward_adj {
263            backward.add_edge(dst, src, id);
264        }
265
266        id
267    }
268
269    /// Creates a new edge with properties.
270    pub fn create_edge_with_props(
271        &self,
272        src: NodeId,
273        dst: NodeId,
274        edge_type: &str,
275        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
276    ) -> EdgeId {
277        let id = self.create_edge(src, dst, edge_type);
278
279        for (key, value) in properties {
280            self.edge_properties.set(id, key.into(), value.into());
281        }
282
283        id
284    }
285
286    /// Gets an edge by ID.
287    #[must_use]
288    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
289        let edges = self.edges.read();
290        let record = edges.get(&id)?;
291
292        if record.is_deleted() {
293            return None;
294        }
295
296        let edge_type = {
297            let id_to_type = self.id_to_edge_type.read();
298            id_to_type.get(record.type_id as usize)?.clone()
299        };
300
301        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
302
303        // Get properties
304        edge.properties = self
305            .edge_properties
306            .get_all(id)
307            .into_iter()
308            .collect();
309
310        Some(edge)
311    }
312
313    /// Deletes an edge.
314    pub fn delete_edge(&self, id: EdgeId) -> bool {
315        let mut edges = self.edges.write();
316        if let Some(record) = edges.get_mut(&id) {
317            if record.is_deleted() {
318                return false;
319            }
320
321            let src = record.src;
322            let dst = record.dst;
323
324            record.set_deleted(true);
325
326            drop(edges); // Release lock
327
328            // Mark as deleted in adjacency (soft delete)
329            self.forward_adj.mark_deleted(src, id);
330            if let Some(ref backward) = self.backward_adj {
331                backward.mark_deleted(dst, id);
332            }
333
334            // Remove properties
335            self.edge_properties.remove_all(id);
336
337            true
338        } else {
339            false
340        }
341    }
342
343    /// Returns the number of edges.
344    #[must_use]
345    pub fn edge_count(&self) -> usize {
346        self.edges
347            .read()
348            .values()
349            .filter(|r| !r.is_deleted())
350            .count()
351    }
352
353    // === Traversal ===
354
355    /// Returns an iterator over neighbors of a node.
356    pub fn neighbors(&self, node: NodeId, direction: Direction) -> impl Iterator<Item = NodeId> + '_ {
357        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
358            Direction::Outgoing | Direction::Both => {
359                Box::new(self.forward_adj.neighbors(node))
360            }
361            Direction::Incoming => Box::new(std::iter::empty()),
362        };
363
364        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
365            Direction::Incoming | Direction::Both => {
366                if let Some(ref adj) = self.backward_adj {
367                    Box::new(adj.neighbors(node))
368                } else {
369                    Box::new(std::iter::empty())
370                }
371            }
372            Direction::Outgoing => Box::new(std::iter::empty()),
373        };
374
375        forward.chain(backward)
376    }
377
378    /// Returns nodes with a specific label.
379    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
380        let label_to_id = self.label_to_id.read();
381        if let Some(&label_id) = label_to_id.get(label) {
382            let index = self.label_index.read();
383            if let Some(set) = index.get(label_id as usize) {
384                return set.keys().copied().collect();
385            }
386        }
387        Vec::new()
388    }
389
390    // === Internal Helpers ===
391
392    fn get_or_create_label_id(&self, label: &str) -> u8 {
393        {
394            let label_to_id = self.label_to_id.read();
395            if let Some(&id) = label_to_id.get(label) {
396                return id;
397            }
398        }
399
400        let mut label_to_id = self.label_to_id.write();
401        let mut id_to_label = self.id_to_label.write();
402
403        // Double-check after acquiring write lock
404        if let Some(&id) = label_to_id.get(label) {
405            return id;
406        }
407
408        let id = id_to_label.len() as u8;
409        assert!(id < 64, "Maximum 64 labels supported");
410
411        let label: Arc<str> = label.into();
412        label_to_id.insert(label.clone(), id);
413        id_to_label.push(label);
414
415        id
416    }
417
418    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
419        {
420            let type_to_id = self.edge_type_to_id.read();
421            if let Some(&id) = type_to_id.get(edge_type) {
422                return id;
423            }
424        }
425
426        let mut type_to_id = self.edge_type_to_id.write();
427        let mut id_to_type = self.id_to_edge_type.write();
428
429        // Double-check
430        if let Some(&id) = type_to_id.get(edge_type) {
431            return id;
432        }
433
434        let id = id_to_type.len() as u32;
435        let edge_type: Arc<str> = edge_type.into();
436        type_to_id.insert(edge_type.clone(), id);
437        id_to_type.push(edge_type);
438
439        id
440    }
441}
442
443impl Default for LpgStore {
444    fn default() -> Self {
445        Self::new()
446    }
447}
448
#[cfg(test)]
mod tests {
    use super::*;

    /// A created node is retrievable and reports exactly the labels it was given.
    #[test]
    fn test_create_node() {
        let store = LpgStore::new();

        let person = store.create_node(&["Person"]);
        assert!(person.is_valid());

        let fetched = store.get_node(person).unwrap();
        assert!(fetched.has_label("Person"));
        assert!(!fetched.has_label("Animal"));
    }

    /// Properties passed at creation time can be read back from the node.
    #[test]
    fn test_create_node_with_props() {
        let store = LpgStore::new();

        let props = [("name", Value::from("Alice")), ("age", Value::from(30i64))];
        let id = store.create_node_with_props(&["Person"], props);

        let fetched = store.get_node(id).unwrap();
        let name = fetched.get_property("name").and_then(|v| v.as_str());
        let age = fetched.get_property("age").and_then(|v| v.as_int64());
        assert_eq!(name, Some("Alice"));
        assert_eq!(age, Some(30));
    }

    /// Deleting a node hides it from counts and lookups; deleting twice fails.
    #[test]
    fn test_delete_node() {
        let store = LpgStore::new();

        let id = store.create_node(&["Person"]);
        assert_eq!(store.node_count(), 1);

        assert!(store.delete_node(id));
        assert_eq!(store.node_count(), 0);
        assert!(store.get_node(id).is_none());

        // A second delete of the same node must report failure.
        assert!(!store.delete_node(id));
    }

    /// A created edge is retrievable with its endpoints and type name.
    #[test]
    fn test_create_edge() {
        let store = LpgStore::new();

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);

        let knows = store.create_edge(alice, bob, "KNOWS");
        assert!(knows.is_valid());

        let fetched = store.get_edge(knows).unwrap();
        assert_eq!(fetched.src, alice);
        assert_eq!(fetched.dst, bob);
        assert_eq!(fetched.edge_type.as_ref(), "KNOWS");
    }

    /// Outgoing and incoming traversal report the expected neighbors.
    #[test]
    fn test_neighbors() {
        let store = LpgStore::new();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);

        store.create_edge(a, b, "KNOWS");
        store.create_edge(a, c, "KNOWS");

        let from_a: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
        assert_eq!(from_a.len(), 2);
        assert!(from_a.contains(&b));
        assert!(from_a.contains(&c));

        let into_b: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
        assert_eq!(into_b.len(), 1);
        assert!(into_b.contains(&a));
    }

    /// The label index returns exactly the nodes carrying a label.
    #[test]
    fn test_nodes_by_label() {
        let store = LpgStore::new();

        let p1 = store.create_node(&["Person"]);
        let p2 = store.create_node(&["Person"]);
        let _a = store.create_node(&["Animal"]);

        let persons = store.nodes_by_label("Person");
        assert_eq!(persons.len(), 2);
        assert!(persons.contains(&p1));
        assert!(persons.contains(&p2));

        let animals = store.nodes_by_label("Animal");
        assert_eq!(animals.len(), 1);
    }

    /// Deleting an edge hides it from counts and lookups.
    #[test]
    fn test_delete_edge() {
        let store = LpgStore::new();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let knows = store.create_edge(a, b, "KNOWS");

        assert_eq!(store.edge_count(), 1);

        assert!(store.delete_edge(knows));
        assert_eq!(store.edge_count(), 0);
        assert!(store.get_edge(knows).is_none());
    }
}
562}