Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
//! through `GrafeoDB` (from the `grafeo_engine` crate), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use grafeo_common::mvcc::VersionChain;
19use grafeo_common::types::{EdgeId, EpochId, NodeId, PropertyKey, TxId, Value};
20use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
21use parking_lot::RwLock;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicU64, Ordering};
24
/// Configuration for the LPG store.
///
/// The defaults work well for most cases. Turn off `backward_edges` if you
/// only traverse in one direction (saves memory), or adjust capacities if you
/// know your graph size upfront (avoids reallocations).
///
/// Configurations can be compared with `==`, e.g. to check whether a store
/// was built with the defaults.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LpgStoreConfig {
    /// Maintain backward adjacency for incoming edge queries. Turn off if
    /// you only traverse outgoing edges - saves ~50% adjacency memory.
    pub backward_edges: bool,
    /// Initial capacity for nodes (avoids early reallocations).
    pub initial_node_capacity: usize,
    /// Initial capacity for edges (avoids early reallocations).
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward adjacency enabled, room for 1024 nodes and 4096 edges.
    fn default() -> Self {
        Self {
            backward_edges: true,
            initial_node_capacity: 1024,
            initial_edge_capacity: 4096,
        }
    }
}
50
51/// The core in-memory graph storage.
52///
53/// Everything lives here: nodes, edges, properties, adjacency indexes, and
54/// version chains for MVCC. Concurrent reads never block each other.
55///
56/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
57/// adds transaction management and query execution. Use `LpgStore` directly
58/// when you need raw performance for algorithm implementations.
59///
60/// # Example
61///
62/// ```
63/// use grafeo_core::graph::lpg::LpgStore;
64/// use grafeo_core::graph::Direction;
65///
66/// let store = LpgStore::new();
67///
68/// // Create a small social network
69/// let alice = store.create_node(&["Person"]);
70/// let bob = store.create_node(&["Person"]);
71/// store.create_edge(alice, bob, "KNOWS");
72///
73/// // Traverse outgoing edges
74/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
75///     println!("Alice knows node {:?}", neighbor);
76/// }
77/// ```
78pub struct LpgStore {
79    /// Configuration.
80    #[allow(dead_code)]
81    config: LpgStoreConfig,
82
83    /// Node records indexed by NodeId, with version chains for MVCC.
84    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
85
86    /// Edge records indexed by EdgeId, with version chains for MVCC.
87    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
88
89    /// Property storage for nodes.
90    node_properties: PropertyStorage<NodeId>,
91
92    /// Property storage for edges.
93    edge_properties: PropertyStorage<EdgeId>,
94
95    /// Label name to ID mapping.
96    label_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
97
98    /// Label ID to name mapping.
99    id_to_label: RwLock<Vec<Arc<str>>>,
100
101    /// Edge type name to ID mapping.
102    edge_type_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
103
104    /// Edge type ID to name mapping.
105    id_to_edge_type: RwLock<Vec<Arc<str>>>,
106
107    /// Forward adjacency lists (outgoing edges).
108    forward_adj: ChunkedAdjacency,
109
110    /// Backward adjacency lists (incoming edges).
111    /// Only populated if config.backward_edges is true.
112    backward_adj: Option<ChunkedAdjacency>,
113
114    /// Label index: label_id -> set of node IDs.
115    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
116
117    /// Node labels: node_id -> set of label IDs.
118    /// Reverse mapping to efficiently get labels for a node.
119    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
120
121    /// Next node ID.
122    next_node_id: AtomicU64,
123
124    /// Next edge ID.
125    next_edge_id: AtomicU64,
126
127    /// Current epoch.
128    current_epoch: AtomicU64,
129
130    /// Statistics for cost-based optimization.
131    statistics: RwLock<Statistics>,
132}
133
134impl LpgStore {
135    /// Creates a new LPG store with default configuration.
136    #[must_use]
137    pub fn new() -> Self {
138        Self::with_config(LpgStoreConfig::default())
139    }
140
141    /// Creates a new LPG store with custom configuration.
142    #[must_use]
143    pub fn with_config(config: LpgStoreConfig) -> Self {
144        let backward_adj = if config.backward_edges {
145            Some(ChunkedAdjacency::new())
146        } else {
147            None
148        };
149
150        Self {
151            nodes: RwLock::new(FxHashMap::default()),
152            edges: RwLock::new(FxHashMap::default()),
153            node_properties: PropertyStorage::new(),
154            edge_properties: PropertyStorage::new(),
155            label_to_id: RwLock::new(FxHashMap::default()),
156            id_to_label: RwLock::new(Vec::new()),
157            edge_type_to_id: RwLock::new(FxHashMap::default()),
158            id_to_edge_type: RwLock::new(Vec::new()),
159            forward_adj: ChunkedAdjacency::new(),
160            backward_adj,
161            label_index: RwLock::new(Vec::new()),
162            node_labels: RwLock::new(FxHashMap::default()),
163            next_node_id: AtomicU64::new(0),
164            next_edge_id: AtomicU64::new(0),
165            current_epoch: AtomicU64::new(0),
166            statistics: RwLock::new(Statistics::new()),
167            config,
168        }
169    }
170
171    /// Returns the current epoch.
172    #[must_use]
173    pub fn current_epoch(&self) -> EpochId {
174        EpochId::new(self.current_epoch.load(Ordering::Acquire))
175    }
176
177    /// Creates a new epoch.
178    pub fn new_epoch(&self) -> EpochId {
179        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
180        EpochId::new(id)
181    }
182
183    // === Node Operations ===
184
185    /// Creates a new node with the given labels.
186    ///
187    /// Uses the system transaction for non-transactional operations.
188    pub fn create_node(&self, labels: &[&str]) -> NodeId {
189        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
190    }
191
192    /// Creates a new node with the given labels within a transaction context.
193    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
194        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
195
196        let mut record = NodeRecord::new(id, epoch);
197        record.set_label_count(labels.len() as u16);
198
199        // Store labels in node_labels map and label_index
200        let mut node_label_set = FxHashSet::default();
201        for label in labels {
202            let label_id = self.get_or_create_label_id(*label);
203            node_label_set.insert(label_id);
204
205            // Update label index
206            let mut index = self.label_index.write();
207            while index.len() <= label_id as usize {
208                index.push(FxHashMap::default());
209            }
210            index[label_id as usize].insert(id, ());
211        }
212
213        // Store node's labels
214        self.node_labels.write().insert(id, node_label_set);
215
216        // Create version chain with initial version
217        let chain = VersionChain::with_initial(record, epoch, tx_id);
218        self.nodes.write().insert(id, chain);
219        id
220    }
221
222    /// Creates a new node with labels and properties.
223    pub fn create_node_with_props(
224        &self,
225        labels: &[&str],
226        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
227    ) -> NodeId {
228        self.create_node_with_props_versioned(
229            labels,
230            properties,
231            self.current_epoch(),
232            TxId::SYSTEM,
233        )
234    }
235
236    /// Creates a new node with labels and properties within a transaction context.
237    pub fn create_node_with_props_versioned(
238        &self,
239        labels: &[&str],
240        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
241        epoch: EpochId,
242        tx_id: TxId,
243    ) -> NodeId {
244        let id = self.create_node_versioned(labels, epoch, tx_id);
245
246        for (key, value) in properties {
247            self.node_properties.set(id, key.into(), value.into());
248        }
249
250        // Update props_count in record
251        let count = self.node_properties.get_all(id).len() as u16;
252        if let Some(chain) = self.nodes.write().get_mut(&id) {
253            if let Some(record) = chain.latest_mut() {
254                record.props_count = count;
255            }
256        }
257
258        id
259    }
260
261    /// Gets a node by ID (latest visible version).
262    #[must_use]
263    pub fn get_node(&self, id: NodeId) -> Option<Node> {
264        self.get_node_at_epoch(id, self.current_epoch())
265    }
266
267    /// Gets a node by ID at a specific epoch.
268    #[must_use]
269    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
270        let nodes = self.nodes.read();
271        let chain = nodes.get(&id)?;
272        let record = chain.visible_at(epoch)?;
273
274        if record.is_deleted() {
275            return None;
276        }
277
278        let mut node = Node::new(id);
279
280        // Get labels from node_labels map
281        let id_to_label = self.id_to_label.read();
282        let node_labels = self.node_labels.read();
283        if let Some(label_ids) = node_labels.get(&id) {
284            for &label_id in label_ids {
285                if let Some(label) = id_to_label.get(label_id as usize) {
286                    node.labels.push(label.clone());
287                }
288            }
289        }
290
291        // Get properties
292        node.properties = self.node_properties.get_all(id).into_iter().collect();
293
294        Some(node)
295    }
296
297    /// Gets a node visible to a specific transaction.
298    #[must_use]
299    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
300        let nodes = self.nodes.read();
301        let chain = nodes.get(&id)?;
302        let record = chain.visible_to(epoch, tx_id)?;
303
304        if record.is_deleted() {
305            return None;
306        }
307
308        let mut node = Node::new(id);
309
310        // Get labels from node_labels map
311        let id_to_label = self.id_to_label.read();
312        let node_labels = self.node_labels.read();
313        if let Some(label_ids) = node_labels.get(&id) {
314            for &label_id in label_ids {
315                if let Some(label) = id_to_label.get(label_id as usize) {
316                    node.labels.push(label.clone());
317                }
318            }
319        }
320
321        // Get properties
322        node.properties = self.node_properties.get_all(id).into_iter().collect();
323
324        Some(node)
325    }
326
327    /// Deletes a node and all its edges (using latest epoch).
328    pub fn delete_node(&self, id: NodeId) -> bool {
329        self.delete_node_at_epoch(id, self.current_epoch())
330    }
331
332    /// Deletes a node at a specific epoch.
333    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
334        let mut nodes = self.nodes.write();
335        if let Some(chain) = nodes.get_mut(&id) {
336            // Check if visible at this epoch (not already deleted)
337            if let Some(record) = chain.visible_at(epoch) {
338                if record.is_deleted() {
339                    return false;
340                }
341            } else {
342                // Not visible at this epoch (already deleted or doesn't exist)
343                return false;
344            }
345
346            // Mark the version chain as deleted at this epoch
347            chain.mark_deleted(epoch);
348
349            // Remove from label index using node_labels map
350            let mut index = self.label_index.write();
351            let mut node_labels = self.node_labels.write();
352            if let Some(label_ids) = node_labels.remove(&id) {
353                for label_id in label_ids {
354                    if let Some(set) = index.get_mut(label_id as usize) {
355                        set.remove(&id);
356                    }
357                }
358            }
359
360            // Remove properties
361            drop(nodes); // Release lock before removing properties
362            drop(index);
363            drop(node_labels);
364            self.node_properties.remove_all(id);
365
366            // Note: Caller should use delete_node_edges() first if detach is needed
367
368            true
369        } else {
370            false
371        }
372    }
373
374    /// Deletes all edges connected to a node (implements DETACH DELETE).
375    ///
376    /// Call this before `delete_node()` if you want to remove a node that
377    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
378    pub fn delete_node_edges(&self, node_id: NodeId) {
379        // Get outgoing edges
380        let outgoing: Vec<EdgeId> = self
381            .forward_adj
382            .edges_from(node_id)
383            .into_iter()
384            .map(|(_, edge_id)| edge_id)
385            .collect();
386
387        // Get incoming edges
388        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
389            backward
390                .edges_from(node_id)
391                .into_iter()
392                .map(|(_, edge_id)| edge_id)
393                .collect()
394        } else {
395            // No backward adjacency - scan all edges
396            let epoch = self.current_epoch();
397            self.edges
398                .read()
399                .iter()
400                .filter_map(|(id, chain)| {
401                    chain.visible_at(epoch).and_then(|r| {
402                        if !r.is_deleted() && r.dst == node_id {
403                            Some(*id)
404                        } else {
405                            None
406                        }
407                    })
408                })
409                .collect()
410        };
411
412        // Delete all edges
413        for edge_id in outgoing.into_iter().chain(incoming) {
414            self.delete_edge(edge_id);
415        }
416    }
417
418    /// Sets a property on a node.
419    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
420        self.node_properties.set(id, key.into(), value);
421
422        // Update props_count in record
423        let count = self.node_properties.get_all(id).len() as u16;
424        if let Some(chain) = self.nodes.write().get_mut(&id) {
425            if let Some(record) = chain.latest_mut() {
426                record.props_count = count;
427            }
428        }
429    }
430
431    /// Sets a property on an edge.
432    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
433        self.edge_properties.set(id, key.into(), value);
434    }
435
436    /// Removes a property from a node.
437    ///
438    /// Returns the previous value if it existed, or None if the property didn't exist.
439    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
440        let result = self.node_properties.remove(id, &key.into());
441
442        // Update props_count in record
443        let count = self.node_properties.get_all(id).len() as u16;
444        if let Some(chain) = self.nodes.write().get_mut(&id) {
445            if let Some(record) = chain.latest_mut() {
446                record.props_count = count;
447            }
448        }
449
450        result
451    }
452
453    /// Removes a property from an edge.
454    ///
455    /// Returns the previous value if it existed, or None if the property didn't exist.
456    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
457        self.edge_properties.remove(id, &key.into())
458    }
459
460    /// Adds a label to a node.
461    ///
462    /// Returns true if the label was added, false if the node doesn't exist
463    /// or already has the label.
464    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
465        let epoch = self.current_epoch();
466
467        // Check if node exists
468        let nodes = self.nodes.read();
469        if let Some(chain) = nodes.get(&node_id) {
470            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
471                return false;
472            }
473        } else {
474            return false;
475        }
476        drop(nodes);
477
478        // Get or create label ID
479        let label_id = self.get_or_create_label_id(label);
480
481        // Add to node_labels map
482        let mut node_labels = self.node_labels.write();
483        let label_set = node_labels
484            .entry(node_id)
485            .or_insert_with(FxHashSet::default);
486
487        if label_set.contains(&label_id) {
488            return false; // Already has this label
489        }
490
491        label_set.insert(label_id);
492        drop(node_labels);
493
494        // Add to label_index
495        let mut index = self.label_index.write();
496        if (label_id as usize) >= index.len() {
497            index.resize(label_id as usize + 1, FxHashMap::default());
498        }
499        index[label_id as usize].insert(node_id, ());
500
501        // Update label count in node record
502        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
503            if let Some(record) = chain.latest_mut() {
504                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
505                record.set_label_count(count as u16);
506            }
507        }
508
509        true
510    }
511
512    /// Removes a label from a node.
513    ///
514    /// Returns true if the label was removed, false if the node doesn't exist
515    /// or doesn't have the label.
516    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
517        let epoch = self.current_epoch();
518
519        // Check if node exists
520        let nodes = self.nodes.read();
521        if let Some(chain) = nodes.get(&node_id) {
522            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
523                return false;
524            }
525        } else {
526            return false;
527        }
528        drop(nodes);
529
530        // Get label ID
531        let label_id = {
532            let label_ids = self.label_to_id.read();
533            match label_ids.get(label) {
534                Some(&id) => id,
535                None => return false, // Label doesn't exist
536            }
537        };
538
539        // Remove from node_labels map
540        let mut node_labels = self.node_labels.write();
541        if let Some(label_set) = node_labels.get_mut(&node_id) {
542            if !label_set.remove(&label_id) {
543                return false; // Node doesn't have this label
544            }
545        } else {
546            return false;
547        }
548        drop(node_labels);
549
550        // Remove from label_index
551        let mut index = self.label_index.write();
552        if (label_id as usize) < index.len() {
553            index[label_id as usize].remove(&node_id);
554        }
555
556        // Update label count in node record
557        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
558            if let Some(record) = chain.latest_mut() {
559                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
560                record.set_label_count(count as u16);
561            }
562        }
563
564        true
565    }
566
567    /// Returns the number of nodes (non-deleted at current epoch).
568    #[must_use]
569    pub fn node_count(&self) -> usize {
570        let epoch = self.current_epoch();
571        self.nodes
572            .read()
573            .values()
574            .filter_map(|chain| chain.visible_at(epoch))
575            .filter(|r| !r.is_deleted())
576            .count()
577    }
578
579    /// Returns all node IDs in the store.
580    ///
581    /// This returns a snapshot of current node IDs. The returned vector
582    /// excludes deleted nodes. Results are sorted by NodeId for deterministic
583    /// iteration order.
584    #[must_use]
585    pub fn node_ids(&self) -> Vec<NodeId> {
586        let epoch = self.current_epoch();
587        let mut ids: Vec<NodeId> = self
588            .nodes
589            .read()
590            .iter()
591            .filter_map(|(id, chain)| {
592                chain
593                    .visible_at(epoch)
594                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
595            })
596            .collect();
597        ids.sort_unstable();
598        ids
599    }
600
601    // === Edge Operations ===
602
603    /// Creates a new edge.
604    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
605        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
606    }
607
608    /// Creates a new edge within a transaction context.
609    pub fn create_edge_versioned(
610        &self,
611        src: NodeId,
612        dst: NodeId,
613        edge_type: &str,
614        epoch: EpochId,
615        tx_id: TxId,
616    ) -> EdgeId {
617        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
618        let type_id = self.get_or_create_edge_type_id(edge_type);
619
620        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
621        let chain = VersionChain::with_initial(record, epoch, tx_id);
622        self.edges.write().insert(id, chain);
623
624        // Update adjacency
625        self.forward_adj.add_edge(src, dst, id);
626        if let Some(ref backward) = self.backward_adj {
627            backward.add_edge(dst, src, id);
628        }
629
630        id
631    }
632
633    /// Creates a new edge with properties.
634    pub fn create_edge_with_props(
635        &self,
636        src: NodeId,
637        dst: NodeId,
638        edge_type: &str,
639        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
640    ) -> EdgeId {
641        let id = self.create_edge(src, dst, edge_type);
642
643        for (key, value) in properties {
644            self.edge_properties.set(id, key.into(), value.into());
645        }
646
647        id
648    }
649
650    /// Gets an edge by ID (latest visible version).
651    #[must_use]
652    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
653        self.get_edge_at_epoch(id, self.current_epoch())
654    }
655
656    /// Gets an edge by ID at a specific epoch.
657    #[must_use]
658    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
659        let edges = self.edges.read();
660        let chain = edges.get(&id)?;
661        let record = chain.visible_at(epoch)?;
662
663        if record.is_deleted() {
664            return None;
665        }
666
667        let edge_type = {
668            let id_to_type = self.id_to_edge_type.read();
669            id_to_type.get(record.type_id as usize)?.clone()
670        };
671
672        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
673
674        // Get properties
675        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
676
677        Some(edge)
678    }
679
680    /// Gets an edge visible to a specific transaction.
681    #[must_use]
682    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
683        let edges = self.edges.read();
684        let chain = edges.get(&id)?;
685        let record = chain.visible_to(epoch, tx_id)?;
686
687        if record.is_deleted() {
688            return None;
689        }
690
691        let edge_type = {
692            let id_to_type = self.id_to_edge_type.read();
693            id_to_type.get(record.type_id as usize)?.clone()
694        };
695
696        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
697
698        // Get properties
699        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
700
701        Some(edge)
702    }
703
704    /// Deletes an edge (using latest epoch).
705    pub fn delete_edge(&self, id: EdgeId) -> bool {
706        self.delete_edge_at_epoch(id, self.current_epoch())
707    }
708
709    /// Deletes an edge at a specific epoch.
710    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
711        let mut edges = self.edges.write();
712        if let Some(chain) = edges.get_mut(&id) {
713            // Get the visible record to check if deleted and get src/dst
714            let (src, dst) = {
715                match chain.visible_at(epoch) {
716                    Some(record) => {
717                        if record.is_deleted() {
718                            return false;
719                        }
720                        (record.src, record.dst)
721                    }
722                    None => return false, // Not visible at this epoch (already deleted)
723                }
724            };
725
726            // Mark the version chain as deleted
727            chain.mark_deleted(epoch);
728
729            drop(edges); // Release lock
730
731            // Mark as deleted in adjacency (soft delete)
732            self.forward_adj.mark_deleted(src, id);
733            if let Some(ref backward) = self.backward_adj {
734                backward.mark_deleted(dst, id);
735            }
736
737            // Remove properties
738            self.edge_properties.remove_all(id);
739
740            true
741        } else {
742            false
743        }
744    }
745
746    /// Returns the number of edges (non-deleted at current epoch).
747    #[must_use]
748    pub fn edge_count(&self) -> usize {
749        let epoch = self.current_epoch();
750        self.edges
751            .read()
752            .values()
753            .filter_map(|chain| chain.visible_at(epoch))
754            .filter(|r| !r.is_deleted())
755            .count()
756    }
757
758    /// Discards all uncommitted versions created by a transaction.
759    ///
760    /// This is called during transaction rollback to clean up uncommitted changes.
761    /// The method removes version chain entries created by the specified transaction.
762    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
763        // Remove uncommitted node versions
764        {
765            let mut nodes = self.nodes.write();
766            for chain in nodes.values_mut() {
767                chain.remove_versions_by(tx_id);
768            }
769            // Remove completely empty chains (no versions left)
770            nodes.retain(|_, chain| !chain.is_empty());
771        }
772
773        // Remove uncommitted edge versions
774        {
775            let mut edges = self.edges.write();
776            for chain in edges.values_mut() {
777                chain.remove_versions_by(tx_id);
778            }
779            // Remove completely empty chains (no versions left)
780            edges.retain(|_, chain| !chain.is_empty());
781        }
782    }
783
784    /// Returns the number of distinct labels in the store.
785    #[must_use]
786    pub fn label_count(&self) -> usize {
787        self.id_to_label.read().len()
788    }
789
790    /// Returns the number of distinct property keys in the store.
791    ///
792    /// This counts unique property keys across both nodes and edges.
793    #[must_use]
794    pub fn property_key_count(&self) -> usize {
795        let node_keys = self.node_properties.column_count();
796        let edge_keys = self.edge_properties.column_count();
797        // Note: This may count some keys twice if the same key is used
798        // for both nodes and edges. A more precise count would require
799        // tracking unique keys across both storages.
800        node_keys + edge_keys
801    }
802
803    /// Returns the number of distinct edge types in the store.
804    #[must_use]
805    pub fn edge_type_count(&self) -> usize {
806        self.id_to_edge_type.read().len()
807    }
808
809    // === Traversal ===
810
811    /// Iterates over neighbors of a node in the specified direction.
812    ///
813    /// This is the fast path for graph traversal - goes straight to the
814    /// adjacency index without loading full node data.
815    pub fn neighbors(
816        &self,
817        node: NodeId,
818        direction: Direction,
819    ) -> impl Iterator<Item = NodeId> + '_ {
820        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
821            Direction::Outgoing | Direction::Both => {
822                Box::new(self.forward_adj.neighbors(node).into_iter())
823            }
824            Direction::Incoming => Box::new(std::iter::empty()),
825        };
826
827        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
828            Direction::Incoming | Direction::Both => {
829                if let Some(ref adj) = self.backward_adj {
830                    Box::new(adj.neighbors(node).into_iter())
831                } else {
832                    Box::new(std::iter::empty())
833                }
834            }
835            Direction::Outgoing => Box::new(std::iter::empty()),
836        };
837
838        forward.chain(backward)
839    }
840
841    /// Returns edges from a node with their targets.
842    ///
843    /// Returns an iterator of (target_node, edge_id) pairs.
844    pub fn edges_from(
845        &self,
846        node: NodeId,
847        direction: Direction,
848    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
849        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
850            Direction::Outgoing | Direction::Both => {
851                Box::new(self.forward_adj.edges_from(node).into_iter())
852            }
853            Direction::Incoming => Box::new(std::iter::empty()),
854        };
855
856        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
857            Direction::Incoming | Direction::Both => {
858                if let Some(ref adj) = self.backward_adj {
859                    Box::new(adj.edges_from(node).into_iter())
860                } else {
861                    Box::new(std::iter::empty())
862                }
863            }
864            Direction::Outgoing => Box::new(std::iter::empty()),
865        };
866
867        forward.chain(backward)
868    }
869
870    /// Gets the type of an edge by ID.
871    #[must_use]
872    pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
873        let edges = self.edges.read();
874        let chain = edges.get(&id)?;
875        let epoch = self.current_epoch();
876        let record = chain.visible_at(epoch)?;
877        let id_to_type = self.id_to_edge_type.read();
878        id_to_type.get(record.type_id as usize).cloned()
879    }
880
881    /// Returns all nodes with a specific label.
882    ///
883    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
884    /// concurrent modifications won't affect the returned vector. Results are
885    /// sorted by NodeId for deterministic iteration order.
886    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
887        let label_to_id = self.label_to_id.read();
888        if let Some(&label_id) = label_to_id.get(label) {
889            let index = self.label_index.read();
890            if let Some(set) = index.get(label_id as usize) {
891                let mut ids: Vec<NodeId> = set.keys().copied().collect();
892                ids.sort_unstable();
893                return ids;
894            }
895        }
896        Vec::new()
897    }
898
899    // === Admin API: Iteration ===
900
901    /// Returns an iterator over all nodes in the database.
902    ///
903    /// This creates a snapshot of all visible nodes at the current epoch.
904    /// Useful for dump/export operations.
905    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
906        let epoch = self.current_epoch();
907        let node_ids: Vec<NodeId> = self
908            .nodes
909            .read()
910            .iter()
911            .filter_map(|(id, chain)| {
912                chain
913                    .visible_at(epoch)
914                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
915            })
916            .collect();
917
918        node_ids.into_iter().filter_map(move |id| self.get_node(id))
919    }
920
921    /// Returns an iterator over all edges in the database.
922    ///
923    /// This creates a snapshot of all visible edges at the current epoch.
924    /// Useful for dump/export operations.
925    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
926        let epoch = self.current_epoch();
927        let edge_ids: Vec<EdgeId> = self
928            .edges
929            .read()
930            .iter()
931            .filter_map(|(id, chain)| {
932                chain
933                    .visible_at(epoch)
934                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
935            })
936            .collect();
937
938        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
939    }
940
941    /// Returns all label names in the database.
942    pub fn all_labels(&self) -> Vec<String> {
943        self.id_to_label
944            .read()
945            .iter()
946            .map(|s| s.to_string())
947            .collect()
948    }
949
950    /// Returns all edge type names in the database.
951    pub fn all_edge_types(&self) -> Vec<String> {
952        self.id_to_edge_type
953            .read()
954            .iter()
955            .map(|s| s.to_string())
956            .collect()
957    }
958
959    /// Returns all property keys used in the database.
960    pub fn all_property_keys(&self) -> Vec<String> {
961        let mut keys = std::collections::HashSet::new();
962        for key in self.node_properties.keys() {
963            keys.insert(key.to_string());
964        }
965        for key in self.edge_properties.keys() {
966            keys.insert(key.to_string());
967        }
968        keys.into_iter().collect()
969    }
970
971    /// Returns an iterator over nodes with a specific label.
972    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
973        let node_ids = self.nodes_by_label(label);
974        node_ids.into_iter().filter_map(move |id| self.get_node(id))
975    }
976
977    /// Returns an iterator over edges with a specific type.
978    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
979        let epoch = self.current_epoch();
980        let type_to_id = self.edge_type_to_id.read();
981
982        if let Some(&type_id) = type_to_id.get(edge_type) {
983            let edge_ids: Vec<EdgeId> = self
984                .edges
985                .read()
986                .iter()
987                .filter_map(|(id, chain)| {
988                    chain.visible_at(epoch).and_then(|r| {
989                        if !r.is_deleted() && r.type_id == type_id {
990                            Some(*id)
991                        } else {
992                            None
993                        }
994                    })
995                })
996                .collect();
997
998            // Return a boxed iterator for the found edges
999            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
1000                as Box<dyn Iterator<Item = Edge> + 'a>
1001        } else {
1002            // Return empty iterator
1003            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
1004        }
1005    }
1006
1007    // === Zone Map Support ===
1008
1009    /// Checks if a node property predicate might match any nodes.
1010    ///
1011    /// Uses zone maps for early filtering. Returns `true` if there might be
1012    /// matching nodes, `false` if there definitely aren't.
1013    #[must_use]
1014    pub fn node_property_might_match(
1015        &self,
1016        property: &PropertyKey,
1017        op: CompareOp,
1018        value: &Value,
1019    ) -> bool {
1020        self.node_properties.might_match(property, op, value)
1021    }
1022
1023    /// Checks if an edge property predicate might match any edges.
1024    #[must_use]
1025    pub fn edge_property_might_match(
1026        &self,
1027        property: &PropertyKey,
1028        op: CompareOp,
1029        value: &Value,
1030    ) -> bool {
1031        self.edge_properties.might_match(property, op, value)
1032    }
1033
1034    /// Gets the zone map for a node property.
1035    #[must_use]
1036    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
1037        self.node_properties.zone_map(property)
1038    }
1039
1040    /// Gets the zone map for an edge property.
1041    #[must_use]
1042    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
1043        self.edge_properties.zone_map(property)
1044    }
1045
1046    /// Rebuilds zone maps for all properties.
1047    pub fn rebuild_zone_maps(&self) {
1048        self.node_properties.rebuild_zone_maps();
1049        self.edge_properties.rebuild_zone_maps();
1050    }
1051
1052    // === Statistics ===
1053
1054    /// Returns the current statistics.
1055    #[must_use]
1056    pub fn statistics(&self) -> Statistics {
1057        self.statistics.read().clone()
1058    }
1059
1060    /// Recomputes statistics from current data.
1061    ///
1062    /// Scans all labels and edge types to build cardinality estimates for the
1063    /// query optimizer. Call this periodically or after bulk data loads.
1064    pub fn compute_statistics(&self) {
1065        let mut stats = Statistics::new();
1066
1067        // Compute total counts
1068        stats.total_nodes = self.node_count() as u64;
1069        stats.total_edges = self.edge_count() as u64;
1070
1071        // Compute per-label statistics
1072        let id_to_label = self.id_to_label.read();
1073        let label_index = self.label_index.read();
1074
1075        for (label_id, label_name) in id_to_label.iter().enumerate() {
1076            let node_count = label_index
1077                .get(label_id)
1078                .map(|set| set.len() as u64)
1079                .unwrap_or(0);
1080
1081            if node_count > 0 {
1082                // Estimate average degree
1083                let avg_out_degree = if stats.total_nodes > 0 {
1084                    stats.total_edges as f64 / stats.total_nodes as f64
1085                } else {
1086                    0.0
1087                };
1088
1089                let label_stats =
1090                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
1091
1092                stats.update_label(label_name.as_ref(), label_stats);
1093            }
1094        }
1095
1096        // Compute per-edge-type statistics
1097        let id_to_edge_type = self.id_to_edge_type.read();
1098        let edges = self.edges.read();
1099        let epoch = self.current_epoch();
1100
1101        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
1102        for chain in edges.values() {
1103            if let Some(record) = chain.visible_at(epoch) {
1104                if !record.is_deleted() {
1105                    *edge_type_counts.entry(record.type_id).or_default() += 1;
1106                }
1107            }
1108        }
1109
1110        for (type_id, count) in edge_type_counts {
1111            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
1112                let avg_degree = if stats.total_nodes > 0 {
1113                    count as f64 / stats.total_nodes as f64
1114                } else {
1115                    0.0
1116                };
1117
1118                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
1119                stats.update_edge_type(type_name.as_ref(), edge_stats);
1120            }
1121        }
1122
1123        *self.statistics.write() = stats;
1124    }
1125
1126    /// Estimates cardinality for a label scan.
1127    #[must_use]
1128    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
1129        self.statistics.read().estimate_label_cardinality(label)
1130    }
1131
1132    /// Estimates average degree for an edge type.
1133    #[must_use]
1134    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
1135        self.statistics
1136            .read()
1137            .estimate_avg_degree(edge_type, outgoing)
1138    }
1139
1140    // === Internal Helpers ===
1141
1142    fn get_or_create_label_id(&self, label: &str) -> u32 {
1143        {
1144            let label_to_id = self.label_to_id.read();
1145            if let Some(&id) = label_to_id.get(label) {
1146                return id;
1147            }
1148        }
1149
1150        let mut label_to_id = self.label_to_id.write();
1151        let mut id_to_label = self.id_to_label.write();
1152
1153        // Double-check after acquiring write lock
1154        if let Some(&id) = label_to_id.get(label) {
1155            return id;
1156        }
1157
1158        let id = id_to_label.len() as u32;
1159
1160        let label: Arc<str> = label.into();
1161        label_to_id.insert(label.clone(), id);
1162        id_to_label.push(label);
1163
1164        id
1165    }
1166
1167    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
1168        {
1169            let type_to_id = self.edge_type_to_id.read();
1170            if let Some(&id) = type_to_id.get(edge_type) {
1171                return id;
1172            }
1173        }
1174
1175        let mut type_to_id = self.edge_type_to_id.write();
1176        let mut id_to_type = self.id_to_edge_type.write();
1177
1178        // Double-check
1179        if let Some(&id) = type_to_id.get(edge_type) {
1180            return id;
1181        }
1182
1183        let id = id_to_type.len() as u32;
1184        let edge_type: Arc<str> = edge_type.into();
1185        type_to_id.insert(edge_type.clone(), id);
1186        id_to_type.push(edge_type);
1187
1188        id
1189    }
1190
1191    // === Recovery Support ===
1192
1193    /// Creates a node with a specific ID during recovery.
1194    ///
1195    /// This is used for WAL recovery to restore nodes with their original IDs.
1196    /// The caller must ensure IDs don't conflict with existing nodes.
1197    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
1198        let epoch = self.current_epoch();
1199        let mut record = NodeRecord::new(id, epoch);
1200        record.set_label_count(labels.len() as u16);
1201
1202        // Store labels in node_labels map and label_index
1203        let mut node_label_set = FxHashSet::default();
1204        for label in labels {
1205            let label_id = self.get_or_create_label_id(*label);
1206            node_label_set.insert(label_id);
1207
1208            // Update label index
1209            let mut index = self.label_index.write();
1210            while index.len() <= label_id as usize {
1211                index.push(FxHashMap::default());
1212            }
1213            index[label_id as usize].insert(id, ());
1214        }
1215
1216        // Store node's labels
1217        self.node_labels.write().insert(id, node_label_set);
1218
1219        // Create version chain with initial version (using SYSTEM tx for recovery)
1220        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
1221        self.nodes.write().insert(id, chain);
1222
1223        // Update next_node_id if necessary to avoid future collisions
1224        let id_val = id.as_u64();
1225        let _ = self
1226            .next_node_id
1227            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
1228                if id_val >= current {
1229                    Some(id_val + 1)
1230                } else {
1231                    None
1232                }
1233            });
1234    }
1235
1236    /// Creates an edge with a specific ID during recovery.
1237    ///
1238    /// This is used for WAL recovery to restore edges with their original IDs.
1239    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
1240        let epoch = self.current_epoch();
1241        let type_id = self.get_or_create_edge_type_id(edge_type);
1242
1243        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1244        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
1245        self.edges.write().insert(id, chain);
1246
1247        // Update adjacency
1248        self.forward_adj.add_edge(src, dst, id);
1249        if let Some(ref backward) = self.backward_adj {
1250            backward.add_edge(dst, src, id);
1251        }
1252
1253        // Update next_edge_id if necessary
1254        let id_val = id.as_u64();
1255        let _ = self
1256            .next_edge_id
1257            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
1258                if id_val >= current {
1259                    Some(id_val + 1)
1260                } else {
1261                    None
1262                }
1263            });
1264    }
1265
1266    /// Sets the current epoch during recovery.
1267    pub fn set_epoch(&self, epoch: EpochId) {
1268        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
1269    }
1270}
1271
1272impl Default for LpgStore {
1273    fn default() -> Self {
1274        Self::new()
1275    }
1276}
1277
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_node() {
        let graph = LpgStore::new();

        let person = graph.create_node(&["Person"]);
        assert!(person.is_valid());

        let fetched = graph
            .get_node(person)
            .expect("node should be readable right after creation");
        assert!(fetched.has_label("Person"));
        assert!(!fetched.has_label("Animal"));
    }

    #[test]
    fn test_create_node_with_props() {
        let graph = LpgStore::new();

        let props = [("name", Value::from("Alice")), ("age", Value::from(30i64))];
        let person = graph.create_node_with_props(&["Person"], props);

        let fetched = graph
            .get_node(person)
            .expect("node should be readable right after creation");
        let name = fetched.get_property("name").and_then(|v| v.as_str());
        let age = fetched.get_property("age").and_then(|v| v.as_int64());
        assert_eq!(name, Some("Alice"));
        assert_eq!(age, Some(30));
    }

    #[test]
    fn test_delete_node() {
        let graph = LpgStore::new();

        let person = graph.create_node(&["Person"]);
        assert_eq!(graph.node_count(), 1);

        assert!(graph.delete_node(person));
        assert_eq!(graph.node_count(), 0);
        assert!(graph.get_node(person).is_none());

        // A second delete of the same node must report failure.
        assert!(!graph.delete_node(person));
    }

    #[test]
    fn test_create_edge() {
        let graph = LpgStore::new();

        let alice = graph.create_node(&["Person"]);
        let bob = graph.create_node(&["Person"]);

        let knows = graph.create_edge(alice, bob, "KNOWS");
        assert!(knows.is_valid());

        let fetched = graph
            .get_edge(knows)
            .expect("edge should be readable right after creation");
        assert_eq!(fetched.src, alice);
        assert_eq!(fetched.dst, bob);
        assert_eq!(fetched.edge_type.as_ref(), "KNOWS");
    }

    #[test]
    fn test_neighbors() {
        let graph = LpgStore::new();

        let hub = graph.create_node(&["Person"]);
        let first = graph.create_node(&["Person"]);
        let second = graph.create_node(&["Person"]);

        graph.create_edge(hub, first, "KNOWS");
        graph.create_edge(hub, second, "KNOWS");

        let out_neighbors: Vec<_> = graph.neighbors(hub, Direction::Outgoing).collect();
        assert_eq!(out_neighbors.len(), 2);
        assert!(out_neighbors.contains(&first));
        assert!(out_neighbors.contains(&second));

        let in_neighbors: Vec<_> = graph.neighbors(first, Direction::Incoming).collect();
        assert_eq!(in_neighbors.len(), 1);
        assert!(in_neighbors.contains(&hub));
    }

    #[test]
    fn test_nodes_by_label() {
        let graph = LpgStore::new();

        let person_one = graph.create_node(&["Person"]);
        let person_two = graph.create_node(&["Person"]);
        let _animal = graph.create_node(&["Animal"]);

        let people = graph.nodes_by_label("Person");
        assert_eq!(people.len(), 2);
        assert!(people.contains(&person_one));
        assert!(people.contains(&person_two));

        let animals = graph.nodes_by_label("Animal");
        assert_eq!(animals.len(), 1);
    }

    #[test]
    fn test_delete_edge() {
        let graph = LpgStore::new();

        let from = graph.create_node(&["Person"]);
        let to = graph.create_node(&["Person"]);
        let knows = graph.create_edge(from, to, "KNOWS");

        assert_eq!(graph.edge_count(), 1);

        assert!(graph.delete_edge(knows));
        assert_eq!(graph.edge_count(), 0);
        assert!(graph.get_edge(knows).is_none());
    }
}