Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use grafeo_common::mvcc::VersionChain;
19use grafeo_common::types::{EdgeId, EpochId, NodeId, PropertyKey, TxId, Value};
20use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
21use parking_lot::RwLock;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicU64, Ordering};
24
/// Tuning knobs for an [`LpgStore`].
///
/// The [`Default`] values suit most workloads. Switch `backward_edges` off
/// when traversal only ever follows outgoing edges (less memory), and raise
/// the capacities when the graph size is known ahead of time.
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// Keep a backward adjacency index so incoming edges can be queried.
    /// Disabling it saves roughly half of the adjacency memory.
    pub backward_edges: bool,
    /// Initial capacity for nodes (avoids early reallocations).
    pub initial_node_capacity: usize,
    /// Initial capacity for edges (avoids early reallocations).
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward adjacency enabled, 1024 node slots, 4096 edge slots.
    fn default() -> Self {
        let backward_edges = true;
        let initial_node_capacity = 1024;
        let initial_edge_capacity = 4096;

        LpgStoreConfig {
            backward_edges,
            initial_node_capacity,
            initial_edge_capacity,
        }
    }
}
50
/// The core in-memory graph storage.
///
/// Everything lives here: nodes, edges, properties, adjacency indexes, and
/// version chains for MVCC. Shared state is guarded by [`RwLock`]s, so
/// concurrent reads never block each other.
///
/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
/// adds transaction management and query execution. Use `LpgStore` directly
/// when you need raw performance for algorithm implementations.
///
/// # Example
///
/// ```
/// use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_core::graph::Direction;
///
/// let store = LpgStore::new();
///
/// // Create a small social network
/// let alice = store.create_node(&["Person"]);
/// let bob = store.create_node(&["Person"]);
/// store.create_edge(alice, bob, "KNOWS");
///
/// // Traverse outgoing edges
/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
///     println!("Alice knows node {:?}", neighbor);
/// }
/// ```
pub struct LpgStore {
    /// Configuration the store was built with.
    #[allow(dead_code)]
    config: LpgStoreConfig,

    /// Node records keyed by `NodeId`; each entry is a version chain for MVCC.
    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Edge records keyed by `EdgeId`; each entry is a version chain for MVCC.
    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    /// Property storage for nodes.
    node_properties: PropertyStorage<NodeId>,

    /// Property storage for edges.
    edge_properties: PropertyStorage<EdgeId>,

    /// Label name to numeric label ID.
    label_to_id: RwLock<FxHashMap<Arc<str>, u32>>,

    /// Numeric label ID to label name (the ID is the vector index).
    id_to_label: RwLock<Vec<Arc<str>>>,

    /// Edge type name to numeric edge type ID.
    edge_type_to_id: RwLock<FxHashMap<Arc<str>, u32>>,

    /// Numeric edge type ID to edge type name (the ID is the vector index).
    id_to_edge_type: RwLock<Vec<Arc<str>>>,

    /// Forward adjacency lists (outgoing edges).
    forward_adj: ChunkedAdjacency,

    /// Backward adjacency lists (incoming edges).
    /// `None` unless `config.backward_edges` is true.
    backward_adj: Option<ChunkedAdjacency>,

    /// Label index: position `label_id` holds the nodes carrying that label.
    /// The map's unit value means it is used as a set of node IDs.
    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node labels: node_id -> set of label IDs.
    /// Reverse of `label_index`, used to list the labels of a single node.
    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Next node ID to hand out.
    next_node_id: AtomicU64,

    /// Next edge ID to hand out.
    next_edge_id: AtomicU64,

    /// Current epoch.
    current_epoch: AtomicU64,

    /// Statistics for cost-based optimization.
    statistics: RwLock<Statistics>,
}
133
134impl LpgStore {
135    /// Creates a new LPG store with default configuration.
136    #[must_use]
137    pub fn new() -> Self {
138        Self::with_config(LpgStoreConfig::default())
139    }
140
141    /// Creates a new LPG store with custom configuration.
142    #[must_use]
143    pub fn with_config(config: LpgStoreConfig) -> Self {
144        let backward_adj = if config.backward_edges {
145            Some(ChunkedAdjacency::new())
146        } else {
147            None
148        };
149
150        Self {
151            nodes: RwLock::new(FxHashMap::default()),
152            edges: RwLock::new(FxHashMap::default()),
153            node_properties: PropertyStorage::new(),
154            edge_properties: PropertyStorage::new(),
155            label_to_id: RwLock::new(FxHashMap::default()),
156            id_to_label: RwLock::new(Vec::new()),
157            edge_type_to_id: RwLock::new(FxHashMap::default()),
158            id_to_edge_type: RwLock::new(Vec::new()),
159            forward_adj: ChunkedAdjacency::new(),
160            backward_adj,
161            label_index: RwLock::new(Vec::new()),
162            node_labels: RwLock::new(FxHashMap::default()),
163            next_node_id: AtomicU64::new(0),
164            next_edge_id: AtomicU64::new(0),
165            current_epoch: AtomicU64::new(0),
166            statistics: RwLock::new(Statistics::new()),
167            config,
168        }
169    }
170
171    /// Returns the current epoch.
172    #[must_use]
173    pub fn current_epoch(&self) -> EpochId {
174        EpochId::new(self.current_epoch.load(Ordering::Acquire))
175    }
176
177    /// Creates a new epoch.
178    pub fn new_epoch(&self) -> EpochId {
179        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
180        EpochId::new(id)
181    }
182
183    // === Node Operations ===
184
185    /// Creates a new node with the given labels.
186    ///
187    /// Uses the system transaction for non-transactional operations.
188    pub fn create_node(&self, labels: &[&str]) -> NodeId {
189        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
190    }
191
192    /// Creates a new node with the given labels within a transaction context.
193    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
194        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
195
196        let mut record = NodeRecord::new(id, epoch);
197        record.set_label_count(labels.len() as u16);
198
199        // Store labels in node_labels map and label_index
200        let mut node_label_set = FxHashSet::default();
201        for label in labels {
202            let label_id = self.get_or_create_label_id(*label);
203            node_label_set.insert(label_id);
204
205            // Update label index
206            let mut index = self.label_index.write();
207            while index.len() <= label_id as usize {
208                index.push(FxHashMap::default());
209            }
210            index[label_id as usize].insert(id, ());
211        }
212
213        // Store node's labels
214        self.node_labels.write().insert(id, node_label_set);
215
216        // Create version chain with initial version
217        let chain = VersionChain::with_initial(record, epoch, tx_id);
218        self.nodes.write().insert(id, chain);
219        id
220    }
221
222    /// Creates a new node with labels and properties.
223    pub fn create_node_with_props(
224        &self,
225        labels: &[&str],
226        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
227    ) -> NodeId {
228        self.create_node_with_props_versioned(
229            labels,
230            properties,
231            self.current_epoch(),
232            TxId::SYSTEM,
233        )
234    }
235
236    /// Creates a new node with labels and properties within a transaction context.
237    pub fn create_node_with_props_versioned(
238        &self,
239        labels: &[&str],
240        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
241        epoch: EpochId,
242        tx_id: TxId,
243    ) -> NodeId {
244        let id = self.create_node_versioned(labels, epoch, tx_id);
245
246        for (key, value) in properties {
247            self.node_properties.set(id, key.into(), value.into());
248        }
249
250        // Update props_count in record
251        let count = self.node_properties.get_all(id).len() as u16;
252        if let Some(chain) = self.nodes.write().get_mut(&id) {
253            if let Some(record) = chain.latest_mut() {
254                record.props_count = count;
255            }
256        }
257
258        id
259    }
260
261    /// Gets a node by ID (latest visible version).
262    #[must_use]
263    pub fn get_node(&self, id: NodeId) -> Option<Node> {
264        self.get_node_at_epoch(id, self.current_epoch())
265    }
266
267    /// Gets a node by ID at a specific epoch.
268    #[must_use]
269    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
270        let nodes = self.nodes.read();
271        let chain = nodes.get(&id)?;
272        let record = chain.visible_at(epoch)?;
273
274        if record.is_deleted() {
275            return None;
276        }
277
278        let mut node = Node::new(id);
279
280        // Get labels from node_labels map
281        let id_to_label = self.id_to_label.read();
282        let node_labels = self.node_labels.read();
283        if let Some(label_ids) = node_labels.get(&id) {
284            for &label_id in label_ids {
285                if let Some(label) = id_to_label.get(label_id as usize) {
286                    node.labels.push(label.clone());
287                }
288            }
289        }
290
291        // Get properties
292        node.properties = self.node_properties.get_all(id).into_iter().collect();
293
294        Some(node)
295    }
296
297    /// Gets a node visible to a specific transaction.
298    #[must_use]
299    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
300        let nodes = self.nodes.read();
301        let chain = nodes.get(&id)?;
302        let record = chain.visible_to(epoch, tx_id)?;
303
304        if record.is_deleted() {
305            return None;
306        }
307
308        let mut node = Node::new(id);
309
310        // Get labels from node_labels map
311        let id_to_label = self.id_to_label.read();
312        let node_labels = self.node_labels.read();
313        if let Some(label_ids) = node_labels.get(&id) {
314            for &label_id in label_ids {
315                if let Some(label) = id_to_label.get(label_id as usize) {
316                    node.labels.push(label.clone());
317                }
318            }
319        }
320
321        // Get properties
322        node.properties = self.node_properties.get_all(id).into_iter().collect();
323
324        Some(node)
325    }
326
327    /// Deletes a node and all its edges (using latest epoch).
328    pub fn delete_node(&self, id: NodeId) -> bool {
329        self.delete_node_at_epoch(id, self.current_epoch())
330    }
331
332    /// Deletes a node at a specific epoch.
333    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
334        let mut nodes = self.nodes.write();
335        if let Some(chain) = nodes.get_mut(&id) {
336            // Check if visible at this epoch (not already deleted)
337            if let Some(record) = chain.visible_at(epoch) {
338                if record.is_deleted() {
339                    return false;
340                }
341            } else {
342                // Not visible at this epoch (already deleted or doesn't exist)
343                return false;
344            }
345
346            // Mark the version chain as deleted at this epoch
347            chain.mark_deleted(epoch);
348
349            // Remove from label index using node_labels map
350            let mut index = self.label_index.write();
351            let mut node_labels = self.node_labels.write();
352            if let Some(label_ids) = node_labels.remove(&id) {
353                for label_id in label_ids {
354                    if let Some(set) = index.get_mut(label_id as usize) {
355                        set.remove(&id);
356                    }
357                }
358            }
359
360            // Remove properties
361            drop(nodes); // Release lock before removing properties
362            drop(index);
363            drop(node_labels);
364            self.node_properties.remove_all(id);
365
366            // Note: Caller should use delete_node_edges() first if detach is needed
367
368            true
369        } else {
370            false
371        }
372    }
373
374    /// Deletes all edges connected to a node (implements DETACH DELETE).
375    ///
376    /// Call this before `delete_node()` if you want to remove a node that
377    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
378    pub fn delete_node_edges(&self, node_id: NodeId) {
379        // Get outgoing edges
380        let outgoing: Vec<EdgeId> = self
381            .forward_adj
382            .edges_from(node_id)
383            .into_iter()
384            .map(|(_, edge_id)| edge_id)
385            .collect();
386
387        // Get incoming edges
388        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
389            backward
390                .edges_from(node_id)
391                .into_iter()
392                .map(|(_, edge_id)| edge_id)
393                .collect()
394        } else {
395            // No backward adjacency - scan all edges
396            let epoch = self.current_epoch();
397            self.edges
398                .read()
399                .iter()
400                .filter_map(|(id, chain)| {
401                    chain.visible_at(epoch).and_then(|r| {
402                        if !r.is_deleted() && r.dst == node_id {
403                            Some(*id)
404                        } else {
405                            None
406                        }
407                    })
408                })
409                .collect()
410        };
411
412        // Delete all edges
413        for edge_id in outgoing.into_iter().chain(incoming) {
414            self.delete_edge(edge_id);
415        }
416    }
417
418    /// Sets a property on a node.
419    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
420        self.node_properties.set(id, key.into(), value);
421
422        // Update props_count in record
423        let count = self.node_properties.get_all(id).len() as u16;
424        if let Some(chain) = self.nodes.write().get_mut(&id) {
425            if let Some(record) = chain.latest_mut() {
426                record.props_count = count;
427            }
428        }
429    }
430
431    /// Sets a property on an edge.
432    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
433        self.edge_properties.set(id, key.into(), value);
434    }
435
436    /// Removes a property from a node.
437    ///
438    /// Returns the previous value if it existed, or None if the property didn't exist.
439    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
440        let result = self.node_properties.remove(id, &key.into());
441
442        // Update props_count in record
443        let count = self.node_properties.get_all(id).len() as u16;
444        if let Some(chain) = self.nodes.write().get_mut(&id) {
445            if let Some(record) = chain.latest_mut() {
446                record.props_count = count;
447            }
448        }
449
450        result
451    }
452
453    /// Removes a property from an edge.
454    ///
455    /// Returns the previous value if it existed, or None if the property didn't exist.
456    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
457        self.edge_properties.remove(id, &key.into())
458    }
459
460    /// Adds a label to a node.
461    ///
462    /// Returns true if the label was added, false if the node doesn't exist
463    /// or already has the label.
464    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
465        let epoch = self.current_epoch();
466
467        // Check if node exists
468        let nodes = self.nodes.read();
469        if let Some(chain) = nodes.get(&node_id) {
470            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
471                return false;
472            }
473        } else {
474            return false;
475        }
476        drop(nodes);
477
478        // Get or create label ID
479        let label_id = self.get_or_create_label_id(label);
480
481        // Add to node_labels map
482        let mut node_labels = self.node_labels.write();
483        let label_set = node_labels
484            .entry(node_id)
485            .or_insert_with(FxHashSet::default);
486
487        if label_set.contains(&label_id) {
488            return false; // Already has this label
489        }
490
491        label_set.insert(label_id);
492        drop(node_labels);
493
494        // Add to label_index
495        let mut index = self.label_index.write();
496        if (label_id as usize) >= index.len() {
497            index.resize(label_id as usize + 1, FxHashMap::default());
498        }
499        index[label_id as usize].insert(node_id, ());
500
501        // Update label count in node record
502        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
503            if let Some(record) = chain.latest_mut() {
504                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
505                record.set_label_count(count as u16);
506            }
507        }
508
509        true
510    }
511
512    /// Removes a label from a node.
513    ///
514    /// Returns true if the label was removed, false if the node doesn't exist
515    /// or doesn't have the label.
516    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
517        let epoch = self.current_epoch();
518
519        // Check if node exists
520        let nodes = self.nodes.read();
521        if let Some(chain) = nodes.get(&node_id) {
522            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
523                return false;
524            }
525        } else {
526            return false;
527        }
528        drop(nodes);
529
530        // Get label ID
531        let label_id = {
532            let label_ids = self.label_to_id.read();
533            match label_ids.get(label) {
534                Some(&id) => id,
535                None => return false, // Label doesn't exist
536            }
537        };
538
539        // Remove from node_labels map
540        let mut node_labels = self.node_labels.write();
541        if let Some(label_set) = node_labels.get_mut(&node_id) {
542            if !label_set.remove(&label_id) {
543                return false; // Node doesn't have this label
544            }
545        } else {
546            return false;
547        }
548        drop(node_labels);
549
550        // Remove from label_index
551        let mut index = self.label_index.write();
552        if (label_id as usize) < index.len() {
553            index[label_id as usize].remove(&node_id);
554        }
555
556        // Update label count in node record
557        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
558            if let Some(record) = chain.latest_mut() {
559                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
560                record.set_label_count(count as u16);
561            }
562        }
563
564        true
565    }
566
567    /// Returns the number of nodes (non-deleted at current epoch).
568    #[must_use]
569    pub fn node_count(&self) -> usize {
570        let epoch = self.current_epoch();
571        self.nodes
572            .read()
573            .values()
574            .filter_map(|chain| chain.visible_at(epoch))
575            .filter(|r| !r.is_deleted())
576            .count()
577    }
578
579    /// Returns all node IDs in the store.
580    ///
581    /// This returns a snapshot of current node IDs. The returned vector
582    /// excludes deleted nodes.
583    #[must_use]
584    pub fn node_ids(&self) -> Vec<NodeId> {
585        let epoch = self.current_epoch();
586        self.nodes
587            .read()
588            .iter()
589            .filter_map(|(id, chain)| {
590                chain
591                    .visible_at(epoch)
592                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
593            })
594            .collect()
595    }
596
597    // === Edge Operations ===
598
599    /// Creates a new edge.
600    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
601        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
602    }
603
604    /// Creates a new edge within a transaction context.
605    pub fn create_edge_versioned(
606        &self,
607        src: NodeId,
608        dst: NodeId,
609        edge_type: &str,
610        epoch: EpochId,
611        tx_id: TxId,
612    ) -> EdgeId {
613        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
614        let type_id = self.get_or_create_edge_type_id(edge_type);
615
616        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
617        let chain = VersionChain::with_initial(record, epoch, tx_id);
618        self.edges.write().insert(id, chain);
619
620        // Update adjacency
621        self.forward_adj.add_edge(src, dst, id);
622        if let Some(ref backward) = self.backward_adj {
623            backward.add_edge(dst, src, id);
624        }
625
626        id
627    }
628
629    /// Creates a new edge with properties.
630    pub fn create_edge_with_props(
631        &self,
632        src: NodeId,
633        dst: NodeId,
634        edge_type: &str,
635        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
636    ) -> EdgeId {
637        let id = self.create_edge(src, dst, edge_type);
638
639        for (key, value) in properties {
640            self.edge_properties.set(id, key.into(), value.into());
641        }
642
643        id
644    }
645
646    /// Gets an edge by ID (latest visible version).
647    #[must_use]
648    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
649        self.get_edge_at_epoch(id, self.current_epoch())
650    }
651
652    /// Gets an edge by ID at a specific epoch.
653    #[must_use]
654    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
655        let edges = self.edges.read();
656        let chain = edges.get(&id)?;
657        let record = chain.visible_at(epoch)?;
658
659        if record.is_deleted() {
660            return None;
661        }
662
663        let edge_type = {
664            let id_to_type = self.id_to_edge_type.read();
665            id_to_type.get(record.type_id as usize)?.clone()
666        };
667
668        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
669
670        // Get properties
671        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
672
673        Some(edge)
674    }
675
676    /// Gets an edge visible to a specific transaction.
677    #[must_use]
678    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
679        let edges = self.edges.read();
680        let chain = edges.get(&id)?;
681        let record = chain.visible_to(epoch, tx_id)?;
682
683        if record.is_deleted() {
684            return None;
685        }
686
687        let edge_type = {
688            let id_to_type = self.id_to_edge_type.read();
689            id_to_type.get(record.type_id as usize)?.clone()
690        };
691
692        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
693
694        // Get properties
695        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
696
697        Some(edge)
698    }
699
700    /// Deletes an edge (using latest epoch).
701    pub fn delete_edge(&self, id: EdgeId) -> bool {
702        self.delete_edge_at_epoch(id, self.current_epoch())
703    }
704
705    /// Deletes an edge at a specific epoch.
706    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
707        let mut edges = self.edges.write();
708        if let Some(chain) = edges.get_mut(&id) {
709            // Get the visible record to check if deleted and get src/dst
710            let (src, dst) = {
711                match chain.visible_at(epoch) {
712                    Some(record) => {
713                        if record.is_deleted() {
714                            return false;
715                        }
716                        (record.src, record.dst)
717                    }
718                    None => return false, // Not visible at this epoch (already deleted)
719                }
720            };
721
722            // Mark the version chain as deleted
723            chain.mark_deleted(epoch);
724
725            drop(edges); // Release lock
726
727            // Mark as deleted in adjacency (soft delete)
728            self.forward_adj.mark_deleted(src, id);
729            if let Some(ref backward) = self.backward_adj {
730                backward.mark_deleted(dst, id);
731            }
732
733            // Remove properties
734            self.edge_properties.remove_all(id);
735
736            true
737        } else {
738            false
739        }
740    }
741
742    /// Returns the number of edges (non-deleted at current epoch).
743    #[must_use]
744    pub fn edge_count(&self) -> usize {
745        let epoch = self.current_epoch();
746        self.edges
747            .read()
748            .values()
749            .filter_map(|chain| chain.visible_at(epoch))
750            .filter(|r| !r.is_deleted())
751            .count()
752    }
753
754    /// Discards all uncommitted versions created by a transaction.
755    ///
756    /// This is called during transaction rollback to clean up uncommitted changes.
757    /// The method removes version chain entries created by the specified transaction.
758    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
759        // Remove uncommitted node versions
760        {
761            let mut nodes = self.nodes.write();
762            for chain in nodes.values_mut() {
763                chain.remove_versions_by(tx_id);
764            }
765            // Remove completely empty chains (no versions left)
766            nodes.retain(|_, chain| !chain.is_empty());
767        }
768
769        // Remove uncommitted edge versions
770        {
771            let mut edges = self.edges.write();
772            for chain in edges.values_mut() {
773                chain.remove_versions_by(tx_id);
774            }
775            // Remove completely empty chains (no versions left)
776            edges.retain(|_, chain| !chain.is_empty());
777        }
778    }
779
780    /// Returns the number of distinct labels in the store.
781    #[must_use]
782    pub fn label_count(&self) -> usize {
783        self.id_to_label.read().len()
784    }
785
786    /// Returns the number of distinct property keys in the store.
787    ///
788    /// This counts unique property keys across both nodes and edges.
789    #[must_use]
790    pub fn property_key_count(&self) -> usize {
791        let node_keys = self.node_properties.column_count();
792        let edge_keys = self.edge_properties.column_count();
793        // Note: This may count some keys twice if the same key is used
794        // for both nodes and edges. A more precise count would require
795        // tracking unique keys across both storages.
796        node_keys + edge_keys
797    }
798
799    /// Returns the number of distinct edge types in the store.
800    #[must_use]
801    pub fn edge_type_count(&self) -> usize {
802        self.id_to_edge_type.read().len()
803    }
804
805    // === Traversal ===
806
807    /// Iterates over neighbors of a node in the specified direction.
808    ///
809    /// This is the fast path for graph traversal - goes straight to the
810    /// adjacency index without loading full node data.
811    pub fn neighbors(
812        &self,
813        node: NodeId,
814        direction: Direction,
815    ) -> impl Iterator<Item = NodeId> + '_ {
816        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
817            Direction::Outgoing | Direction::Both => {
818                Box::new(self.forward_adj.neighbors(node).into_iter())
819            }
820            Direction::Incoming => Box::new(std::iter::empty()),
821        };
822
823        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
824            Direction::Incoming | Direction::Both => {
825                if let Some(ref adj) = self.backward_adj {
826                    Box::new(adj.neighbors(node).into_iter())
827                } else {
828                    Box::new(std::iter::empty())
829                }
830            }
831            Direction::Outgoing => Box::new(std::iter::empty()),
832        };
833
834        forward.chain(backward)
835    }
836
837    /// Returns edges from a node with their targets.
838    ///
839    /// Returns an iterator of (target_node, edge_id) pairs.
840    pub fn edges_from(
841        &self,
842        node: NodeId,
843        direction: Direction,
844    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
845        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
846            Direction::Outgoing | Direction::Both => {
847                Box::new(self.forward_adj.edges_from(node).into_iter())
848            }
849            Direction::Incoming => Box::new(std::iter::empty()),
850        };
851
852        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
853            Direction::Incoming | Direction::Both => {
854                if let Some(ref adj) = self.backward_adj {
855                    Box::new(adj.edges_from(node).into_iter())
856                } else {
857                    Box::new(std::iter::empty())
858                }
859            }
860            Direction::Outgoing => Box::new(std::iter::empty()),
861        };
862
863        forward.chain(backward)
864    }
865
866    /// Gets the type of an edge by ID.
867    #[must_use]
868    pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
869        let edges = self.edges.read();
870        let chain = edges.get(&id)?;
871        let epoch = self.current_epoch();
872        let record = chain.visible_at(epoch)?;
873        let id_to_type = self.id_to_edge_type.read();
874        id_to_type.get(record.type_id as usize).cloned()
875    }
876
877    /// Returns all nodes with a specific label.
878    ///
879    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
880    /// concurrent modifications won't affect the returned vector.
881    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
882        let label_to_id = self.label_to_id.read();
883        if let Some(&label_id) = label_to_id.get(label) {
884            let index = self.label_index.read();
885            if let Some(set) = index.get(label_id as usize) {
886                return set.keys().copied().collect();
887            }
888        }
889        Vec::new()
890    }
891
892    // === Admin API: Iteration ===
893
894    /// Returns an iterator over all nodes in the database.
895    ///
896    /// This creates a snapshot of all visible nodes at the current epoch.
897    /// Useful for dump/export operations.
898    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
899        let epoch = self.current_epoch();
900        let node_ids: Vec<NodeId> = self
901            .nodes
902            .read()
903            .iter()
904            .filter_map(|(id, chain)| {
905                chain
906                    .visible_at(epoch)
907                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
908            })
909            .collect();
910
911        node_ids.into_iter().filter_map(move |id| self.get_node(id))
912    }
913
914    /// Returns an iterator over all edges in the database.
915    ///
916    /// This creates a snapshot of all visible edges at the current epoch.
917    /// Useful for dump/export operations.
918    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
919        let epoch = self.current_epoch();
920        let edge_ids: Vec<EdgeId> = self
921            .edges
922            .read()
923            .iter()
924            .filter_map(|(id, chain)| {
925                chain
926                    .visible_at(epoch)
927                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
928            })
929            .collect();
930
931        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
932    }
933
934    /// Returns all label names in the database.
935    pub fn all_labels(&self) -> Vec<String> {
936        self.id_to_label
937            .read()
938            .iter()
939            .map(|s| s.to_string())
940            .collect()
941    }
942
943    /// Returns all edge type names in the database.
944    pub fn all_edge_types(&self) -> Vec<String> {
945        self.id_to_edge_type
946            .read()
947            .iter()
948            .map(|s| s.to_string())
949            .collect()
950    }
951
952    /// Returns all property keys used in the database.
953    pub fn all_property_keys(&self) -> Vec<String> {
954        let mut keys = std::collections::HashSet::new();
955        for key in self.node_properties.keys() {
956            keys.insert(key.to_string());
957        }
958        for key in self.edge_properties.keys() {
959            keys.insert(key.to_string());
960        }
961        keys.into_iter().collect()
962    }
963
964    /// Returns an iterator over nodes with a specific label.
965    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
966        let node_ids = self.nodes_by_label(label);
967        node_ids.into_iter().filter_map(move |id| self.get_node(id))
968    }
969
970    /// Returns an iterator over edges with a specific type.
971    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
972        let epoch = self.current_epoch();
973        let type_to_id = self.edge_type_to_id.read();
974
975        if let Some(&type_id) = type_to_id.get(edge_type) {
976            let edge_ids: Vec<EdgeId> = self
977                .edges
978                .read()
979                .iter()
980                .filter_map(|(id, chain)| {
981                    chain.visible_at(epoch).and_then(|r| {
982                        if !r.is_deleted() && r.type_id == type_id {
983                            Some(*id)
984                        } else {
985                            None
986                        }
987                    })
988                })
989                .collect();
990
991            // Return a boxed iterator for the found edges
992            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
993                as Box<dyn Iterator<Item = Edge> + 'a>
994        } else {
995            // Return empty iterator
996            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
997        }
998    }
999
1000    // === Zone Map Support ===
1001
1002    /// Checks if a node property predicate might match any nodes.
1003    ///
1004    /// Uses zone maps for early filtering. Returns `true` if there might be
1005    /// matching nodes, `false` if there definitely aren't.
1006    #[must_use]
1007    pub fn node_property_might_match(
1008        &self,
1009        property: &PropertyKey,
1010        op: CompareOp,
1011        value: &Value,
1012    ) -> bool {
1013        self.node_properties.might_match(property, op, value)
1014    }
1015
1016    /// Checks if an edge property predicate might match any edges.
1017    #[must_use]
1018    pub fn edge_property_might_match(
1019        &self,
1020        property: &PropertyKey,
1021        op: CompareOp,
1022        value: &Value,
1023    ) -> bool {
1024        self.edge_properties.might_match(property, op, value)
1025    }
1026
1027    /// Gets the zone map for a node property.
1028    #[must_use]
1029    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
1030        self.node_properties.zone_map(property)
1031    }
1032
1033    /// Gets the zone map for an edge property.
1034    #[must_use]
1035    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
1036        self.edge_properties.zone_map(property)
1037    }
1038
1039    /// Rebuilds zone maps for all properties.
1040    pub fn rebuild_zone_maps(&self) {
1041        self.node_properties.rebuild_zone_maps();
1042        self.edge_properties.rebuild_zone_maps();
1043    }
1044
1045    // === Statistics ===
1046
1047    /// Returns the current statistics.
1048    #[must_use]
1049    pub fn statistics(&self) -> Statistics {
1050        self.statistics.read().clone()
1051    }
1052
1053    /// Recomputes statistics from current data.
1054    ///
1055    /// Scans all labels and edge types to build cardinality estimates for the
1056    /// query optimizer. Call this periodically or after bulk data loads.
1057    pub fn compute_statistics(&self) {
1058        let mut stats = Statistics::new();
1059
1060        // Compute total counts
1061        stats.total_nodes = self.node_count() as u64;
1062        stats.total_edges = self.edge_count() as u64;
1063
1064        // Compute per-label statistics
1065        let id_to_label = self.id_to_label.read();
1066        let label_index = self.label_index.read();
1067
1068        for (label_id, label_name) in id_to_label.iter().enumerate() {
1069            let node_count = label_index
1070                .get(label_id)
1071                .map(|set| set.len() as u64)
1072                .unwrap_or(0);
1073
1074            if node_count > 0 {
1075                // Estimate average degree
1076                let avg_out_degree = if stats.total_nodes > 0 {
1077                    stats.total_edges as f64 / stats.total_nodes as f64
1078                } else {
1079                    0.0
1080                };
1081
1082                let label_stats =
1083                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
1084
1085                stats.update_label(label_name.as_ref(), label_stats);
1086            }
1087        }
1088
1089        // Compute per-edge-type statistics
1090        let id_to_edge_type = self.id_to_edge_type.read();
1091        let edges = self.edges.read();
1092        let epoch = self.current_epoch();
1093
1094        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
1095        for chain in edges.values() {
1096            if let Some(record) = chain.visible_at(epoch) {
1097                if !record.is_deleted() {
1098                    *edge_type_counts.entry(record.type_id).or_default() += 1;
1099                }
1100            }
1101        }
1102
1103        for (type_id, count) in edge_type_counts {
1104            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
1105                let avg_degree = if stats.total_nodes > 0 {
1106                    count as f64 / stats.total_nodes as f64
1107                } else {
1108                    0.0
1109                };
1110
1111                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
1112                stats.update_edge_type(type_name.as_ref(), edge_stats);
1113            }
1114        }
1115
1116        *self.statistics.write() = stats;
1117    }
1118
1119    /// Estimates cardinality for a label scan.
1120    #[must_use]
1121    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
1122        self.statistics.read().estimate_label_cardinality(label)
1123    }
1124
1125    /// Estimates average degree for an edge type.
1126    #[must_use]
1127    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
1128        self.statistics
1129            .read()
1130            .estimate_avg_degree(edge_type, outgoing)
1131    }
1132
1133    // === Internal Helpers ===
1134
1135    fn get_or_create_label_id(&self, label: &str) -> u32 {
1136        {
1137            let label_to_id = self.label_to_id.read();
1138            if let Some(&id) = label_to_id.get(label) {
1139                return id;
1140            }
1141        }
1142
1143        let mut label_to_id = self.label_to_id.write();
1144        let mut id_to_label = self.id_to_label.write();
1145
1146        // Double-check after acquiring write lock
1147        if let Some(&id) = label_to_id.get(label) {
1148            return id;
1149        }
1150
1151        let id = id_to_label.len() as u32;
1152
1153        let label: Arc<str> = label.into();
1154        label_to_id.insert(label.clone(), id);
1155        id_to_label.push(label);
1156
1157        id
1158    }
1159
1160    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
1161        {
1162            let type_to_id = self.edge_type_to_id.read();
1163            if let Some(&id) = type_to_id.get(edge_type) {
1164                return id;
1165            }
1166        }
1167
1168        let mut type_to_id = self.edge_type_to_id.write();
1169        let mut id_to_type = self.id_to_edge_type.write();
1170
1171        // Double-check
1172        if let Some(&id) = type_to_id.get(edge_type) {
1173            return id;
1174        }
1175
1176        let id = id_to_type.len() as u32;
1177        let edge_type: Arc<str> = edge_type.into();
1178        type_to_id.insert(edge_type.clone(), id);
1179        id_to_type.push(edge_type);
1180
1181        id
1182    }
1183
1184    // === Recovery Support ===
1185
1186    /// Creates a node with a specific ID during recovery.
1187    ///
1188    /// This is used for WAL recovery to restore nodes with their original IDs.
1189    /// The caller must ensure IDs don't conflict with existing nodes.
1190    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
1191        let epoch = self.current_epoch();
1192        let mut record = NodeRecord::new(id, epoch);
1193        record.set_label_count(labels.len() as u16);
1194
1195        // Store labels in node_labels map and label_index
1196        let mut node_label_set = FxHashSet::default();
1197        for label in labels {
1198            let label_id = self.get_or_create_label_id(*label);
1199            node_label_set.insert(label_id);
1200
1201            // Update label index
1202            let mut index = self.label_index.write();
1203            while index.len() <= label_id as usize {
1204                index.push(FxHashMap::default());
1205            }
1206            index[label_id as usize].insert(id, ());
1207        }
1208
1209        // Store node's labels
1210        self.node_labels.write().insert(id, node_label_set);
1211
1212        // Create version chain with initial version (using SYSTEM tx for recovery)
1213        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
1214        self.nodes.write().insert(id, chain);
1215
1216        // Update next_node_id if necessary to avoid future collisions
1217        let id_val = id.as_u64();
1218        let _ = self
1219            .next_node_id
1220            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
1221                if id_val >= current {
1222                    Some(id_val + 1)
1223                } else {
1224                    None
1225                }
1226            });
1227    }
1228
1229    /// Creates an edge with a specific ID during recovery.
1230    ///
1231    /// This is used for WAL recovery to restore edges with their original IDs.
1232    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
1233        let epoch = self.current_epoch();
1234        let type_id = self.get_or_create_edge_type_id(edge_type);
1235
1236        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1237        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
1238        self.edges.write().insert(id, chain);
1239
1240        // Update adjacency
1241        self.forward_adj.add_edge(src, dst, id);
1242        if let Some(ref backward) = self.backward_adj {
1243            backward.add_edge(dst, src, id);
1244        }
1245
1246        // Update next_edge_id if necessary
1247        let id_val = id.as_u64();
1248        let _ = self
1249            .next_edge_id
1250            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
1251                if id_val >= current {
1252                    Some(id_val + 1)
1253                } else {
1254                    None
1255                }
1256            });
1257    }
1258
1259    /// Sets the current epoch during recovery.
1260    pub fn set_epoch(&self, epoch: EpochId) {
1261        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
1262    }
1263}
1264
1265impl Default for LpgStore {
1266    fn default() -> Self {
1267        Self::new()
1268    }
1269}
1270
#[cfg(test)]
mod tests {
    use super::*;

    /// A created node can be fetched back and reports only the labels it was given.
    #[test]
    fn test_create_node() {
        let graph = LpgStore::new();

        let person = graph.create_node(&["Person"]);
        assert!(person.is_valid());

        let fetched = graph.get_node(person).unwrap();
        assert!(fetched.has_label("Person"));
        assert!(!fetched.has_label("Animal"));
    }

    /// Properties supplied at creation time are readable with their original values.
    #[test]
    fn test_create_node_with_props() {
        let graph = LpgStore::new();

        let props = [("name", Value::from("Alice")), ("age", Value::from(30i64))];
        let person = graph.create_node_with_props(&["Person"], props);

        let fetched = graph.get_node(person).unwrap();
        let name = fetched.get_property("name").and_then(|value| value.as_str());
        assert_eq!(name, Some("Alice"));
        let age = fetched.get_property("age").and_then(|value| value.as_int64());
        assert_eq!(age, Some(30));
    }

    /// Deleting a node removes it from counts and lookups; a second delete reports failure.
    #[test]
    fn test_delete_node() {
        let graph = LpgStore::new();

        let person = graph.create_node(&["Person"]);
        assert_eq!(graph.node_count(), 1);

        assert!(graph.delete_node(person));
        assert_eq!(graph.node_count(), 0);
        assert!(graph.get_node(person).is_none());

        // Deleting the same node again must report that nothing was removed.
        assert!(!graph.delete_node(person));
    }

    /// A created edge keeps its endpoints and its type name.
    #[test]
    fn test_create_edge() {
        let graph = LpgStore::new();

        let alice = graph.create_node(&["Person"]);
        let bob = graph.create_node(&["Person"]);

        let knows = graph.create_edge(alice, bob, "KNOWS");
        assert!(knows.is_valid());

        let fetched = graph.get_edge(knows).unwrap();
        assert_eq!(fetched.src, alice);
        assert_eq!(fetched.dst, bob);
        assert_eq!(fetched.edge_type.as_ref(), "KNOWS");
    }

    /// Outgoing and incoming neighbor queries each see the expected endpoints.
    #[test]
    fn test_neighbors() {
        let graph = LpgStore::new();

        let hub = graph.create_node(&["Person"]);
        let first = graph.create_node(&["Person"]);
        let second = graph.create_node(&["Person"]);

        graph.create_edge(hub, first, "KNOWS");
        graph.create_edge(hub, second, "KNOWS");

        let from_hub: Vec<_> = graph.neighbors(hub, Direction::Outgoing).collect();
        assert_eq!(from_hub.len(), 2);
        assert!(from_hub.contains(&first));
        assert!(from_hub.contains(&second));

        let into_first: Vec<_> = graph.neighbors(first, Direction::Incoming).collect();
        assert_eq!(into_first.len(), 1);
        assert!(into_first.contains(&hub));
    }

    /// Label lookups return exactly the nodes created with that label.
    #[test]
    fn test_nodes_by_label() {
        let graph = LpgStore::new();

        let first_person = graph.create_node(&["Person"]);
        let second_person = graph.create_node(&["Person"]);
        let _animal = graph.create_node(&["Animal"]);

        let people = graph.nodes_by_label("Person");
        assert_eq!(people.len(), 2);
        assert!(people.contains(&first_person));
        assert!(people.contains(&second_person));

        let animals = graph.nodes_by_label("Animal");
        assert_eq!(animals.len(), 1);
    }

    /// Deleting an edge removes it from counts and lookups.
    #[test]
    fn test_delete_edge() {
        let graph = LpgStore::new();

        let source = graph.create_node(&["Person"]);
        let target = graph.create_node(&["Person"]);
        let knows = graph.create_edge(source, target, "KNOWS");

        assert_eq!(graph.edge_count(), 1);

        assert!(graph.delete_edge(knows));
        assert_eq!(graph.edge_count(), 0);
        assert!(graph.get_edge(knows).is_none());
    }
}
1390}