Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! LPG graph store implementation.
2
3use super::property::CompareOp;
4use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
5use crate::graph::Direction;
6use crate::index::adjacency::ChunkedAdjacency;
7use crate::index::zone_map::ZoneMapEntry;
8use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
9use grafeo_common::mvcc::VersionChain;
10use grafeo_common::types::{EdgeId, EpochId, NodeId, PropertyKey, TxId, Value};
11use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
12use parking_lot::RwLock;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15
/// Tuning options used when building an [`LpgStore`].
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// When `true`, the store also keeps an incoming-edge (backward)
    /// adjacency index next to the outgoing one.
    pub backward_edges: bool,
    /// Requested initial capacity for node storage.
    pub initial_node_capacity: usize,
    /// Requested initial capacity for edge storage.
    pub initial_edge_capacity: usize,
}
26
27impl Default for LpgStoreConfig {
28    fn default() -> Self {
29        Self {
30            backward_edges: true,
31            initial_node_capacity: 1024,
32            initial_edge_capacity: 4096,
33        }
34    }
35}
36
37/// The main LPG graph store.
38///
39/// This is the core storage for labeled property graphs, providing
40/// efficient node/edge storage and adjacency indexing.
41pub struct LpgStore {
42    /// Configuration.
43    #[allow(dead_code)]
44    config: LpgStoreConfig,
45
46    /// Node records indexed by NodeId, with version chains for MVCC.
47    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
48
49    /// Edge records indexed by EdgeId, with version chains for MVCC.
50    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
51
52    /// Property storage for nodes.
53    node_properties: PropertyStorage<NodeId>,
54
55    /// Property storage for edges.
56    edge_properties: PropertyStorage<EdgeId>,
57
58    /// Label name to ID mapping.
59    label_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
60
61    /// Label ID to name mapping.
62    id_to_label: RwLock<Vec<Arc<str>>>,
63
64    /// Edge type name to ID mapping.
65    edge_type_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
66
67    /// Edge type ID to name mapping.
68    id_to_edge_type: RwLock<Vec<Arc<str>>>,
69
70    /// Forward adjacency lists (outgoing edges).
71    forward_adj: ChunkedAdjacency,
72
73    /// Backward adjacency lists (incoming edges).
74    /// Only populated if config.backward_edges is true.
75    backward_adj: Option<ChunkedAdjacency>,
76
77    /// Label index: label_id -> set of node IDs.
78    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
79
80    /// Node labels: node_id -> set of label IDs.
81    /// Reverse mapping to efficiently get labels for a node.
82    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
83
84    /// Next node ID.
85    next_node_id: AtomicU64,
86
87    /// Next edge ID.
88    next_edge_id: AtomicU64,
89
90    /// Current epoch.
91    current_epoch: AtomicU64,
92
93    /// Statistics for cost-based optimization.
94    statistics: RwLock<Statistics>,
95}
96
97impl LpgStore {
98    /// Creates a new LPG store with default configuration.
99    #[must_use]
100    pub fn new() -> Self {
101        Self::with_config(LpgStoreConfig::default())
102    }
103
104    /// Creates a new LPG store with custom configuration.
105    #[must_use]
106    pub fn with_config(config: LpgStoreConfig) -> Self {
107        let backward_adj = if config.backward_edges {
108            Some(ChunkedAdjacency::new())
109        } else {
110            None
111        };
112
113        Self {
114            nodes: RwLock::new(FxHashMap::default()),
115            edges: RwLock::new(FxHashMap::default()),
116            node_properties: PropertyStorage::new(),
117            edge_properties: PropertyStorage::new(),
118            label_to_id: RwLock::new(FxHashMap::default()),
119            id_to_label: RwLock::new(Vec::new()),
120            edge_type_to_id: RwLock::new(FxHashMap::default()),
121            id_to_edge_type: RwLock::new(Vec::new()),
122            forward_adj: ChunkedAdjacency::new(),
123            backward_adj,
124            label_index: RwLock::new(Vec::new()),
125            node_labels: RwLock::new(FxHashMap::default()),
126            next_node_id: AtomicU64::new(0),
127            next_edge_id: AtomicU64::new(0),
128            current_epoch: AtomicU64::new(0),
129            statistics: RwLock::new(Statistics::new()),
130            config,
131        }
132    }
133
134    /// Returns the current epoch.
135    #[must_use]
136    pub fn current_epoch(&self) -> EpochId {
137        EpochId::new(self.current_epoch.load(Ordering::Acquire))
138    }
139
140    /// Creates a new epoch.
141    pub fn new_epoch(&self) -> EpochId {
142        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
143        EpochId::new(id)
144    }
145
146    // === Node Operations ===
147
148    /// Creates a new node with the given labels.
149    ///
150    /// Uses the system transaction for non-transactional operations.
151    pub fn create_node(&self, labels: &[&str]) -> NodeId {
152        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
153    }
154
155    /// Creates a new node with the given labels within a transaction context.
156    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
157        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
158
159        let mut record = NodeRecord::new(id, epoch);
160        record.set_label_count(labels.len() as u16);
161
162        // Store labels in node_labels map and label_index
163        let mut node_label_set = FxHashSet::default();
164        for label in labels {
165            let label_id = self.get_or_create_label_id(*label);
166            node_label_set.insert(label_id);
167
168            // Update label index
169            let mut index = self.label_index.write();
170            while index.len() <= label_id as usize {
171                index.push(FxHashMap::default());
172            }
173            index[label_id as usize].insert(id, ());
174        }
175
176        // Store node's labels
177        self.node_labels.write().insert(id, node_label_set);
178
179        // Create version chain with initial version
180        let chain = VersionChain::with_initial(record, epoch, tx_id);
181        self.nodes.write().insert(id, chain);
182        id
183    }
184
185    /// Creates a new node with labels and properties.
186    pub fn create_node_with_props(
187        &self,
188        labels: &[&str],
189        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
190    ) -> NodeId {
191        self.create_node_with_props_versioned(
192            labels,
193            properties,
194            self.current_epoch(),
195            TxId::SYSTEM,
196        )
197    }
198
199    /// Creates a new node with labels and properties within a transaction context.
200    pub fn create_node_with_props_versioned(
201        &self,
202        labels: &[&str],
203        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
204        epoch: EpochId,
205        tx_id: TxId,
206    ) -> NodeId {
207        let id = self.create_node_versioned(labels, epoch, tx_id);
208
209        for (key, value) in properties {
210            self.node_properties.set(id, key.into(), value.into());
211        }
212
213        // Update props_count in record
214        let count = self.node_properties.get_all(id).len() as u16;
215        if let Some(chain) = self.nodes.write().get_mut(&id) {
216            if let Some(record) = chain.latest_mut() {
217                record.props_count = count;
218            }
219        }
220
221        id
222    }
223
224    /// Gets a node by ID (latest visible version).
225    #[must_use]
226    pub fn get_node(&self, id: NodeId) -> Option<Node> {
227        self.get_node_at_epoch(id, self.current_epoch())
228    }
229
230    /// Gets a node by ID at a specific epoch.
231    #[must_use]
232    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
233        let nodes = self.nodes.read();
234        let chain = nodes.get(&id)?;
235        let record = chain.visible_at(epoch)?;
236
237        if record.is_deleted() {
238            return None;
239        }
240
241        let mut node = Node::new(id);
242
243        // Get labels from node_labels map
244        let id_to_label = self.id_to_label.read();
245        let node_labels = self.node_labels.read();
246        if let Some(label_ids) = node_labels.get(&id) {
247            for &label_id in label_ids {
248                if let Some(label) = id_to_label.get(label_id as usize) {
249                    node.labels.push(label.clone());
250                }
251            }
252        }
253
254        // Get properties
255        node.properties = self.node_properties.get_all(id).into_iter().collect();
256
257        Some(node)
258    }
259
260    /// Gets a node visible to a specific transaction.
261    #[must_use]
262    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
263        let nodes = self.nodes.read();
264        let chain = nodes.get(&id)?;
265        let record = chain.visible_to(epoch, tx_id)?;
266
267        if record.is_deleted() {
268            return None;
269        }
270
271        let mut node = Node::new(id);
272
273        // Get labels from node_labels map
274        let id_to_label = self.id_to_label.read();
275        let node_labels = self.node_labels.read();
276        if let Some(label_ids) = node_labels.get(&id) {
277            for &label_id in label_ids {
278                if let Some(label) = id_to_label.get(label_id as usize) {
279                    node.labels.push(label.clone());
280                }
281            }
282        }
283
284        // Get properties
285        node.properties = self.node_properties.get_all(id).into_iter().collect();
286
287        Some(node)
288    }
289
290    /// Deletes a node and all its edges (using latest epoch).
291    pub fn delete_node(&self, id: NodeId) -> bool {
292        self.delete_node_at_epoch(id, self.current_epoch())
293    }
294
295    /// Deletes a node at a specific epoch.
296    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
297        let mut nodes = self.nodes.write();
298        if let Some(chain) = nodes.get_mut(&id) {
299            // Check if visible at this epoch (not already deleted)
300            if let Some(record) = chain.visible_at(epoch) {
301                if record.is_deleted() {
302                    return false;
303                }
304            } else {
305                // Not visible at this epoch (already deleted or doesn't exist)
306                return false;
307            }
308
309            // Mark the version chain as deleted at this epoch
310            chain.mark_deleted(epoch);
311
312            // Remove from label index using node_labels map
313            let mut index = self.label_index.write();
314            let mut node_labels = self.node_labels.write();
315            if let Some(label_ids) = node_labels.remove(&id) {
316                for label_id in label_ids {
317                    if let Some(set) = index.get_mut(label_id as usize) {
318                        set.remove(&id);
319                    }
320                }
321            }
322
323            // Remove properties
324            drop(nodes); // Release lock before removing properties
325            drop(index);
326            drop(node_labels);
327            self.node_properties.remove_all(id);
328
329            // Note: Caller should use delete_node_edges() first if detach is needed
330
331            true
332        } else {
333            false
334        }
335    }
336
337    /// Deletes all edges connected to a node (for DETACH DELETE).
338    pub fn delete_node_edges(&self, node_id: NodeId) {
339        // Get outgoing edges
340        let outgoing: Vec<EdgeId> = self
341            .forward_adj
342            .edges_from(node_id)
343            .into_iter()
344            .map(|(_, edge_id)| edge_id)
345            .collect();
346
347        // Get incoming edges
348        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
349            backward
350                .edges_from(node_id)
351                .into_iter()
352                .map(|(_, edge_id)| edge_id)
353                .collect()
354        } else {
355            // No backward adjacency - scan all edges
356            let epoch = self.current_epoch();
357            self.edges
358                .read()
359                .iter()
360                .filter_map(|(id, chain)| {
361                    chain.visible_at(epoch).and_then(|r| {
362                        if !r.is_deleted() && r.dst == node_id {
363                            Some(*id)
364                        } else {
365                            None
366                        }
367                    })
368                })
369                .collect()
370        };
371
372        // Delete all edges
373        for edge_id in outgoing.into_iter().chain(incoming) {
374            self.delete_edge(edge_id);
375        }
376    }
377
378    /// Sets a property on a node.
379    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
380        self.node_properties.set(id, key.into(), value);
381
382        // Update props_count in record
383        let count = self.node_properties.get_all(id).len() as u16;
384        if let Some(chain) = self.nodes.write().get_mut(&id) {
385            if let Some(record) = chain.latest_mut() {
386                record.props_count = count;
387            }
388        }
389    }
390
391    /// Sets a property on an edge.
392    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
393        self.edge_properties.set(id, key.into(), value);
394    }
395
396    /// Removes a property from a node.
397    ///
398    /// Returns the previous value if it existed, or None if the property didn't exist.
399    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
400        let result = self.node_properties.remove(id, &key.into());
401
402        // Update props_count in record
403        let count = self.node_properties.get_all(id).len() as u16;
404        if let Some(chain) = self.nodes.write().get_mut(&id) {
405            if let Some(record) = chain.latest_mut() {
406                record.props_count = count;
407            }
408        }
409
410        result
411    }
412
413    /// Removes a property from an edge.
414    ///
415    /// Returns the previous value if it existed, or None if the property didn't exist.
416    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
417        self.edge_properties.remove(id, &key.into())
418    }
419
420    /// Adds a label to a node.
421    ///
422    /// Returns true if the label was added, false if the node doesn't exist
423    /// or already has the label.
424    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
425        let epoch = self.current_epoch();
426
427        // Check if node exists
428        let nodes = self.nodes.read();
429        if let Some(chain) = nodes.get(&node_id) {
430            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
431                return false;
432            }
433        } else {
434            return false;
435        }
436        drop(nodes);
437
438        // Get or create label ID
439        let label_id = self.get_or_create_label_id(label);
440
441        // Add to node_labels map
442        let mut node_labels = self.node_labels.write();
443        let label_set = node_labels
444            .entry(node_id)
445            .or_insert_with(FxHashSet::default);
446
447        if label_set.contains(&label_id) {
448            return false; // Already has this label
449        }
450
451        label_set.insert(label_id);
452        drop(node_labels);
453
454        // Add to label_index
455        let mut index = self.label_index.write();
456        if (label_id as usize) >= index.len() {
457            index.resize(label_id as usize + 1, FxHashMap::default());
458        }
459        index[label_id as usize].insert(node_id, ());
460
461        // Update label count in node record
462        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
463            if let Some(record) = chain.latest_mut() {
464                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
465                record.set_label_count(count as u16);
466            }
467        }
468
469        true
470    }
471
472    /// Removes a label from a node.
473    ///
474    /// Returns true if the label was removed, false if the node doesn't exist
475    /// or doesn't have the label.
476    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
477        let epoch = self.current_epoch();
478
479        // Check if node exists
480        let nodes = self.nodes.read();
481        if let Some(chain) = nodes.get(&node_id) {
482            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
483                return false;
484            }
485        } else {
486            return false;
487        }
488        drop(nodes);
489
490        // Get label ID
491        let label_id = {
492            let label_ids = self.label_to_id.read();
493            match label_ids.get(label) {
494                Some(&id) => id,
495                None => return false, // Label doesn't exist
496            }
497        };
498
499        // Remove from node_labels map
500        let mut node_labels = self.node_labels.write();
501        if let Some(label_set) = node_labels.get_mut(&node_id) {
502            if !label_set.remove(&label_id) {
503                return false; // Node doesn't have this label
504            }
505        } else {
506            return false;
507        }
508        drop(node_labels);
509
510        // Remove from label_index
511        let mut index = self.label_index.write();
512        if (label_id as usize) < index.len() {
513            index[label_id as usize].remove(&node_id);
514        }
515
516        // Update label count in node record
517        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
518            if let Some(record) = chain.latest_mut() {
519                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
520                record.set_label_count(count as u16);
521            }
522        }
523
524        true
525    }
526
527    /// Returns the number of nodes (non-deleted at current epoch).
528    #[must_use]
529    pub fn node_count(&self) -> usize {
530        let epoch = self.current_epoch();
531        self.nodes
532            .read()
533            .values()
534            .filter_map(|chain| chain.visible_at(epoch))
535            .filter(|r| !r.is_deleted())
536            .count()
537    }
538
539    /// Returns all node IDs in the store.
540    ///
541    /// This returns a snapshot of current node IDs. The returned vector
542    /// excludes deleted nodes.
543    #[must_use]
544    pub fn node_ids(&self) -> Vec<NodeId> {
545        let epoch = self.current_epoch();
546        self.nodes
547            .read()
548            .iter()
549            .filter_map(|(id, chain)| {
550                chain
551                    .visible_at(epoch)
552                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
553            })
554            .collect()
555    }
556
557    // === Edge Operations ===
558
559    /// Creates a new edge.
560    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
561        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
562    }
563
564    /// Creates a new edge within a transaction context.
565    pub fn create_edge_versioned(
566        &self,
567        src: NodeId,
568        dst: NodeId,
569        edge_type: &str,
570        epoch: EpochId,
571        tx_id: TxId,
572    ) -> EdgeId {
573        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
574        let type_id = self.get_or_create_edge_type_id(edge_type);
575
576        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
577        let chain = VersionChain::with_initial(record, epoch, tx_id);
578        self.edges.write().insert(id, chain);
579
580        // Update adjacency
581        self.forward_adj.add_edge(src, dst, id);
582        if let Some(ref backward) = self.backward_adj {
583            backward.add_edge(dst, src, id);
584        }
585
586        id
587    }
588
589    /// Creates a new edge with properties.
590    pub fn create_edge_with_props(
591        &self,
592        src: NodeId,
593        dst: NodeId,
594        edge_type: &str,
595        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
596    ) -> EdgeId {
597        let id = self.create_edge(src, dst, edge_type);
598
599        for (key, value) in properties {
600            self.edge_properties.set(id, key.into(), value.into());
601        }
602
603        id
604    }
605
606    /// Gets an edge by ID (latest visible version).
607    #[must_use]
608    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
609        self.get_edge_at_epoch(id, self.current_epoch())
610    }
611
612    /// Gets an edge by ID at a specific epoch.
613    #[must_use]
614    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
615        let edges = self.edges.read();
616        let chain = edges.get(&id)?;
617        let record = chain.visible_at(epoch)?;
618
619        if record.is_deleted() {
620            return None;
621        }
622
623        let edge_type = {
624            let id_to_type = self.id_to_edge_type.read();
625            id_to_type.get(record.type_id as usize)?.clone()
626        };
627
628        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
629
630        // Get properties
631        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
632
633        Some(edge)
634    }
635
636    /// Gets an edge visible to a specific transaction.
637    #[must_use]
638    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
639        let edges = self.edges.read();
640        let chain = edges.get(&id)?;
641        let record = chain.visible_to(epoch, tx_id)?;
642
643        if record.is_deleted() {
644            return None;
645        }
646
647        let edge_type = {
648            let id_to_type = self.id_to_edge_type.read();
649            id_to_type.get(record.type_id as usize)?.clone()
650        };
651
652        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
653
654        // Get properties
655        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
656
657        Some(edge)
658    }
659
660    /// Deletes an edge (using latest epoch).
661    pub fn delete_edge(&self, id: EdgeId) -> bool {
662        self.delete_edge_at_epoch(id, self.current_epoch())
663    }
664
665    /// Deletes an edge at a specific epoch.
666    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
667        let mut edges = self.edges.write();
668        if let Some(chain) = edges.get_mut(&id) {
669            // Get the visible record to check if deleted and get src/dst
670            let (src, dst) = {
671                match chain.visible_at(epoch) {
672                    Some(record) => {
673                        if record.is_deleted() {
674                            return false;
675                        }
676                        (record.src, record.dst)
677                    }
678                    None => return false, // Not visible at this epoch (already deleted)
679                }
680            };
681
682            // Mark the version chain as deleted
683            chain.mark_deleted(epoch);
684
685            drop(edges); // Release lock
686
687            // Mark as deleted in adjacency (soft delete)
688            self.forward_adj.mark_deleted(src, id);
689            if let Some(ref backward) = self.backward_adj {
690                backward.mark_deleted(dst, id);
691            }
692
693            // Remove properties
694            self.edge_properties.remove_all(id);
695
696            true
697        } else {
698            false
699        }
700    }
701
702    /// Returns the number of edges (non-deleted at current epoch).
703    #[must_use]
704    pub fn edge_count(&self) -> usize {
705        let epoch = self.current_epoch();
706        self.edges
707            .read()
708            .values()
709            .filter_map(|chain| chain.visible_at(epoch))
710            .filter(|r| !r.is_deleted())
711            .count()
712    }
713
714    /// Discards all uncommitted versions created by a transaction.
715    ///
716    /// This is called during transaction rollback to clean up uncommitted changes.
717    /// The method removes version chain entries created by the specified transaction.
718    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
719        // Remove uncommitted node versions
720        {
721            let mut nodes = self.nodes.write();
722            for chain in nodes.values_mut() {
723                chain.remove_versions_by(tx_id);
724            }
725            // Remove completely empty chains (no versions left)
726            nodes.retain(|_, chain| !chain.is_empty());
727        }
728
729        // Remove uncommitted edge versions
730        {
731            let mut edges = self.edges.write();
732            for chain in edges.values_mut() {
733                chain.remove_versions_by(tx_id);
734            }
735            // Remove completely empty chains (no versions left)
736            edges.retain(|_, chain| !chain.is_empty());
737        }
738    }
739
740    /// Returns the number of distinct labels in the store.
741    #[must_use]
742    pub fn label_count(&self) -> usize {
743        self.id_to_label.read().len()
744    }
745
746    /// Returns the number of distinct property keys in the store.
747    ///
748    /// This counts unique property keys across both nodes and edges.
749    #[must_use]
750    pub fn property_key_count(&self) -> usize {
751        let node_keys = self.node_properties.column_count();
752        let edge_keys = self.edge_properties.column_count();
753        // Note: This may count some keys twice if the same key is used
754        // for both nodes and edges. A more precise count would require
755        // tracking unique keys across both storages.
756        node_keys + edge_keys
757    }
758
759    /// Returns the number of distinct edge types in the store.
760    #[must_use]
761    pub fn edge_type_count(&self) -> usize {
762        self.id_to_edge_type.read().len()
763    }
764
765    // === Traversal ===
766
767    /// Returns an iterator over neighbors of a node.
768    pub fn neighbors(
769        &self,
770        node: NodeId,
771        direction: Direction,
772    ) -> impl Iterator<Item = NodeId> + '_ {
773        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
774            Direction::Outgoing | Direction::Both => {
775                Box::new(self.forward_adj.neighbors(node).into_iter())
776            }
777            Direction::Incoming => Box::new(std::iter::empty()),
778        };
779
780        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
781            Direction::Incoming | Direction::Both => {
782                if let Some(ref adj) = self.backward_adj {
783                    Box::new(adj.neighbors(node).into_iter())
784                } else {
785                    Box::new(std::iter::empty())
786                }
787            }
788            Direction::Outgoing => Box::new(std::iter::empty()),
789        };
790
791        forward.chain(backward)
792    }
793
794    /// Returns edges from a node with their targets.
795    ///
796    /// Returns an iterator of (target_node, edge_id) pairs.
797    pub fn edges_from(
798        &self,
799        node: NodeId,
800        direction: Direction,
801    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
802        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
803            Direction::Outgoing | Direction::Both => {
804                Box::new(self.forward_adj.edges_from(node).into_iter())
805            }
806            Direction::Incoming => Box::new(std::iter::empty()),
807        };
808
809        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
810            Direction::Incoming | Direction::Both => {
811                if let Some(ref adj) = self.backward_adj {
812                    Box::new(adj.edges_from(node).into_iter())
813                } else {
814                    Box::new(std::iter::empty())
815                }
816            }
817            Direction::Outgoing => Box::new(std::iter::empty()),
818        };
819
820        forward.chain(backward)
821    }
822
823    /// Gets the type of an edge by ID.
824    #[must_use]
825    pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
826        let edges = self.edges.read();
827        let chain = edges.get(&id)?;
828        let epoch = self.current_epoch();
829        let record = chain.visible_at(epoch)?;
830        let id_to_type = self.id_to_edge_type.read();
831        id_to_type.get(record.type_id as usize).cloned()
832    }
833
834    /// Returns nodes with a specific label.
835    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
836        let label_to_id = self.label_to_id.read();
837        if let Some(&label_id) = label_to_id.get(label) {
838            let index = self.label_index.read();
839            if let Some(set) = index.get(label_id as usize) {
840                return set.keys().copied().collect();
841            }
842        }
843        Vec::new()
844    }
845
846    // === Zone Map Support ===
847
848    /// Checks if a node property predicate might match any nodes.
849    ///
850    /// Uses zone maps for early filtering. Returns `true` if there might be
851    /// matching nodes, `false` if there definitely aren't.
852    #[must_use]
853    pub fn node_property_might_match(
854        &self,
855        property: &PropertyKey,
856        op: CompareOp,
857        value: &Value,
858    ) -> bool {
859        self.node_properties.might_match(property, op, value)
860    }
861
862    /// Checks if an edge property predicate might match any edges.
863    #[must_use]
864    pub fn edge_property_might_match(
865        &self,
866        property: &PropertyKey,
867        op: CompareOp,
868        value: &Value,
869    ) -> bool {
870        self.edge_properties.might_match(property, op, value)
871    }
872
873    /// Gets the zone map for a node property.
874    #[must_use]
875    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
876        self.node_properties.zone_map(property)
877    }
878
879    /// Gets the zone map for an edge property.
880    #[must_use]
881    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
882        self.edge_properties.zone_map(property)
883    }
884
885    /// Rebuilds zone maps for all properties.
886    pub fn rebuild_zone_maps(&self) {
887        self.node_properties.rebuild_zone_maps();
888        self.edge_properties.rebuild_zone_maps();
889    }
890
891    // === Statistics ===
892
893    /// Returns the current statistics.
894    #[must_use]
895    pub fn statistics(&self) -> Statistics {
896        self.statistics.read().clone()
897    }
898
899    /// Updates statistics from current data.
900    ///
901    /// This scans all labels and edge types to compute cardinality statistics.
902    pub fn compute_statistics(&self) {
903        let mut stats = Statistics::new();
904
905        // Compute total counts
906        stats.total_nodes = self.node_count() as u64;
907        stats.total_edges = self.edge_count() as u64;
908
909        // Compute per-label statistics
910        let id_to_label = self.id_to_label.read();
911        let label_index = self.label_index.read();
912
913        for (label_id, label_name) in id_to_label.iter().enumerate() {
914            let node_count = label_index
915                .get(label_id)
916                .map(|set| set.len() as u64)
917                .unwrap_or(0);
918
919            if node_count > 0 {
920                // Estimate average degree
921                let avg_out_degree = if stats.total_nodes > 0 {
922                    stats.total_edges as f64 / stats.total_nodes as f64
923                } else {
924                    0.0
925                };
926
927                let label_stats =
928                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
929
930                stats.update_label(label_name.as_ref(), label_stats);
931            }
932        }
933
934        // Compute per-edge-type statistics
935        let id_to_edge_type = self.id_to_edge_type.read();
936        let edges = self.edges.read();
937        let epoch = self.current_epoch();
938
939        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
940        for chain in edges.values() {
941            if let Some(record) = chain.visible_at(epoch) {
942                if !record.is_deleted() {
943                    *edge_type_counts.entry(record.type_id).or_default() += 1;
944                }
945            }
946        }
947
948        for (type_id, count) in edge_type_counts {
949            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
950                let avg_degree = if stats.total_nodes > 0 {
951                    count as f64 / stats.total_nodes as f64
952                } else {
953                    0.0
954                };
955
956                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
957                stats.update_edge_type(type_name.as_ref(), edge_stats);
958            }
959        }
960
961        *self.statistics.write() = stats;
962    }
963
964    /// Estimates cardinality for a label scan.
965    #[must_use]
966    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
967        self.statistics.read().estimate_label_cardinality(label)
968    }
969
970    /// Estimates average degree for an edge type.
971    #[must_use]
972    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
973        self.statistics
974            .read()
975            .estimate_avg_degree(edge_type, outgoing)
976    }
977
978    // === Internal Helpers ===
979
980    fn get_or_create_label_id(&self, label: &str) -> u32 {
981        {
982            let label_to_id = self.label_to_id.read();
983            if let Some(&id) = label_to_id.get(label) {
984                return id;
985            }
986        }
987
988        let mut label_to_id = self.label_to_id.write();
989        let mut id_to_label = self.id_to_label.write();
990
991        // Double-check after acquiring write lock
992        if let Some(&id) = label_to_id.get(label) {
993            return id;
994        }
995
996        let id = id_to_label.len() as u32;
997
998        let label: Arc<str> = label.into();
999        label_to_id.insert(label.clone(), id);
1000        id_to_label.push(label);
1001
1002        id
1003    }
1004
1005    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
1006        {
1007            let type_to_id = self.edge_type_to_id.read();
1008            if let Some(&id) = type_to_id.get(edge_type) {
1009                return id;
1010            }
1011        }
1012
1013        let mut type_to_id = self.edge_type_to_id.write();
1014        let mut id_to_type = self.id_to_edge_type.write();
1015
1016        // Double-check
1017        if let Some(&id) = type_to_id.get(edge_type) {
1018            return id;
1019        }
1020
1021        let id = id_to_type.len() as u32;
1022        let edge_type: Arc<str> = edge_type.into();
1023        type_to_id.insert(edge_type.clone(), id);
1024        id_to_type.push(edge_type);
1025
1026        id
1027    }
1028
1029    // === Recovery Support ===
1030
1031    /// Creates a node with a specific ID during recovery.
1032    ///
1033    /// This is used for WAL recovery to restore nodes with their original IDs.
1034    /// The caller must ensure IDs don't conflict with existing nodes.
1035    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
1036        let epoch = self.current_epoch();
1037        let mut record = NodeRecord::new(id, epoch);
1038        record.set_label_count(labels.len() as u16);
1039
1040        // Store labels in node_labels map and label_index
1041        let mut node_label_set = FxHashSet::default();
1042        for label in labels {
1043            let label_id = self.get_or_create_label_id(*label);
1044            node_label_set.insert(label_id);
1045
1046            // Update label index
1047            let mut index = self.label_index.write();
1048            while index.len() <= label_id as usize {
1049                index.push(FxHashMap::default());
1050            }
1051            index[label_id as usize].insert(id, ());
1052        }
1053
1054        // Store node's labels
1055        self.node_labels.write().insert(id, node_label_set);
1056
1057        // Create version chain with initial version (using SYSTEM tx for recovery)
1058        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
1059        self.nodes.write().insert(id, chain);
1060
1061        // Update next_node_id if necessary to avoid future collisions
1062        let id_val = id.as_u64();
1063        let _ = self
1064            .next_node_id
1065            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
1066                if id_val >= current {
1067                    Some(id_val + 1)
1068                } else {
1069                    None
1070                }
1071            });
1072    }
1073
1074    /// Creates an edge with a specific ID during recovery.
1075    ///
1076    /// This is used for WAL recovery to restore edges with their original IDs.
1077    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
1078        let epoch = self.current_epoch();
1079        let type_id = self.get_or_create_edge_type_id(edge_type);
1080
1081        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1082        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
1083        self.edges.write().insert(id, chain);
1084
1085        // Update adjacency
1086        self.forward_adj.add_edge(src, dst, id);
1087        if let Some(ref backward) = self.backward_adj {
1088            backward.add_edge(dst, src, id);
1089        }
1090
1091        // Update next_edge_id if necessary
1092        let id_val = id.as_u64();
1093        let _ = self
1094            .next_edge_id
1095            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
1096                if id_val >= current {
1097                    Some(id_val + 1)
1098                } else {
1099                    None
1100                }
1101            });
1102    }
1103
1104    /// Sets the current epoch during recovery.
1105    pub fn set_epoch(&self, epoch: EpochId) {
1106        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
1107    }
1108}
1109
1110impl Default for LpgStore {
1111    fn default() -> Self {
1112        Self::new()
1113    }
1114}
1115
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_node() {
        let store = LpgStore::new();

        let id = store.create_node(&["Person"]);
        assert!(id.is_valid());

        let node = store.get_node(id).unwrap();
        assert!(node.has_label("Person"));
        assert!(!node.has_label("Animal"));
    }

    #[test]
    fn test_create_node_with_props() {
        let store = LpgStore::new();

        let id = store.create_node_with_props(
            &["Person"],
            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
        );

        let node = store.get_node(id).unwrap();
        assert_eq!(
            node.get_property("name").and_then(|v| v.as_str()),
            Some("Alice")
        );
        assert_eq!(
            node.get_property("age").and_then(|v| v.as_int64()),
            Some(30)
        );
    }

    #[test]
    fn test_delete_node() {
        let store = LpgStore::new();

        let id = store.create_node(&["Person"]);
        assert_eq!(store.node_count(), 1);

        assert!(store.delete_node(id));
        assert_eq!(store.node_count(), 0);
        assert!(store.get_node(id).is_none());

        // Double delete should return false
        assert!(!store.delete_node(id));
    }

    #[test]
    fn test_create_edge() {
        let store = LpgStore::new();

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);

        let edge_id = store.create_edge(alice, bob, "KNOWS");
        assert!(edge_id.is_valid());

        let edge = store.get_edge(edge_id).unwrap();
        assert_eq!(edge.src, alice);
        assert_eq!(edge.dst, bob);
        assert_eq!(edge.edge_type.as_ref(), "KNOWS");
    }

    #[test]
    fn test_neighbors() {
        let store = LpgStore::new();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);

        store.create_edge(a, b, "KNOWS");
        store.create_edge(a, c, "KNOWS");

        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
        assert_eq!(outgoing.len(), 2);
        assert!(outgoing.contains(&b));
        assert!(outgoing.contains(&c));

        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
        assert_eq!(incoming.len(), 1);
        assert!(incoming.contains(&a));
    }

    #[test]
    fn test_nodes_by_label() {
        let store = LpgStore::new();

        let p1 = store.create_node(&["Person"]);
        let p2 = store.create_node(&["Person"]);
        let _a = store.create_node(&["Animal"]);

        let persons = store.nodes_by_label("Person");
        assert_eq!(persons.len(), 2);
        assert!(persons.contains(&p1));
        assert!(persons.contains(&p2));

        let animals = store.nodes_by_label("Animal");
        assert_eq!(animals.len(), 1);
    }

    #[test]
    fn test_nodes_by_label_unknown_label() {
        let store = LpgStore::new();
        let _person = store.create_node(&["Person"]);

        // A label that was never interned yields no nodes.
        assert!(store.nodes_by_label("Unknown").is_empty());
    }

    #[test]
    fn test_delete_edge() {
        let store = LpgStore::new();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let edge_id = store.create_edge(a, b, "KNOWS");

        assert_eq!(store.edge_count(), 1);

        assert!(store.delete_edge(edge_id));
        assert_eq!(store.edge_count(), 0);
        assert!(store.get_edge(edge_id).is_none());
    }

    #[test]
    fn test_compute_statistics_totals() {
        let store = LpgStore::new();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Animal"]);
        store.create_edge(a, b, "KNOWS");
        store.create_edge(a, c, "OWNS");

        store.compute_statistics();
        let stats = store.statistics();
        assert_eq!(stats.total_nodes, 3);
        assert_eq!(stats.total_edges, 2);
    }

    #[test]
    fn test_compute_statistics_after_edge_delete() {
        let store = LpgStore::new();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let edge_id = store.create_edge(a, b, "KNOWS");
        assert!(store.delete_edge(edge_id));

        store.compute_statistics();
        let stats = store.statistics();
        assert_eq!(stats.total_nodes, 2);
        assert_eq!(stats.total_edges, 0);
    }

    #[test]
    fn test_default_store_is_empty() {
        let store = LpgStore::default();
        assert_eq!(store.node_count(), 0);
        assert_eq!(store.edge_count(), 0);
    }
}