Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18#[cfg(not(feature = "tiered-storage"))]
19use grafeo_common::mvcc::VersionChain;
20use grafeo_common::types::{EdgeId, EpochId, NodeId, PropertyKey, TxId, Value};
21use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
22use parking_lot::RwLock;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicU64, Ordering};
25
26// Tiered storage imports
27#[cfg(feature = "tiered-storage")]
28use crate::storage::EpochStore;
29#[cfg(feature = "tiered-storage")]
30use grafeo_common::memory::arena::ArenaAllocator;
31#[cfg(feature = "tiered-storage")]
32use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
33
/// Tunable settings for the LPG store.
///
/// The defaults suit most workloads. Disable `backward_edges` when traversal
/// only ever follows one direction (saves memory), and raise the capacities
/// when the graph size is known upfront (avoids reallocations).
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// Whether to maintain backward adjacency for incoming-edge queries.
    /// Turn off if you only traverse outgoing edges - saves ~50% adjacency
    /// memory.
    pub backward_edges: bool,
    /// Initial capacity for nodes (avoids early reallocations).
    pub initial_node_capacity: usize,
    /// Initial capacity for edges (avoids early reallocations).
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward adjacency enabled, with room for 1024 nodes and 4096 edges.
    fn default() -> Self {
        const DEFAULT_NODE_CAPACITY: usize = 1024;
        const DEFAULT_EDGE_CAPACITY: usize = 4096;

        LpgStoreConfig {
            initial_edge_capacity: DEFAULT_EDGE_CAPACITY,
            initial_node_capacity: DEFAULT_NODE_CAPACITY,
            backward_edges: true,
        }
    }
}
59
/// The core in-memory graph storage.
///
/// Everything lives here: nodes, edges, properties, adjacency indexes, and
/// version chains for MVCC. Concurrent reads never block each other.
///
/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
/// adds transaction management and query execution. Use `LpgStore` directly
/// when you need raw performance for algorithm implementations.
///
/// Two storage layouts exist, selected at compile time by the
/// `tiered-storage` feature: without it, records live in per-ID version
/// chains (`nodes` / `edges`); with it, records live in per-epoch arenas and
/// are located through version indexes (`node_versions` / `edge_versions`).
///
/// # Example
///
/// ```
/// use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_core::graph::Direction;
///
/// let store = LpgStore::new();
///
/// // Create a small social network
/// let alice = store.create_node(&["Person"]);
/// let bob = store.create_node(&["Person"]);
/// store.create_edge(alice, bob, "KNOWS");
///
/// // Traverse outgoing edges
/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
///     println!("Alice knows node {:?}", neighbor);
/// }
/// ```
pub struct LpgStore {
    /// Configuration the store was built with.
    // NOTE(review): only `backward_edges` appears to be consulted (at
    // construction time), hence the dead_code allowance - confirm.
    #[allow(dead_code)]
    config: LpgStoreConfig,

    /// Node records indexed by NodeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    #[cfg(not(feature = "tiered-storage"))]
    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Edge records indexed by EdgeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    #[cfg(not(feature = "tiered-storage"))]
    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    // === Tiered Storage Fields (feature-gated) ===
    /// Arena allocator for hot data storage.
    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
    #[cfg(feature = "tiered-storage")]
    arena_allocator: Arc<ArenaAllocator>,

    /// Node version indexes - store metadata and arena offsets.
    /// The actual NodeRecord data is stored in the arena.
    #[cfg(feature = "tiered-storage")]
    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Edge version indexes - store metadata and arena offsets.
    /// The actual EdgeRecord data is stored in the arena.
    #[cfg(feature = "tiered-storage")]
    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Cold storage for frozen epochs.
    /// Contains compressed epoch blocks for historical data.
    #[cfg(feature = "tiered-storage")]
    epoch_store: Arc<EpochStore>,

    /// Property storage for nodes.
    node_properties: PropertyStorage<NodeId>,

    /// Property storage for edges.
    edge_properties: PropertyStorage<EdgeId>,

    /// Label name to ID mapping.
    label_to_id: RwLock<FxHashMap<Arc<str>, u32>>,

    /// Label ID to name mapping (the label ID is the vector index).
    id_to_label: RwLock<Vec<Arc<str>>>,

    /// Edge type name to ID mapping.
    edge_type_to_id: RwLock<FxHashMap<Arc<str>, u32>>,

    /// Edge type ID to name mapping (the edge type ID is the vector index).
    id_to_edge_type: RwLock<Vec<Arc<str>>>,

    /// Forward adjacency lists (outgoing edges).
    forward_adj: ChunkedAdjacency,

    /// Backward adjacency lists (incoming edges).
    /// Only populated if config.backward_edges is true.
    backward_adj: Option<ChunkedAdjacency>,

    /// Label index: label_id -> set of node IDs.
    /// The outer vector is indexed by label ID; each map has unit values and
    /// is used purely as a set of the node IDs carrying that label.
    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node labels: node_id -> set of label IDs.
    /// Reverse mapping to efficiently get labels for a node.
    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Next node ID.
    next_node_id: AtomicU64,

    /// Next edge ID.
    next_edge_id: AtomicU64,

    /// Current epoch.
    current_epoch: AtomicU64,

    /// Statistics for cost-based optimization.
    statistics: RwLock<Statistics>,
}
167
168impl LpgStore {
    /// Creates a new LPG store with default configuration.
    ///
    /// Equivalent to `LpgStore::with_config(LpgStoreConfig::default())`.
    #[must_use]
    pub fn new() -> Self {
        Self::with_config(LpgStoreConfig::default())
    }
174
175    /// Creates a new LPG store with custom configuration.
176    #[must_use]
177    pub fn with_config(config: LpgStoreConfig) -> Self {
178        let backward_adj = if config.backward_edges {
179            Some(ChunkedAdjacency::new())
180        } else {
181            None
182        };
183
184        Self {
185            #[cfg(not(feature = "tiered-storage"))]
186            nodes: RwLock::new(FxHashMap::default()),
187            #[cfg(not(feature = "tiered-storage"))]
188            edges: RwLock::new(FxHashMap::default()),
189            #[cfg(feature = "tiered-storage")]
190            arena_allocator: Arc::new(ArenaAllocator::new()),
191            #[cfg(feature = "tiered-storage")]
192            node_versions: RwLock::new(FxHashMap::default()),
193            #[cfg(feature = "tiered-storage")]
194            edge_versions: RwLock::new(FxHashMap::default()),
195            #[cfg(feature = "tiered-storage")]
196            epoch_store: Arc::new(EpochStore::new()),
197            node_properties: PropertyStorage::new(),
198            edge_properties: PropertyStorage::new(),
199            label_to_id: RwLock::new(FxHashMap::default()),
200            id_to_label: RwLock::new(Vec::new()),
201            edge_type_to_id: RwLock::new(FxHashMap::default()),
202            id_to_edge_type: RwLock::new(Vec::new()),
203            forward_adj: ChunkedAdjacency::new(),
204            backward_adj,
205            label_index: RwLock::new(Vec::new()),
206            node_labels: RwLock::new(FxHashMap::default()),
207            next_node_id: AtomicU64::new(0),
208            next_edge_id: AtomicU64::new(0),
209            current_epoch: AtomicU64::new(0),
210            statistics: RwLock::new(Statistics::new()),
211            config,
212        }
213    }
214
    /// Returns the current epoch.
    ///
    /// Reads the epoch counter with `Acquire` ordering; the value may be
    /// stale by the time the caller uses it if another thread calls
    /// `new_epoch` concurrently.
    #[must_use]
    pub fn current_epoch(&self) -> EpochId {
        EpochId::new(self.current_epoch.load(Ordering::Acquire))
    }
220
221    /// Creates a new epoch.
222    pub fn new_epoch(&self) -> EpochId {
223        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
224        EpochId::new(id)
225    }
226
227    // === Node Operations ===
228
    /// Creates a new node with the given labels.
    ///
    /// Uses the system transaction for non-transactional operations: the
    /// node is created at the store's current epoch under `TxId::SYSTEM`.
    /// Returns the ID assigned to the new node.
    pub fn create_node(&self, labels: &[&str]) -> NodeId {
        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
    }
235
236    /// Creates a new node with the given labels within a transaction context.
237    #[cfg(not(feature = "tiered-storage"))]
238    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
239        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
240
241        let mut record = NodeRecord::new(id, epoch);
242        record.set_label_count(labels.len() as u16);
243
244        // Store labels in node_labels map and label_index
245        let mut node_label_set = FxHashSet::default();
246        for label in labels {
247            let label_id = self.get_or_create_label_id(*label);
248            node_label_set.insert(label_id);
249
250            // Update label index
251            let mut index = self.label_index.write();
252            while index.len() <= label_id as usize {
253                index.push(FxHashMap::default());
254            }
255            index[label_id as usize].insert(id, ());
256        }
257
258        // Store node's labels
259        self.node_labels.write().insert(id, node_label_set);
260
261        // Create version chain with initial version
262        let chain = VersionChain::with_initial(record, epoch, tx_id);
263        self.nodes.write().insert(id, chain);
264        id
265    }
266
267    /// Creates a new node with the given labels within a transaction context.
268    /// (Tiered storage version: stores data in arena, metadata in VersionIndex)
269    #[cfg(feature = "tiered-storage")]
270    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
271        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
272
273        let mut record = NodeRecord::new(id, epoch);
274        record.set_label_count(labels.len() as u16);
275
276        // Store labels in node_labels map and label_index
277        let mut node_label_set = FxHashSet::default();
278        for label in labels {
279            let label_id = self.get_or_create_label_id(*label);
280            node_label_set.insert(label_id);
281
282            // Update label index
283            let mut index = self.label_index.write();
284            while index.len() <= label_id as usize {
285                index.push(FxHashMap::default());
286            }
287            index[label_id as usize].insert(id, ());
288        }
289
290        // Store node's labels
291        self.node_labels.write().insert(id, node_label_set);
292
293        // Allocate record in arena and get offset (create epoch if needed)
294        let arena = self.arena_allocator.arena_or_create(epoch);
295        let (offset, _stored) = arena.alloc_value_with_offset(record);
296
297        // Create HotVersionRef pointing to arena data
298        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
299
300        // Create or update version index
301        let mut versions = self.node_versions.write();
302        if let Some(index) = versions.get_mut(&id) {
303            index.add_hot(hot_ref);
304        } else {
305            versions.insert(id, VersionIndex::with_initial(hot_ref));
306        }
307
308        id
309    }
310
    /// Creates a new node with labels and properties.
    ///
    /// Non-transactional convenience wrapper: the node is created at the
    /// store's current epoch under `TxId::SYSTEM`. Each `(key, value)` pair
    /// is converted with `Into` before being stored. Returns the new node's ID.
    pub fn create_node_with_props(
        &self,
        labels: &[&str],
        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
    ) -> NodeId {
        self.create_node_with_props_versioned(
            labels,
            properties,
            self.current_epoch(),
            TxId::SYSTEM,
        )
    }
324
325    /// Creates a new node with labels and properties within a transaction context.
326    #[cfg(not(feature = "tiered-storage"))]
327    pub fn create_node_with_props_versioned(
328        &self,
329        labels: &[&str],
330        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
331        epoch: EpochId,
332        tx_id: TxId,
333    ) -> NodeId {
334        let id = self.create_node_versioned(labels, epoch, tx_id);
335
336        for (key, value) in properties {
337            self.node_properties.set(id, key.into(), value.into());
338        }
339
340        // Update props_count in record
341        let count = self.node_properties.get_all(id).len() as u16;
342        if let Some(chain) = self.nodes.write().get_mut(&id) {
343            if let Some(record) = chain.latest_mut() {
344                record.props_count = count;
345            }
346        }
347
348        id
349    }
350
351    /// Creates a new node with labels and properties within a transaction context.
352    /// (Tiered storage version)
353    #[cfg(feature = "tiered-storage")]
354    pub fn create_node_with_props_versioned(
355        &self,
356        labels: &[&str],
357        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
358        epoch: EpochId,
359        tx_id: TxId,
360    ) -> NodeId {
361        let id = self.create_node_versioned(labels, epoch, tx_id);
362
363        for (key, value) in properties {
364            self.node_properties.set(id, key.into(), value.into());
365        }
366
367        // Note: props_count in record is not updated for tiered storage.
368        // The record is immutable once allocated in the arena.
369
370        id
371    }
372
    /// Gets a node by ID (latest visible version).
    ///
    /// Shorthand for `get_node_at_epoch(id, self.current_epoch())`; returns
    /// `None` under the same conditions as that method.
    #[must_use]
    pub fn get_node(&self, id: NodeId) -> Option<Node> {
        self.get_node_at_epoch(id, self.current_epoch())
    }
378
379    /// Gets a node by ID at a specific epoch.
380    #[must_use]
381    #[cfg(not(feature = "tiered-storage"))]
382    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
383        let nodes = self.nodes.read();
384        let chain = nodes.get(&id)?;
385        let record = chain.visible_at(epoch)?;
386
387        if record.is_deleted() {
388            return None;
389        }
390
391        let mut node = Node::new(id);
392
393        // Get labels from node_labels map
394        let id_to_label = self.id_to_label.read();
395        let node_labels = self.node_labels.read();
396        if let Some(label_ids) = node_labels.get(&id) {
397            for &label_id in label_ids {
398                if let Some(label) = id_to_label.get(label_id as usize) {
399                    node.labels.push(label.clone());
400                }
401            }
402        }
403
404        // Get properties
405        node.properties = self.node_properties.get_all(id).into_iter().collect();
406
407        Some(node)
408    }
409
410    /// Gets a node by ID at a specific epoch.
411    /// (Tiered storage version: reads from arena via VersionIndex)
412    #[must_use]
413    #[cfg(feature = "tiered-storage")]
414    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
415        let versions = self.node_versions.read();
416        let index = versions.get(&id)?;
417        let version_ref = index.visible_at(epoch)?;
418
419        // Read the record from arena
420        let record = self.read_node_record(&version_ref)?;
421
422        if record.is_deleted() {
423            return None;
424        }
425
426        let mut node = Node::new(id);
427
428        // Get labels from node_labels map
429        let id_to_label = self.id_to_label.read();
430        let node_labels = self.node_labels.read();
431        if let Some(label_ids) = node_labels.get(&id) {
432            for &label_id in label_ids {
433                if let Some(label) = id_to_label.get(label_id as usize) {
434                    node.labels.push(label.clone());
435                }
436            }
437        }
438
439        // Get properties
440        node.properties = self.node_properties.get_all(id).into_iter().collect();
441
442        Some(node)
443    }
444
445    /// Gets a node visible to a specific transaction.
446    #[must_use]
447    #[cfg(not(feature = "tiered-storage"))]
448    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
449        let nodes = self.nodes.read();
450        let chain = nodes.get(&id)?;
451        let record = chain.visible_to(epoch, tx_id)?;
452
453        if record.is_deleted() {
454            return None;
455        }
456
457        let mut node = Node::new(id);
458
459        // Get labels from node_labels map
460        let id_to_label = self.id_to_label.read();
461        let node_labels = self.node_labels.read();
462        if let Some(label_ids) = node_labels.get(&id) {
463            for &label_id in label_ids {
464                if let Some(label) = id_to_label.get(label_id as usize) {
465                    node.labels.push(label.clone());
466                }
467            }
468        }
469
470        // Get properties
471        node.properties = self.node_properties.get_all(id).into_iter().collect();
472
473        Some(node)
474    }
475
476    /// Gets a node visible to a specific transaction.
477    /// (Tiered storage version: reads from arena via VersionIndex)
478    #[must_use]
479    #[cfg(feature = "tiered-storage")]
480    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
481        let versions = self.node_versions.read();
482        let index = versions.get(&id)?;
483        let version_ref = index.visible_to(epoch, tx_id)?;
484
485        // Read the record from arena
486        let record = self.read_node_record(&version_ref)?;
487
488        if record.is_deleted() {
489            return None;
490        }
491
492        let mut node = Node::new(id);
493
494        // Get labels from node_labels map
495        let id_to_label = self.id_to_label.read();
496        let node_labels = self.node_labels.read();
497        if let Some(label_ids) = node_labels.get(&id) {
498            for &label_id in label_ids {
499                if let Some(label) = id_to_label.get(label_id as usize) {
500                    node.labels.push(label.clone());
501                }
502            }
503        }
504
505        // Get properties
506        node.properties = self.node_properties.get_all(id).into_iter().collect();
507
508        Some(node)
509    }
510
    /// Reads a NodeRecord from arena (hot) or epoch store (cold) using a VersionRef.
    ///
    /// Hot references are resolved by looking up the arena for the
    /// reference's epoch and cloning the record found at its offset, so this
    /// branch always yields `Some`. Cold references are delegated to the
    /// epoch store, whose result is returned as-is.
    #[cfg(feature = "tiered-storage")]
    #[allow(unsafe_code)]
    fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
        match version_ref {
            VersionRef::Hot(hot_ref) => {
                let arena = self.arena_allocator.arena(hot_ref.epoch);
                // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
                // NOTE(review): this also presumes the arena for `hot_ref.epoch`
                // still exists and that only node refs are passed here - confirm.
                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                // Clone so the returned record does not borrow from the arena.
                Some(record.clone())
            }
            VersionRef::Cold(cold_ref) => {
                // Read from compressed epoch store
                self.epoch_store
                    .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
            }
        }
    }
529
    /// Deletes a node at the current epoch.
    ///
    /// This does NOT remove the node's edges; call `delete_node_edges()`
    /// first when detach semantics are wanted. Returns whatever
    /// `delete_node_at_epoch` returns for the current epoch: `true` if the
    /// node was deleted, `false` if it was missing or already deleted.
    pub fn delete_node(&self, id: NodeId) -> bool {
        self.delete_node_at_epoch(id, self.current_epoch())
    }
534
535    /// Deletes a node at a specific epoch.
536    #[cfg(not(feature = "tiered-storage"))]
537    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
538        let mut nodes = self.nodes.write();
539        if let Some(chain) = nodes.get_mut(&id) {
540            // Check if visible at this epoch (not already deleted)
541            if let Some(record) = chain.visible_at(epoch) {
542                if record.is_deleted() {
543                    return false;
544                }
545            } else {
546                // Not visible at this epoch (already deleted or doesn't exist)
547                return false;
548            }
549
550            // Mark the version chain as deleted at this epoch
551            chain.mark_deleted(epoch);
552
553            // Remove from label index using node_labels map
554            let mut index = self.label_index.write();
555            let mut node_labels = self.node_labels.write();
556            if let Some(label_ids) = node_labels.remove(&id) {
557                for label_id in label_ids {
558                    if let Some(set) = index.get_mut(label_id as usize) {
559                        set.remove(&id);
560                    }
561                }
562            }
563
564            // Remove properties
565            drop(nodes); // Release lock before removing properties
566            drop(index);
567            drop(node_labels);
568            self.node_properties.remove_all(id);
569
570            // Note: Caller should use delete_node_edges() first if detach is needed
571
572            true
573        } else {
574            false
575        }
576    }
577
578    /// Deletes a node at a specific epoch.
579    /// (Tiered storage version)
580    #[cfg(feature = "tiered-storage")]
581    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
582        let mut versions = self.node_versions.write();
583        if let Some(index) = versions.get_mut(&id) {
584            // Check if visible at this epoch
585            if let Some(version_ref) = index.visible_at(epoch) {
586                if let Some(record) = self.read_node_record(&version_ref) {
587                    if record.is_deleted() {
588                        return false;
589                    }
590                } else {
591                    return false;
592                }
593            } else {
594                return false;
595            }
596
597            // Mark as deleted in version index
598            index.mark_deleted(epoch);
599
600            // Remove from label index using node_labels map
601            let mut label_index = self.label_index.write();
602            let mut node_labels = self.node_labels.write();
603            if let Some(label_ids) = node_labels.remove(&id) {
604                for label_id in label_ids {
605                    if let Some(set) = label_index.get_mut(label_id as usize) {
606                        set.remove(&id);
607                    }
608                }
609            }
610
611            // Remove properties
612            drop(versions);
613            drop(label_index);
614            drop(node_labels);
615            self.node_properties.remove_all(id);
616
617            true
618        } else {
619            false
620        }
621    }
622
623    /// Deletes all edges connected to a node (implements DETACH DELETE).
624    ///
625    /// Call this before `delete_node()` if you want to remove a node that
626    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
627    #[cfg(not(feature = "tiered-storage"))]
628    pub fn delete_node_edges(&self, node_id: NodeId) {
629        // Get outgoing edges
630        let outgoing: Vec<EdgeId> = self
631            .forward_adj
632            .edges_from(node_id)
633            .into_iter()
634            .map(|(_, edge_id)| edge_id)
635            .collect();
636
637        // Get incoming edges
638        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
639            backward
640                .edges_from(node_id)
641                .into_iter()
642                .map(|(_, edge_id)| edge_id)
643                .collect()
644        } else {
645            // No backward adjacency - scan all edges
646            let epoch = self.current_epoch();
647            self.edges
648                .read()
649                .iter()
650                .filter_map(|(id, chain)| {
651                    chain.visible_at(epoch).and_then(|r| {
652                        if !r.is_deleted() && r.dst == node_id {
653                            Some(*id)
654                        } else {
655                            None
656                        }
657                    })
658                })
659                .collect()
660        };
661
662        // Delete all edges
663        for edge_id in outgoing.into_iter().chain(incoming) {
664            self.delete_edge(edge_id);
665        }
666    }
667
668    /// Deletes all edges connected to a node (implements DETACH DELETE).
669    /// (Tiered storage version)
670    #[cfg(feature = "tiered-storage")]
671    pub fn delete_node_edges(&self, node_id: NodeId) {
672        // Get outgoing edges
673        let outgoing: Vec<EdgeId> = self
674            .forward_adj
675            .edges_from(node_id)
676            .into_iter()
677            .map(|(_, edge_id)| edge_id)
678            .collect();
679
680        // Get incoming edges
681        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
682            backward
683                .edges_from(node_id)
684                .into_iter()
685                .map(|(_, edge_id)| edge_id)
686                .collect()
687        } else {
688            // No backward adjacency - scan all edges
689            let epoch = self.current_epoch();
690            let versions = self.edge_versions.read();
691            versions
692                .iter()
693                .filter_map(|(id, index)| {
694                    index.visible_at(epoch).and_then(|vref| {
695                        self.read_edge_record(&vref).and_then(|r| {
696                            if !r.is_deleted() && r.dst == node_id {
697                                Some(*id)
698                            } else {
699                                None
700                            }
701                        })
702                    })
703                })
704                .collect()
705        };
706
707        // Delete all edges
708        for edge_id in outgoing.into_iter().chain(incoming) {
709            self.delete_edge(edge_id);
710        }
711    }
712
713    /// Sets a property on a node.
714    #[cfg(not(feature = "tiered-storage"))]
715    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
716        self.node_properties.set(id, key.into(), value);
717
718        // Update props_count in record
719        let count = self.node_properties.get_all(id).len() as u16;
720        if let Some(chain) = self.nodes.write().get_mut(&id) {
721            if let Some(record) = chain.latest_mut() {
722                record.props_count = count;
723            }
724        }
725    }
726
    /// Sets a property on a node.
    /// (Tiered storage version: properties stored separately, record is immutable)
    ///
    /// Writes `value` under `key` in node property storage; the node record
    /// itself is not touched.
    #[cfg(feature = "tiered-storage")]
    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
        self.node_properties.set(id, key.into(), value);
        // Note: props_count in record is not updated for tiered storage.
        // The record is immutable once allocated in the arena.
        // Property count can be derived from PropertyStorage if needed.
    }
736
    /// Sets a property on an edge.
    ///
    /// Writes `value` under `key` in edge property storage. No edge record
    /// is consulted or updated here.
    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
        self.edge_properties.set(id, key.into(), value);
    }
741
742    /// Removes a property from a node.
743    ///
744    /// Returns the previous value if it existed, or None if the property didn't exist.
745    #[cfg(not(feature = "tiered-storage"))]
746    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
747        let result = self.node_properties.remove(id, &key.into());
748
749        // Update props_count in record
750        let count = self.node_properties.get_all(id).len() as u16;
751        if let Some(chain) = self.nodes.write().get_mut(&id) {
752            if let Some(record) = chain.latest_mut() {
753                record.props_count = count;
754            }
755        }
756
757        result
758    }
759
    /// Removes a property from a node.
    /// (Tiered storage version)
    ///
    /// Returns the previous value if it existed, or None if the property
    /// didn't exist. The node record itself is not touched.
    #[cfg(feature = "tiered-storage")]
    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
        self.node_properties.remove(id, &key.into())
        // Note: props_count in record is not updated for tiered storage.
    }
767
    /// Removes a property from an edge.
    ///
    /// Returns the previous value if it existed, or None if the property didn't exist.
    /// Only edge property storage is modified.
    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
        self.edge_properties.remove(id, &key.into())
    }
774
    /// Gets a single property from a node without loading all properties.
    ///
    /// `get_node()` collects every property of the node into the returned
    /// `Node`; this method asks property storage for just the one key.
    /// Use this for filter predicates where you only need one property value.
    ///
    /// Note that this reads property storage directly and does not check
    /// whether the node itself is visible or deleted.
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Fast: Direct single-property lookup
    /// let age = store.get_node_property(node_id, &"age".into());
    ///
    /// // Slow: Loads all properties, then extracts one
    /// let age = store.get_node(node_id).and_then(|n| n.get_property("age").cloned());
    /// ```
    #[must_use]
    pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
        self.node_properties.get(id, key)
    }
793
    /// Gets a single property from an edge without loading all properties.
    ///
    /// Asks edge property storage for just the one key instead of
    /// materializing the whole edge. Returns `None` if the property is not set.
    #[must_use]
    pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
        self.edge_properties.get(id, key)
    }
801
802    /// Adds a label to a node.
803    ///
804    /// Returns true if the label was added, false if the node doesn't exist
805    /// or already has the label.
806    #[cfg(not(feature = "tiered-storage"))]
807    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
808        let epoch = self.current_epoch();
809
810        // Check if node exists
811        let nodes = self.nodes.read();
812        if let Some(chain) = nodes.get(&node_id) {
813            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
814                return false;
815            }
816        } else {
817            return false;
818        }
819        drop(nodes);
820
821        // Get or create label ID
822        let label_id = self.get_or_create_label_id(label);
823
824        // Add to node_labels map
825        let mut node_labels = self.node_labels.write();
826        let label_set = node_labels
827            .entry(node_id)
828            .or_insert_with(FxHashSet::default);
829
830        if label_set.contains(&label_id) {
831            return false; // Already has this label
832        }
833
834        label_set.insert(label_id);
835        drop(node_labels);
836
837        // Add to label_index
838        let mut index = self.label_index.write();
839        if (label_id as usize) >= index.len() {
840            index.resize(label_id as usize + 1, FxHashMap::default());
841        }
842        index[label_id as usize].insert(node_id, ());
843
844        // Update label count in node record
845        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
846            if let Some(record) = chain.latest_mut() {
847                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
848                record.set_label_count(count as u16);
849            }
850        }
851
852        true
853    }
854
855    /// Adds a label to a node.
856    /// (Tiered storage version)
857    #[cfg(feature = "tiered-storage")]
858    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
859        let epoch = self.current_epoch();
860
861        // Check if node exists
862        let versions = self.node_versions.read();
863        if let Some(index) = versions.get(&node_id) {
864            if let Some(vref) = index.visible_at(epoch) {
865                if let Some(record) = self.read_node_record(&vref) {
866                    if record.is_deleted() {
867                        return false;
868                    }
869                } else {
870                    return false;
871                }
872            } else {
873                return false;
874            }
875        } else {
876            return false;
877        }
878        drop(versions);
879
880        // Get or create label ID
881        let label_id = self.get_or_create_label_id(label);
882
883        // Add to node_labels map
884        let mut node_labels = self.node_labels.write();
885        let label_set = node_labels
886            .entry(node_id)
887            .or_insert_with(FxHashSet::default);
888
889        if label_set.contains(&label_id) {
890            return false; // Already has this label
891        }
892
893        label_set.insert(label_id);
894        drop(node_labels);
895
896        // Add to label_index
897        let mut index = self.label_index.write();
898        if (label_id as usize) >= index.len() {
899            index.resize(label_id as usize + 1, FxHashMap::default());
900        }
901        index[label_id as usize].insert(node_id, ());
902
903        // Note: label_count in record is not updated for tiered storage.
904        // The record is immutable once allocated in the arena.
905
906        true
907    }
908
909    /// Removes a label from a node.
910    ///
911    /// Returns true if the label was removed, false if the node doesn't exist
912    /// or doesn't have the label.
913    #[cfg(not(feature = "tiered-storage"))]
914    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
915        let epoch = self.current_epoch();
916
917        // Check if node exists
918        let nodes = self.nodes.read();
919        if let Some(chain) = nodes.get(&node_id) {
920            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
921                return false;
922            }
923        } else {
924            return false;
925        }
926        drop(nodes);
927
928        // Get label ID
929        let label_id = {
930            let label_ids = self.label_to_id.read();
931            match label_ids.get(label) {
932                Some(&id) => id,
933                None => return false, // Label doesn't exist
934            }
935        };
936
937        // Remove from node_labels map
938        let mut node_labels = self.node_labels.write();
939        if let Some(label_set) = node_labels.get_mut(&node_id) {
940            if !label_set.remove(&label_id) {
941                return false; // Node doesn't have this label
942            }
943        } else {
944            return false;
945        }
946        drop(node_labels);
947
948        // Remove from label_index
949        let mut index = self.label_index.write();
950        if (label_id as usize) < index.len() {
951            index[label_id as usize].remove(&node_id);
952        }
953
954        // Update label count in node record
955        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
956            if let Some(record) = chain.latest_mut() {
957                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
958                record.set_label_count(count as u16);
959            }
960        }
961
962        true
963    }
964
965    /// Removes a label from a node.
966    /// (Tiered storage version)
967    #[cfg(feature = "tiered-storage")]
968    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
969        let epoch = self.current_epoch();
970
971        // Check if node exists
972        let versions = self.node_versions.read();
973        if let Some(index) = versions.get(&node_id) {
974            if let Some(vref) = index.visible_at(epoch) {
975                if let Some(record) = self.read_node_record(&vref) {
976                    if record.is_deleted() {
977                        return false;
978                    }
979                } else {
980                    return false;
981                }
982            } else {
983                return false;
984            }
985        } else {
986            return false;
987        }
988        drop(versions);
989
990        // Get label ID
991        let label_id = {
992            let label_ids = self.label_to_id.read();
993            match label_ids.get(label) {
994                Some(&id) => id,
995                None => return false, // Label doesn't exist
996            }
997        };
998
999        // Remove from node_labels map
1000        let mut node_labels = self.node_labels.write();
1001        if let Some(label_set) = node_labels.get_mut(&node_id) {
1002            if !label_set.remove(&label_id) {
1003                return false; // Node doesn't have this label
1004            }
1005        } else {
1006            return false;
1007        }
1008        drop(node_labels);
1009
1010        // Remove from label_index
1011        let mut index = self.label_index.write();
1012        if (label_id as usize) < index.len() {
1013            index[label_id as usize].remove(&node_id);
1014        }
1015
1016        // Note: label_count in record is not updated for tiered storage.
1017
1018        true
1019    }
1020
1021    /// Returns the number of nodes (non-deleted at current epoch).
1022    #[must_use]
1023    #[cfg(not(feature = "tiered-storage"))]
1024    pub fn node_count(&self) -> usize {
1025        let epoch = self.current_epoch();
1026        self.nodes
1027            .read()
1028            .values()
1029            .filter_map(|chain| chain.visible_at(epoch))
1030            .filter(|r| !r.is_deleted())
1031            .count()
1032    }
1033
1034    /// Returns the number of nodes (non-deleted at current epoch).
1035    /// (Tiered storage version)
1036    #[must_use]
1037    #[cfg(feature = "tiered-storage")]
1038    pub fn node_count(&self) -> usize {
1039        let epoch = self.current_epoch();
1040        let versions = self.node_versions.read();
1041        versions
1042            .iter()
1043            .filter(|(_, index)| {
1044                index.visible_at(epoch).map_or(false, |vref| {
1045                    self.read_node_record(&vref)
1046                        .map_or(false, |r| !r.is_deleted())
1047                })
1048            })
1049            .count()
1050    }
1051
1052    /// Returns all node IDs in the store.
1053    ///
1054    /// This returns a snapshot of current node IDs. The returned vector
1055    /// excludes deleted nodes. Results are sorted by NodeId for deterministic
1056    /// iteration order.
1057    #[must_use]
1058    #[cfg(not(feature = "tiered-storage"))]
1059    pub fn node_ids(&self) -> Vec<NodeId> {
1060        let epoch = self.current_epoch();
1061        let mut ids: Vec<NodeId> = self
1062            .nodes
1063            .read()
1064            .iter()
1065            .filter_map(|(id, chain)| {
1066                chain
1067                    .visible_at(epoch)
1068                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1069            })
1070            .collect();
1071        ids.sort_unstable();
1072        ids
1073    }
1074
1075    /// Returns all node IDs in the store.
1076    /// (Tiered storage version)
1077    #[must_use]
1078    #[cfg(feature = "tiered-storage")]
1079    pub fn node_ids(&self) -> Vec<NodeId> {
1080        let epoch = self.current_epoch();
1081        let versions = self.node_versions.read();
1082        let mut ids: Vec<NodeId> = versions
1083            .iter()
1084            .filter_map(|(id, index)| {
1085                index.visible_at(epoch).and_then(|vref| {
1086                    self.read_node_record(&vref)
1087                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1088                })
1089            })
1090            .collect();
1091        ids.sort_unstable();
1092        ids
1093    }
1094
1095    // === Edge Operations ===
1096
1097    /// Creates a new edge.
1098    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1099        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1100    }
1101
1102    /// Creates a new edge within a transaction context.
1103    #[cfg(not(feature = "tiered-storage"))]
1104    pub fn create_edge_versioned(
1105        &self,
1106        src: NodeId,
1107        dst: NodeId,
1108        edge_type: &str,
1109        epoch: EpochId,
1110        tx_id: TxId,
1111    ) -> EdgeId {
1112        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1113        let type_id = self.get_or_create_edge_type_id(edge_type);
1114
1115        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1116        let chain = VersionChain::with_initial(record, epoch, tx_id);
1117        self.edges.write().insert(id, chain);
1118
1119        // Update adjacency
1120        self.forward_adj.add_edge(src, dst, id);
1121        if let Some(ref backward) = self.backward_adj {
1122            backward.add_edge(dst, src, id);
1123        }
1124
1125        id
1126    }
1127
1128    /// Creates a new edge within a transaction context.
1129    /// (Tiered storage version)
1130    #[cfg(feature = "tiered-storage")]
1131    pub fn create_edge_versioned(
1132        &self,
1133        src: NodeId,
1134        dst: NodeId,
1135        edge_type: &str,
1136        epoch: EpochId,
1137        tx_id: TxId,
1138    ) -> EdgeId {
1139        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1140        let type_id = self.get_or_create_edge_type_id(edge_type);
1141
1142        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1143
1144        // Allocate record in arena and get offset (create epoch if needed)
1145        let arena = self.arena_allocator.arena_or_create(epoch);
1146        let (offset, _stored) = arena.alloc_value_with_offset(record);
1147
1148        // Create HotVersionRef pointing to arena data
1149        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1150
1151        // Create or update version index
1152        let mut versions = self.edge_versions.write();
1153        if let Some(index) = versions.get_mut(&id) {
1154            index.add_hot(hot_ref);
1155        } else {
1156            versions.insert(id, VersionIndex::with_initial(hot_ref));
1157        }
1158
1159        // Update adjacency
1160        self.forward_adj.add_edge(src, dst, id);
1161        if let Some(ref backward) = self.backward_adj {
1162            backward.add_edge(dst, src, id);
1163        }
1164
1165        id
1166    }
1167
1168    /// Creates a new edge with properties.
1169    pub fn create_edge_with_props(
1170        &self,
1171        src: NodeId,
1172        dst: NodeId,
1173        edge_type: &str,
1174        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1175    ) -> EdgeId {
1176        let id = self.create_edge(src, dst, edge_type);
1177
1178        for (key, value) in properties {
1179            self.edge_properties.set(id, key.into(), value.into());
1180        }
1181
1182        id
1183    }
1184
1185    /// Gets an edge by ID (latest visible version).
1186    #[must_use]
1187    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1188        self.get_edge_at_epoch(id, self.current_epoch())
1189    }
1190
1191    /// Gets an edge by ID at a specific epoch.
1192    #[must_use]
1193    #[cfg(not(feature = "tiered-storage"))]
1194    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1195        let edges = self.edges.read();
1196        let chain = edges.get(&id)?;
1197        let record = chain.visible_at(epoch)?;
1198
1199        if record.is_deleted() {
1200            return None;
1201        }
1202
1203        let edge_type = {
1204            let id_to_type = self.id_to_edge_type.read();
1205            id_to_type.get(record.type_id as usize)?.clone()
1206        };
1207
1208        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1209
1210        // Get properties
1211        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1212
1213        Some(edge)
1214    }
1215
1216    /// Gets an edge by ID at a specific epoch.
1217    /// (Tiered storage version)
1218    #[must_use]
1219    #[cfg(feature = "tiered-storage")]
1220    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1221        let versions = self.edge_versions.read();
1222        let index = versions.get(&id)?;
1223        let version_ref = index.visible_at(epoch)?;
1224
1225        let record = self.read_edge_record(&version_ref)?;
1226
1227        if record.is_deleted() {
1228            return None;
1229        }
1230
1231        let edge_type = {
1232            let id_to_type = self.id_to_edge_type.read();
1233            id_to_type.get(record.type_id as usize)?.clone()
1234        };
1235
1236        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1237
1238        // Get properties
1239        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1240
1241        Some(edge)
1242    }
1243
1244    /// Gets an edge visible to a specific transaction.
1245    #[must_use]
1246    #[cfg(not(feature = "tiered-storage"))]
1247    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1248        let edges = self.edges.read();
1249        let chain = edges.get(&id)?;
1250        let record = chain.visible_to(epoch, tx_id)?;
1251
1252        if record.is_deleted() {
1253            return None;
1254        }
1255
1256        let edge_type = {
1257            let id_to_type = self.id_to_edge_type.read();
1258            id_to_type.get(record.type_id as usize)?.clone()
1259        };
1260
1261        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1262
1263        // Get properties
1264        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1265
1266        Some(edge)
1267    }
1268
1269    /// Gets an edge visible to a specific transaction.
1270    /// (Tiered storage version)
1271    #[must_use]
1272    #[cfg(feature = "tiered-storage")]
1273    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1274        let versions = self.edge_versions.read();
1275        let index = versions.get(&id)?;
1276        let version_ref = index.visible_to(epoch, tx_id)?;
1277
1278        let record = self.read_edge_record(&version_ref)?;
1279
1280        if record.is_deleted() {
1281            return None;
1282        }
1283
1284        let edge_type = {
1285            let id_to_type = self.id_to_edge_type.read();
1286            id_to_type.get(record.type_id as usize)?.clone()
1287        };
1288
1289        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1290
1291        // Get properties
1292        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1293
1294        Some(edge)
1295    }
1296
1297    /// Reads an EdgeRecord from arena using a VersionRef.
1298    #[cfg(feature = "tiered-storage")]
1299    #[allow(unsafe_code)]
1300    fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1301        match version_ref {
1302            VersionRef::Hot(hot_ref) => {
1303                let arena = self.arena_allocator.arena(hot_ref.epoch);
1304                // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
1305                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1306                Some(record.clone())
1307            }
1308            VersionRef::Cold(cold_ref) => {
1309                // Read from compressed epoch store
1310                self.epoch_store
1311                    .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1312            }
1313        }
1314    }
1315
1316    /// Deletes an edge (using latest epoch).
1317    pub fn delete_edge(&self, id: EdgeId) -> bool {
1318        self.delete_edge_at_epoch(id, self.current_epoch())
1319    }
1320
1321    /// Deletes an edge at a specific epoch.
1322    #[cfg(not(feature = "tiered-storage"))]
1323    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1324        let mut edges = self.edges.write();
1325        if let Some(chain) = edges.get_mut(&id) {
1326            // Get the visible record to check if deleted and get src/dst
1327            let (src, dst) = {
1328                match chain.visible_at(epoch) {
1329                    Some(record) => {
1330                        if record.is_deleted() {
1331                            return false;
1332                        }
1333                        (record.src, record.dst)
1334                    }
1335                    None => return false, // Not visible at this epoch (already deleted)
1336                }
1337            };
1338
1339            // Mark the version chain as deleted
1340            chain.mark_deleted(epoch);
1341
1342            drop(edges); // Release lock
1343
1344            // Mark as deleted in adjacency (soft delete)
1345            self.forward_adj.mark_deleted(src, id);
1346            if let Some(ref backward) = self.backward_adj {
1347                backward.mark_deleted(dst, id);
1348            }
1349
1350            // Remove properties
1351            self.edge_properties.remove_all(id);
1352
1353            true
1354        } else {
1355            false
1356        }
1357    }
1358
1359    /// Deletes an edge at a specific epoch.
1360    /// (Tiered storage version)
1361    #[cfg(feature = "tiered-storage")]
1362    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1363        let mut versions = self.edge_versions.write();
1364        if let Some(index) = versions.get_mut(&id) {
1365            // Get the visible record to check if deleted and get src/dst
1366            let (src, dst) = {
1367                match index.visible_at(epoch) {
1368                    Some(version_ref) => {
1369                        if let Some(record) = self.read_edge_record(&version_ref) {
1370                            if record.is_deleted() {
1371                                return false;
1372                            }
1373                            (record.src, record.dst)
1374                        } else {
1375                            return false;
1376                        }
1377                    }
1378                    None => return false,
1379                }
1380            };
1381
1382            // Mark as deleted in version index
1383            index.mark_deleted(epoch);
1384
1385            drop(versions); // Release lock
1386
1387            // Mark as deleted in adjacency (soft delete)
1388            self.forward_adj.mark_deleted(src, id);
1389            if let Some(ref backward) = self.backward_adj {
1390                backward.mark_deleted(dst, id);
1391            }
1392
1393            // Remove properties
1394            self.edge_properties.remove_all(id);
1395
1396            true
1397        } else {
1398            false
1399        }
1400    }
1401
1402    /// Returns the number of edges (non-deleted at current epoch).
1403    #[must_use]
1404    #[cfg(not(feature = "tiered-storage"))]
1405    pub fn edge_count(&self) -> usize {
1406        let epoch = self.current_epoch();
1407        self.edges
1408            .read()
1409            .values()
1410            .filter_map(|chain| chain.visible_at(epoch))
1411            .filter(|r| !r.is_deleted())
1412            .count()
1413    }
1414
1415    /// Returns the number of edges (non-deleted at current epoch).
1416    /// (Tiered storage version)
1417    #[must_use]
1418    #[cfg(feature = "tiered-storage")]
1419    pub fn edge_count(&self) -> usize {
1420        let epoch = self.current_epoch();
1421        let versions = self.edge_versions.read();
1422        versions
1423            .iter()
1424            .filter(|(_, index)| {
1425                index.visible_at(epoch).map_or(false, |vref| {
1426                    self.read_edge_record(&vref)
1427                        .map_or(false, |r| !r.is_deleted())
1428                })
1429            })
1430            .count()
1431    }
1432
1433    /// Discards all uncommitted versions created by a transaction.
1434    ///
1435    /// This is called during transaction rollback to clean up uncommitted changes.
1436    /// The method removes version chain entries created by the specified transaction.
1437    #[cfg(not(feature = "tiered-storage"))]
1438    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1439        // Remove uncommitted node versions
1440        {
1441            let mut nodes = self.nodes.write();
1442            for chain in nodes.values_mut() {
1443                chain.remove_versions_by(tx_id);
1444            }
1445            // Remove completely empty chains (no versions left)
1446            nodes.retain(|_, chain| !chain.is_empty());
1447        }
1448
1449        // Remove uncommitted edge versions
1450        {
1451            let mut edges = self.edges.write();
1452            for chain in edges.values_mut() {
1453                chain.remove_versions_by(tx_id);
1454            }
1455            // Remove completely empty chains (no versions left)
1456            edges.retain(|_, chain| !chain.is_empty());
1457        }
1458    }
1459
1460    /// Discards all uncommitted versions created by a transaction.
1461    /// (Tiered storage version)
1462    #[cfg(feature = "tiered-storage")]
1463    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1464        // Remove uncommitted node versions
1465        {
1466            let mut versions = self.node_versions.write();
1467            for index in versions.values_mut() {
1468                index.remove_versions_by(tx_id);
1469            }
1470            // Remove completely empty indexes (no versions left)
1471            versions.retain(|_, index| !index.is_empty());
1472        }
1473
1474        // Remove uncommitted edge versions
1475        {
1476            let mut versions = self.edge_versions.write();
1477            for index in versions.values_mut() {
1478                index.remove_versions_by(tx_id);
1479            }
1480            // Remove completely empty indexes (no versions left)
1481            versions.retain(|_, index| !index.is_empty());
1482        }
1483    }
1484
1485    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
1486    ///
1487    /// This is called by the transaction manager when an epoch becomes eligible
1488    /// for freezing (no active transactions can see it). The freeze process:
1489    ///
1490    /// 1. Collects all hot version refs for the epoch
1491    /// 2. Reads the corresponding records from arena
1492    /// 3. Compresses them into a `CompressedEpochBlock`
1493    /// 4. Updates `VersionIndex` entries to point to cold storage
1494    /// 5. The arena can be deallocated after all epochs in it are frozen
1495    ///
1496    /// # Arguments
1497    ///
1498    /// * `epoch` - The epoch to freeze
1499    ///
1500    /// # Returns
1501    ///
1502    /// The number of records frozen (nodes + edges).
1503    #[cfg(feature = "tiered-storage")]
1504    #[allow(unsafe_code)]
1505    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
1506        // Collect node records to freeze
1507        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
1508        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
1509
1510        {
1511            let versions = self.node_versions.read();
1512            for (node_id, index) in versions.iter() {
1513                for hot_ref in index.hot_refs_for_epoch(epoch) {
1514                    let arena = self.arena_allocator.arena(hot_ref.epoch);
1515                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
1516                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1517                    node_records.push((node_id.as_u64(), record.clone()));
1518                    node_hot_refs.push((*node_id, *hot_ref));
1519                }
1520            }
1521        }
1522
1523        // Collect edge records to freeze
1524        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
1525        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
1526
1527        {
1528            let versions = self.edge_versions.read();
1529            for (edge_id, index) in versions.iter() {
1530                for hot_ref in index.hot_refs_for_epoch(epoch) {
1531                    let arena = self.arena_allocator.arena(hot_ref.epoch);
1532                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
1533                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1534                    edge_records.push((edge_id.as_u64(), record.clone()));
1535                    edge_hot_refs.push((*edge_id, *hot_ref));
1536                }
1537            }
1538        }
1539
1540        let total_frozen = node_records.len() + edge_records.len();
1541
1542        if total_frozen == 0 {
1543            return 0;
1544        }
1545
1546        // Freeze to compressed storage
1547        let (node_entries, edge_entries) =
1548            self.epoch_store
1549                .freeze_epoch(epoch, node_records, edge_records);
1550
1551        // Build lookup maps for index entries
1552        let node_entry_map: FxHashMap<u64, _> = node_entries
1553            .iter()
1554            .map(|e| (e.entity_id, (e.offset, e.length)))
1555            .collect();
1556        let edge_entry_map: FxHashMap<u64, _> = edge_entries
1557            .iter()
1558            .map(|e| (e.entity_id, (e.offset, e.length)))
1559            .collect();
1560
1561        // Update version indexes to use cold refs
1562        {
1563            let mut versions = self.node_versions.write();
1564            for (node_id, hot_ref) in &node_hot_refs {
1565                if let Some(index) = versions.get_mut(node_id) {
1566                    if let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64()) {
1567                        let cold_ref = ColdVersionRef {
1568                            epoch,
1569                            block_offset: offset,
1570                            length,
1571                            created_by: hot_ref.created_by,
1572                            deleted_epoch: hot_ref.deleted_epoch,
1573                        };
1574                        index.freeze_epoch(epoch, std::iter::once(cold_ref));
1575                    }
1576                }
1577            }
1578        }
1579
1580        {
1581            let mut versions = self.edge_versions.write();
1582            for (edge_id, hot_ref) in &edge_hot_refs {
1583                if let Some(index) = versions.get_mut(edge_id) {
1584                    if let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64()) {
1585                        let cold_ref = ColdVersionRef {
1586                            epoch,
1587                            block_offset: offset,
1588                            length,
1589                            created_by: hot_ref.created_by,
1590                            deleted_epoch: hot_ref.deleted_epoch,
1591                        };
1592                        index.freeze_epoch(epoch, std::iter::once(cold_ref));
1593                    }
1594                }
1595            }
1596        }
1597
1598        total_frozen
1599    }
1600
1601    /// Returns the epoch store for cold storage statistics.
1602    #[cfg(feature = "tiered-storage")]
1603    #[must_use]
1604    pub fn epoch_store(&self) -> &EpochStore {
1605        &self.epoch_store
1606    }
1607
1608    /// Returns the number of distinct labels in the store.
1609    #[must_use]
1610    pub fn label_count(&self) -> usize {
1611        self.id_to_label.read().len()
1612    }
1613
1614    /// Returns the number of distinct property keys in the store.
1615    ///
1616    /// This counts unique property keys across both nodes and edges.
1617    #[must_use]
1618    pub fn property_key_count(&self) -> usize {
1619        let node_keys = self.node_properties.column_count();
1620        let edge_keys = self.edge_properties.column_count();
1621        // Note: This may count some keys twice if the same key is used
1622        // for both nodes and edges. A more precise count would require
1623        // tracking unique keys across both storages.
1624        node_keys + edge_keys
1625    }
1626
1627    /// Returns the number of distinct edge types in the store.
1628    #[must_use]
1629    pub fn edge_type_count(&self) -> usize {
1630        self.id_to_edge_type.read().len()
1631    }
1632
1633    // === Traversal ===
1634
1635    /// Iterates over neighbors of a node in the specified direction.
1636    ///
1637    /// This is the fast path for graph traversal - goes straight to the
1638    /// adjacency index without loading full node data.
1639    pub fn neighbors(
1640        &self,
1641        node: NodeId,
1642        direction: Direction,
1643    ) -> impl Iterator<Item = NodeId> + '_ {
1644        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
1645            Direction::Outgoing | Direction::Both => {
1646                Box::new(self.forward_adj.neighbors(node).into_iter())
1647            }
1648            Direction::Incoming => Box::new(std::iter::empty()),
1649        };
1650
1651        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
1652            Direction::Incoming | Direction::Both => {
1653                if let Some(ref adj) = self.backward_adj {
1654                    Box::new(adj.neighbors(node).into_iter())
1655                } else {
1656                    Box::new(std::iter::empty())
1657                }
1658            }
1659            Direction::Outgoing => Box::new(std::iter::empty()),
1660        };
1661
1662        forward.chain(backward)
1663    }
1664
1665    /// Returns edges from a node with their targets.
1666    ///
1667    /// Returns an iterator of (target_node, edge_id) pairs.
1668    pub fn edges_from(
1669        &self,
1670        node: NodeId,
1671        direction: Direction,
1672    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
1673        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
1674            Direction::Outgoing | Direction::Both => {
1675                Box::new(self.forward_adj.edges_from(node).into_iter())
1676            }
1677            Direction::Incoming => Box::new(std::iter::empty()),
1678        };
1679
1680        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
1681            Direction::Incoming | Direction::Both => {
1682                if let Some(ref adj) = self.backward_adj {
1683                    Box::new(adj.edges_from(node).into_iter())
1684                } else {
1685                    Box::new(std::iter::empty())
1686                }
1687            }
1688            Direction::Outgoing => Box::new(std::iter::empty()),
1689        };
1690
1691        forward.chain(backward)
1692    }
1693
1694    /// Returns edges to a node (where the node is the destination).
1695    ///
1696    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
1697    /// Uses the backward adjacency index for O(degree) lookup.
1698    ///
1699    /// # Example
1700    ///
1701    /// ```ignore
1702    /// // For edges: A->B, C->B
1703    /// let incoming = store.edges_to(B);
1704    /// // Returns: [(A, edge1), (C, edge2)]
1705    /// ```
1706    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1707        if let Some(ref backward) = self.backward_adj {
1708            backward.edges_from(node)
1709        } else {
1710            // Fallback: scan all edges (slow but correct)
1711            self.all_edges()
1712                .filter_map(|edge| {
1713                    if edge.dst == node {
1714                        Some((edge.src, edge.id))
1715                    } else {
1716                        None
1717                    }
1718                })
1719                .collect()
1720        }
1721    }
1722
1723    /// Returns the out-degree of a node (number of outgoing edges).
1724    ///
1725    /// Uses the forward adjacency index for O(1) lookup.
1726    #[must_use]
1727    pub fn out_degree(&self, node: NodeId) -> usize {
1728        self.forward_adj.out_degree(node)
1729    }
1730
1731    /// Returns the in-degree of a node (number of incoming edges).
1732    ///
1733    /// Uses the backward adjacency index for O(1) lookup if available,
1734    /// otherwise falls back to scanning edges.
1735    #[must_use]
1736    pub fn in_degree(&self, node: NodeId) -> usize {
1737        if let Some(ref backward) = self.backward_adj {
1738            backward.in_degree(node)
1739        } else {
1740            // Fallback: count edges (slow)
1741            self.all_edges().filter(|edge| edge.dst == node).count()
1742        }
1743    }
1744
1745    /// Gets the type of an edge by ID.
1746    #[must_use]
1747    #[cfg(not(feature = "tiered-storage"))]
1748    pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
1749        let edges = self.edges.read();
1750        let chain = edges.get(&id)?;
1751        let epoch = self.current_epoch();
1752        let record = chain.visible_at(epoch)?;
1753        let id_to_type = self.id_to_edge_type.read();
1754        id_to_type.get(record.type_id as usize).cloned()
1755    }
1756
/// Gets the type name of an edge by ID.
/// (Tiered storage version)
///
/// Returns `None` when the edge is unknown, has no version visible at the
/// current epoch, its record cannot be read, it is marked deleted, or its
/// type ID has no registered name.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
    let versions = self.edge_versions.read();
    let index = versions.get(&id)?;
    let epoch = self.current_epoch();
    let vref = index.visible_at(epoch)?;
    let record = self.read_edge_record(&vref)?;

    // The other edge readers in this store skip records flagged as deleted;
    // do the same here so a tombstoned edge does not report a type.
    if record.is_deleted() {
        return None;
    }

    let id_to_type = self.id_to_edge_type.read();
    id_to_type.get(record.type_id as usize).cloned()
}
1770
1771    /// Returns all nodes with a specific label.
1772    ///
1773    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
1774    /// concurrent modifications won't affect the returned vector. Results are
1775    /// sorted by NodeId for deterministic iteration order.
1776    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
1777        let label_to_id = self.label_to_id.read();
1778        if let Some(&label_id) = label_to_id.get(label) {
1779            let index = self.label_index.read();
1780            if let Some(set) = index.get(label_id as usize) {
1781                let mut ids: Vec<NodeId> = set.keys().copied().collect();
1782                ids.sort_unstable();
1783                return ids;
1784            }
1785        }
1786        Vec::new()
1787    }
1788
1789    // === Admin API: Iteration ===
1790
1791    /// Returns an iterator over all nodes in the database.
1792    ///
1793    /// This creates a snapshot of all visible nodes at the current epoch.
1794    /// Useful for dump/export operations.
1795    #[cfg(not(feature = "tiered-storage"))]
1796    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
1797        let epoch = self.current_epoch();
1798        let node_ids: Vec<NodeId> = self
1799            .nodes
1800            .read()
1801            .iter()
1802            .filter_map(|(id, chain)| {
1803                chain
1804                    .visible_at(epoch)
1805                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1806            })
1807            .collect();
1808
1809        node_ids.into_iter().filter_map(move |id| self.get_node(id))
1810    }
1811
/// Iterates over every node in the store.
/// (Tiered storage version)
///
/// The ID list is obtained from `node_ids()` up front; each ID is resolved
/// through `get_node` as the iterator is consumed, and IDs that do not
/// resolve are skipped.
#[cfg(feature = "tiered-storage")]
pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
    self.node_ids()
        .into_iter()
        .filter_map(move |id| self.get_node(id))
}
1819
1820    /// Returns an iterator over all edges in the database.
1821    ///
1822    /// This creates a snapshot of all visible edges at the current epoch.
1823    /// Useful for dump/export operations.
1824    #[cfg(not(feature = "tiered-storage"))]
1825    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
1826        let epoch = self.current_epoch();
1827        let edge_ids: Vec<EdgeId> = self
1828            .edges
1829            .read()
1830            .iter()
1831            .filter_map(|(id, chain)| {
1832                chain
1833                    .visible_at(epoch)
1834                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1835            })
1836            .collect();
1837
1838        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
1839    }
1840
/// Iterates over every live edge in the store.
/// (Tiered storage version)
///
/// An edge ID is included when its version index has a reference visible
/// at the current epoch, the referenced record can be read, and the record
/// is not marked deleted. IDs are captured up front and resolved through
/// `get_edge` as the iterator is consumed.
#[cfg(feature = "tiered-storage")]
pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
    let epoch = self.current_epoch();
    let versions = self.edge_versions.read();

    let mut live_ids: Vec<EdgeId> = Vec::new();
    for (id, index) in versions.iter() {
        if let Some(vref) = index.visible_at(epoch) {
            if let Some(record) = self.read_edge_record(&vref) {
                if !record.is_deleted() {
                    live_ids.push(*id);
                }
            }
        }
    }

    live_ids.into_iter().filter_map(move |id| self.get_edge(id))
}
1859
1860    /// Returns all label names in the database.
1861    pub fn all_labels(&self) -> Vec<String> {
1862        self.id_to_label
1863            .read()
1864            .iter()
1865            .map(|s| s.to_string())
1866            .collect()
1867    }
1868
1869    /// Returns all edge type names in the database.
1870    pub fn all_edge_types(&self) -> Vec<String> {
1871        self.id_to_edge_type
1872            .read()
1873            .iter()
1874            .map(|s| s.to_string())
1875            .collect()
1876    }
1877
1878    /// Returns all property keys used in the database.
1879    pub fn all_property_keys(&self) -> Vec<String> {
1880        let mut keys = std::collections::HashSet::new();
1881        for key in self.node_properties.keys() {
1882            keys.insert(key.to_string());
1883        }
1884        for key in self.edge_properties.keys() {
1885            keys.insert(key.to_string());
1886        }
1887        keys.into_iter().collect()
1888    }
1889
1890    /// Returns an iterator over nodes with a specific label.
1891    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
1892        let node_ids = self.nodes_by_label(label);
1893        node_ids.into_iter().filter_map(move |id| self.get_node(id))
1894    }
1895
1896    /// Returns an iterator over edges with a specific type.
1897    #[cfg(not(feature = "tiered-storage"))]
1898    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
1899        let epoch = self.current_epoch();
1900        let type_to_id = self.edge_type_to_id.read();
1901
1902        if let Some(&type_id) = type_to_id.get(edge_type) {
1903            let edge_ids: Vec<EdgeId> = self
1904                .edges
1905                .read()
1906                .iter()
1907                .filter_map(|(id, chain)| {
1908                    chain.visible_at(epoch).and_then(|r| {
1909                        if !r.is_deleted() && r.type_id == type_id {
1910                            Some(*id)
1911                        } else {
1912                            None
1913                        }
1914                    })
1915                })
1916                .collect();
1917
1918            // Return a boxed iterator for the found edges
1919            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
1920                as Box<dyn Iterator<Item = Edge> + 'a>
1921        } else {
1922            // Return empty iterator
1923            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
1924        }
1925    }
1926
/// Iterates over the live edges of the given type.
/// (Tiered storage version)
///
/// If `edge_type` is not a registered type name the iterator is empty.
/// Otherwise the IDs of edges whose visible record can be read, is not
/// marked deleted, and has the requested type are captured up front and
/// resolved through `get_edge` as the iterator is consumed.
#[cfg(feature = "tiered-storage")]
pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
    let epoch = self.current_epoch();
    let type_to_id = self.edge_type_to_id.read();

    // An unknown type simply produces an empty ID list, so both cases share
    // one return path.
    let mut matching_ids: Vec<EdgeId> = Vec::new();
    if let Some(&wanted_type) = type_to_id.get(edge_type) {
        let versions = self.edge_versions.read();
        for (id, index) in versions.iter() {
            if let Some(vref) = index.visible_at(epoch) {
                if let Some(record) = self.read_edge_record(&vref) {
                    if !record.is_deleted() && record.type_id == wanted_type {
                        matching_ids.push(*id);
                    }
                }
            }
        }
    }

    matching_ids
        .into_iter()
        .filter_map(move |id| self.get_edge(id))
}
1957
1958    // === Zone Map Support ===
1959
1960    /// Checks if a node property predicate might match any nodes.
1961    ///
1962    /// Uses zone maps for early filtering. Returns `true` if there might be
1963    /// matching nodes, `false` if there definitely aren't.
1964    #[must_use]
1965    pub fn node_property_might_match(
1966        &self,
1967        property: &PropertyKey,
1968        op: CompareOp,
1969        value: &Value,
1970    ) -> bool {
1971        self.node_properties.might_match(property, op, value)
1972    }
1973
1974    /// Checks if an edge property predicate might match any edges.
1975    #[must_use]
1976    pub fn edge_property_might_match(
1977        &self,
1978        property: &PropertyKey,
1979        op: CompareOp,
1980        value: &Value,
1981    ) -> bool {
1982        self.edge_properties.might_match(property, op, value)
1983    }
1984
1985    /// Gets the zone map for a node property.
1986    #[must_use]
1987    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
1988        self.node_properties.zone_map(property)
1989    }
1990
1991    /// Gets the zone map for an edge property.
1992    #[must_use]
1993    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
1994        self.edge_properties.zone_map(property)
1995    }
1996
1997    /// Rebuilds zone maps for all properties.
1998    pub fn rebuild_zone_maps(&self) {
1999        self.node_properties.rebuild_zone_maps();
2000        self.edge_properties.rebuild_zone_maps();
2001    }
2002
2003    // === Statistics ===
2004
2005    /// Returns the current statistics.
2006    #[must_use]
2007    pub fn statistics(&self) -> Statistics {
2008        self.statistics.read().clone()
2009    }
2010
2011    /// Recomputes statistics from current data.
2012    ///
2013    /// Scans all labels and edge types to build cardinality estimates for the
2014    /// query optimizer. Call this periodically or after bulk data loads.
2015    #[cfg(not(feature = "tiered-storage"))]
2016    pub fn compute_statistics(&self) {
2017        let mut stats = Statistics::new();
2018
2019        // Compute total counts
2020        stats.total_nodes = self.node_count() as u64;
2021        stats.total_edges = self.edge_count() as u64;
2022
2023        // Compute per-label statistics
2024        let id_to_label = self.id_to_label.read();
2025        let label_index = self.label_index.read();
2026
2027        for (label_id, label_name) in id_to_label.iter().enumerate() {
2028            let node_count = label_index
2029                .get(label_id)
2030                .map(|set| set.len() as u64)
2031                .unwrap_or(0);
2032
2033            if node_count > 0 {
2034                // Estimate average degree
2035                let avg_out_degree = if stats.total_nodes > 0 {
2036                    stats.total_edges as f64 / stats.total_nodes as f64
2037                } else {
2038                    0.0
2039                };
2040
2041                let label_stats =
2042                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2043
2044                stats.update_label(label_name.as_ref(), label_stats);
2045            }
2046        }
2047
2048        // Compute per-edge-type statistics
2049        let id_to_edge_type = self.id_to_edge_type.read();
2050        let edges = self.edges.read();
2051        let epoch = self.current_epoch();
2052
2053        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2054        for chain in edges.values() {
2055            if let Some(record) = chain.visible_at(epoch) {
2056                if !record.is_deleted() {
2057                    *edge_type_counts.entry(record.type_id).or_default() += 1;
2058                }
2059            }
2060        }
2061
2062        for (type_id, count) in edge_type_counts {
2063            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2064                let avg_degree = if stats.total_nodes > 0 {
2065                    count as f64 / stats.total_nodes as f64
2066                } else {
2067                    0.0
2068                };
2069
2070                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2071                stats.update_edge_type(type_name.as_ref(), edge_stats);
2072            }
2073        }
2074
2075        *self.statistics.write() = stats;
2076    }
2077
/// Rebuilds the stored statistics from the data currently in the store.
/// (Tiered storage version)
///
/// Collects total node/edge counts, a per-label node count, and a per-type
/// edge count, then replaces the stored statistics wholesale.
///
/// Degree figures are coarse: every label gets the store-wide ratio
/// `total_edges / total_nodes` for both directions, and every edge type
/// gets `edges_of_type / total_nodes` for both directions.
#[cfg(feature = "tiered-storage")]
pub fn compute_statistics(&self) {
    // Ratio helper that treats an empty denominator as "no degree".
    let ratio = |numerator: u64, denominator: u64| -> f64 {
        if denominator > 0 {
            numerator as f64 / denominator as f64
        } else {
            0.0
        }
    };

    let mut stats = Statistics::new();

    // Store-wide totals.
    stats.total_nodes = self.node_count() as u64;
    stats.total_edges = self.edge_count() as u64;

    // Per-label statistics; labels without any indexed node are skipped.
    let id_to_label = self.id_to_label.read();
    let label_index = self.label_index.read();

    for (label_id, label_name) in id_to_label.iter().enumerate() {
        let node_count = label_index
            .get(label_id)
            .map_or(0, |members| members.len() as u64);
        if node_count == 0 {
            continue;
        }

        let avg_degree = ratio(stats.total_edges, stats.total_nodes);
        let label_stats = LabelStatistics::new(node_count).with_degrees(avg_degree, avg_degree);
        stats.update_label(label_name.as_ref(), label_stats);
    }

    // Per-edge-type statistics: tally live edges by type ID first.
    let id_to_edge_type = self.id_to_edge_type.read();
    let versions = self.edge_versions.read();
    let epoch = self.current_epoch();

    let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
    for index in versions.values() {
        let vref = match index.visible_at(epoch) {
            Some(visible) => visible,
            None => continue,
        };
        let record = match self.read_edge_record(&vref) {
            Some(found) => found,
            None => continue,
        };
        if record.is_deleted() {
            continue;
        }
        *edge_type_counts.entry(record.type_id).or_default() += 1;
    }

    // Type IDs without a registered name are ignored.
    for (type_id, count) in edge_type_counts {
        let type_name = match id_to_edge_type.get(type_id as usize) {
            Some(name) => name,
            None => continue,
        };

        let avg_degree = ratio(count, stats.total_nodes);
        let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
        stats.update_edge_type(type_name.as_ref(), edge_stats);
    }

    *self.statistics.write() = stats;
}
2143
2144    /// Estimates cardinality for a label scan.
2145    #[must_use]
2146    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2147        self.statistics.read().estimate_label_cardinality(label)
2148    }
2149
2150    /// Estimates average degree for an edge type.
2151    #[must_use]
2152    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2153        self.statistics
2154            .read()
2155            .estimate_avg_degree(edge_type, outgoing)
2156    }
2157
2158    // === Internal Helpers ===
2159
2160    fn get_or_create_label_id(&self, label: &str) -> u32 {
2161        {
2162            let label_to_id = self.label_to_id.read();
2163            if let Some(&id) = label_to_id.get(label) {
2164                return id;
2165            }
2166        }
2167
2168        let mut label_to_id = self.label_to_id.write();
2169        let mut id_to_label = self.id_to_label.write();
2170
2171        // Double-check after acquiring write lock
2172        if let Some(&id) = label_to_id.get(label) {
2173            return id;
2174        }
2175
2176        let id = id_to_label.len() as u32;
2177
2178        let label: Arc<str> = label.into();
2179        label_to_id.insert(label.clone(), id);
2180        id_to_label.push(label);
2181
2182        id
2183    }
2184
2185    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2186        {
2187            let type_to_id = self.edge_type_to_id.read();
2188            if let Some(&id) = type_to_id.get(edge_type) {
2189                return id;
2190            }
2191        }
2192
2193        let mut type_to_id = self.edge_type_to_id.write();
2194        let mut id_to_type = self.id_to_edge_type.write();
2195
2196        // Double-check
2197        if let Some(&id) = type_to_id.get(edge_type) {
2198            return id;
2199        }
2200
2201        let id = id_to_type.len() as u32;
2202        let edge_type: Arc<str> = edge_type.into();
2203        type_to_id.insert(edge_type.clone(), id);
2204        id_to_type.push(edge_type);
2205
2206        id
2207    }
2208
2209    // === Recovery Support ===
2210
2211    /// Creates a node with a specific ID during recovery.
2212    ///
2213    /// This is used for WAL recovery to restore nodes with their original IDs.
2214    /// The caller must ensure IDs don't conflict with existing nodes.
2215    #[cfg(not(feature = "tiered-storage"))]
2216    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2217        let epoch = self.current_epoch();
2218        let mut record = NodeRecord::new(id, epoch);
2219        record.set_label_count(labels.len() as u16);
2220
2221        // Store labels in node_labels map and label_index
2222        let mut node_label_set = FxHashSet::default();
2223        for label in labels {
2224            let label_id = self.get_or_create_label_id(*label);
2225            node_label_set.insert(label_id);
2226
2227            // Update label index
2228            let mut index = self.label_index.write();
2229            while index.len() <= label_id as usize {
2230                index.push(FxHashMap::default());
2231            }
2232            index[label_id as usize].insert(id, ());
2233        }
2234
2235        // Store node's labels
2236        self.node_labels.write().insert(id, node_label_set);
2237
2238        // Create version chain with initial version (using SYSTEM tx for recovery)
2239        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2240        self.nodes.write().insert(id, chain);
2241
2242        // Update next_node_id if necessary to avoid future collisions
2243        let id_val = id.as_u64();
2244        let _ = self
2245            .next_node_id
2246            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2247                if id_val >= current {
2248                    Some(id_val + 1)
2249                } else {
2250                    None
2251                }
2252            });
2253    }
2254
/// Recreates a node under a caller-chosen ID.
/// (Tiered storage version)
///
/// Nothing here checks for an existing node with the same ID. The node is
/// registered in the label index and the per-node label map, its record is
/// allocated in the arena for the current epoch, a hot version reference
/// (attributed to `TxId::SYSTEM`) becomes the initial entry of its version
/// index, and the node ID counter is bumped past `id` when needed.
#[cfg(feature = "tiered-storage")]
pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
    let epoch = self.current_epoch();
    let mut record = NodeRecord::new(id, epoch);
    record.set_label_count(labels.len() as u16);

    // Resolve each label and register the node under it.
    let mut node_label_set = FxHashSet::default();
    for &label in labels {
        let label_id = self.get_or_create_label_id(label);
        node_label_set.insert(label_id);

        // The index lock is taken only after the label ID is resolved, so
        // it is never held across `get_or_create_label_id`.
        let slot = label_id as usize;
        let mut index = self.label_index.write();
        while index.len() <= slot {
            index.push(FxHashMap::default());
        }
        index[slot].insert(id, ());
    }

    self.node_labels.write().insert(id, node_label_set);

    // Place the record in the epoch's arena (created on demand).
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    // Initial version reference, attributed to the system transaction.
    let initial_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
    let mut node_versions = self.node_versions.write();
    node_versions.insert(id, VersionIndex::with_initial(initial_ref));

    // Keep the ID counter ahead of every restored ID.
    let id_val = id.as_u64();
    let _ = self
        .next_node_id
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
            (id_val >= current).then(|| id_val + 1)
        });
}
2301
2302    /// Creates an edge with a specific ID during recovery.
2303    ///
2304    /// This is used for WAL recovery to restore edges with their original IDs.
2305    #[cfg(not(feature = "tiered-storage"))]
2306    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2307        let epoch = self.current_epoch();
2308        let type_id = self.get_or_create_edge_type_id(edge_type);
2309
2310        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2311        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2312        self.edges.write().insert(id, chain);
2313
2314        // Update adjacency
2315        self.forward_adj.add_edge(src, dst, id);
2316        if let Some(ref backward) = self.backward_adj {
2317            backward.add_edge(dst, src, id);
2318        }
2319
2320        // Update next_edge_id if necessary
2321        let id_val = id.as_u64();
2322        let _ = self
2323            .next_edge_id
2324            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2325                if id_val >= current {
2326                    Some(id_val + 1)
2327                } else {
2328                    None
2329                }
2330            });
2331    }
2332
/// Recreates an edge under a caller-chosen ID.
/// (Tiered storage version)
///
/// The edge record is allocated in the arena for the current epoch, a hot
/// version reference (attributed to `TxId::SYSTEM`) becomes the initial
/// entry of its version index, the adjacency indexes are updated, and the
/// edge ID counter is bumped past `id` when needed.
#[cfg(feature = "tiered-storage")]
pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
    let epoch = self.current_epoch();
    let type_id = self.get_or_create_edge_type_id(edge_type);

    let record = EdgeRecord::new(id, src, dst, type_id, epoch);

    // Place the record in the epoch's arena (created on demand).
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    // Initial version reference, attributed to the system transaction.
    let initial_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
    let mut edge_versions = self.edge_versions.write();
    edge_versions.insert(id, VersionIndex::with_initial(initial_ref));

    // Adjacency: src -> dst forward, dst -> src backward (if maintained).
    self.forward_adj.add_edge(src, dst, id);
    if let Some(reverse_index) = self.backward_adj.as_ref() {
        reverse_index.add_edge(dst, src, id);
    }

    // Keep the ID counter ahead of every restored ID.
    let id_val = id.as_u64();
    let _ = self
        .next_edge_id
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
            (id_val >= current).then(|| id_val + 1)
        });
}
2369
2370    /// Sets the current epoch during recovery.
2371    pub fn set_epoch(&self, epoch: EpochId) {
2372        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2373    }
2374}
2375
2376impl Default for LpgStore {
2377    fn default() -> Self {
2378        Self::new()
2379    }
2380}
2381
2382#[cfg(test)]
2383mod tests {
2384    use super::*;
2385
// A freshly created node has a valid ID and exactly the labels it was given.
#[test]
fn test_create_node() {
    let store = LpgStore::new();

    let person_id = store.create_node(&["Person"]);
    assert!(person_id.is_valid());

    let person = store.get_node(person_id).unwrap();
    assert!(person.has_label("Person"));
    assert!(!person.has_label("Animal"));
}
2397
// Properties supplied at creation time can be read back from the node.
#[test]
fn test_create_node_with_props() {
    let store = LpgStore::new();

    let props = [("name", Value::from("Alice")), ("age", Value::from(30i64))];
    let id = store.create_node_with_props(&["Person"], props);

    let node = store.get_node(id).unwrap();

    let name = node.get_property("name").and_then(|v| v.as_str());
    assert_eq!(name, Some("Alice"));

    let age = node.get_property("age").and_then(|v| v.as_int64());
    assert_eq!(age, Some(30));
}
2417
// Deleting a node removes it from counts and lookups; a second delete is a no-op.
#[test]
fn test_delete_node() {
    let store = LpgStore::new();

    let doomed = store.create_node(&["Person"]);
    assert_eq!(store.node_count(), 1);

    let first_delete = store.delete_node(doomed);
    assert!(first_delete);
    assert_eq!(store.node_count(), 0);
    assert!(store.get_node(doomed).is_none());

    // Deleting again must report that nothing was removed.
    let second_delete = store.delete_node(doomed);
    assert!(!second_delete);
}
2432
// A created edge keeps its endpoints and its type name.
#[test]
fn test_create_edge() {
    let store = LpgStore::new();

    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);

    let knows = store.create_edge(source, target, "KNOWS");
    assert!(knows.is_valid());

    let stored = store.get_edge(knows).unwrap();
    assert_eq!(stored.src, source);
    assert_eq!(stored.dst, target);
    assert_eq!(stored.edge_type.as_ref(), "KNOWS");
}
2448
// Neighbor traversal works in both directions on a small star graph.
#[test]
fn test_neighbors() {
    let store = LpgStore::new();

    let hub = store.create_node(&["Person"]);
    let left = store.create_node(&["Person"]);
    let right = store.create_node(&["Person"]);

    store.create_edge(hub, left, "KNOWS");
    store.create_edge(hub, right, "KNOWS");

    // The hub points at both leaves.
    let from_hub: Vec<_> = store.neighbors(hub, Direction::Outgoing).collect();
    assert_eq!(from_hub.len(), 2);
    assert!(from_hub.contains(&left));
    assert!(from_hub.contains(&right));

    // A leaf is reached only from the hub.
    let into_left: Vec<_> = store.neighbors(left, Direction::Incoming).collect();
    assert_eq!(into_left.len(), 1);
    assert!(into_left.contains(&hub));
}
2469
// Label lookups return only the nodes carrying that label.
#[test]
fn test_nodes_by_label() {
    let store = LpgStore::new();

    let first_person = store.create_node(&["Person"]);
    let second_person = store.create_node(&["Person"]);
    let _animal = store.create_node(&["Animal"]);

    let people = store.nodes_by_label("Person");
    assert_eq!(people.len(), 2);
    assert!(people.contains(&first_person));
    assert!(people.contains(&second_person));

    let animal_count = store.nodes_by_label("Animal").len();
    assert_eq!(animal_count, 1);
}
2486
// Deleting an edge removes it from the edge count and from lookups.
#[test]
fn test_delete_edge() {
    let store = LpgStore::new();

    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);
    let knows = store.create_edge(source, target, "KNOWS");

    assert_eq!(store.edge_count(), 1);

    let deleted = store.delete_edge(knows);
    assert!(deleted);
    assert_eq!(store.edge_count(), 0);
    assert!(store.get_edge(knows).is_none());
}
2501
2502    // === New tests for improved coverage ===
2503
// A store configured without backward edges still traverses forward, but
// reports no incoming neighbors.
#[test]
fn test_lpg_store_config() {
    let store = LpgStore::with_config(LpgStoreConfig {
        backward_edges: false,
        initial_node_capacity: 100,
        initial_edge_capacity: 200,
    });

    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);
    store.create_edge(source, target, "KNOWS");

    // Forward traversal is unaffected by the missing backward index.
    let outgoing_count = store.neighbors(source, Direction::Outgoing).count();
    assert_eq!(outgoing_count, 1);

    // With no backward adjacency there is nothing to report for incoming.
    let incoming_count = store.neighbors(target, Direction::Incoming).count();
    assert_eq!(incoming_count, 0);
}
2527
2528    #[test]
2529    fn test_epoch_management() {
2530        let store = LpgStore::new();
2531
2532        let epoch0 = store.current_epoch();
2533        assert_eq!(epoch0.as_u64(), 0);
2534
2535        let epoch1 = store.new_epoch();
2536        assert_eq!(epoch1.as_u64(), 1);
2537
2538        let current = store.current_epoch();
2539        assert_eq!(current.as_u64(), 1);
2540    }
2541
2542    #[test]
2543    fn test_node_properties() {
2544        let store = LpgStore::new();
2545        let id = store.create_node(&["Person"]);
2546
2547        // Set and get property
2548        store.set_node_property(id, "name", Value::from("Alice"));
2549        let name = store.get_node_property(id, &"name".into());
2550        assert!(matches!(name, Some(Value::String(s)) if s.as_ref() == "Alice"));
2551
2552        // Update property
2553        store.set_node_property(id, "name", Value::from("Bob"));
2554        let name = store.get_node_property(id, &"name".into());
2555        assert!(matches!(name, Some(Value::String(s)) if s.as_ref() == "Bob"));
2556
2557        // Remove property
2558        let old = store.remove_node_property(id, "name");
2559        assert!(matches!(old, Some(Value::String(s)) if s.as_ref() == "Bob"));
2560
2561        // Property should be gone
2562        let name = store.get_node_property(id, &"name".into());
2563        assert!(name.is_none());
2564
2565        // Remove non-existent property
2566        let none = store.remove_node_property(id, "nonexistent");
2567        assert!(none.is_none());
2568    }
2569
2570    #[test]
2571    fn test_edge_properties() {
2572        let store = LpgStore::new();
2573        let a = store.create_node(&["Person"]);
2574        let b = store.create_node(&["Person"]);
2575        let edge_id = store.create_edge(a, b, "KNOWS");
2576
2577        // Set and get property
2578        store.set_edge_property(edge_id, "since", Value::from(2020i64));
2579        let since = store.get_edge_property(edge_id, &"since".into());
2580        assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
2581
2582        // Remove property
2583        let old = store.remove_edge_property(edge_id, "since");
2584        assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
2585
2586        let since = store.get_edge_property(edge_id, &"since".into());
2587        assert!(since.is_none());
2588    }
2589
2590    #[test]
2591    fn test_add_remove_label() {
2592        let store = LpgStore::new();
2593        let id = store.create_node(&["Person"]);
2594
2595        // Add new label
2596        assert!(store.add_label(id, "Employee"));
2597
2598        let node = store.get_node(id).unwrap();
2599        assert!(node.has_label("Person"));
2600        assert!(node.has_label("Employee"));
2601
2602        // Adding same label again should fail
2603        assert!(!store.add_label(id, "Employee"));
2604
2605        // Remove label
2606        assert!(store.remove_label(id, "Employee"));
2607
2608        let node = store.get_node(id).unwrap();
2609        assert!(node.has_label("Person"));
2610        assert!(!node.has_label("Employee"));
2611
2612        // Removing non-existent label should fail
2613        assert!(!store.remove_label(id, "Employee"));
2614        assert!(!store.remove_label(id, "NonExistent"));
2615    }
2616
2617    #[test]
2618    fn test_add_label_to_nonexistent_node() {
2619        let store = LpgStore::new();
2620        let fake_id = NodeId::new(999);
2621        assert!(!store.add_label(fake_id, "Label"));
2622    }
2623
2624    #[test]
2625    fn test_remove_label_from_nonexistent_node() {
2626        let store = LpgStore::new();
2627        let fake_id = NodeId::new(999);
2628        assert!(!store.remove_label(fake_id, "Label"));
2629    }
2630
2631    #[test]
2632    fn test_node_ids() {
2633        let store = LpgStore::new();
2634
2635        let n1 = store.create_node(&["Person"]);
2636        let n2 = store.create_node(&["Person"]);
2637        let n3 = store.create_node(&["Person"]);
2638
2639        let ids = store.node_ids();
2640        assert_eq!(ids.len(), 3);
2641        assert!(ids.contains(&n1));
2642        assert!(ids.contains(&n2));
2643        assert!(ids.contains(&n3));
2644
2645        // Delete one
2646        store.delete_node(n2);
2647        let ids = store.node_ids();
2648        assert_eq!(ids.len(), 2);
2649        assert!(!ids.contains(&n2));
2650    }
2651
2652    #[test]
2653    fn test_delete_node_nonexistent() {
2654        let store = LpgStore::new();
2655        let fake_id = NodeId::new(999);
2656        assert!(!store.delete_node(fake_id));
2657    }
2658
2659    #[test]
2660    fn test_delete_edge_nonexistent() {
2661        let store = LpgStore::new();
2662        let fake_id = EdgeId::new(999);
2663        assert!(!store.delete_edge(fake_id));
2664    }
2665
2666    #[test]
2667    fn test_delete_edge_double() {
2668        let store = LpgStore::new();
2669        let a = store.create_node(&["Person"]);
2670        let b = store.create_node(&["Person"]);
2671        let edge_id = store.create_edge(a, b, "KNOWS");
2672
2673        assert!(store.delete_edge(edge_id));
2674        assert!(!store.delete_edge(edge_id)); // Double delete
2675    }
2676
2677    #[test]
2678    fn test_create_edge_with_props() {
2679        let store = LpgStore::new();
2680        let a = store.create_node(&["Person"]);
2681        let b = store.create_node(&["Person"]);
2682
2683        let edge_id = store.create_edge_with_props(
2684            a,
2685            b,
2686            "KNOWS",
2687            [
2688                ("since", Value::from(2020i64)),
2689                ("weight", Value::from(1.0)),
2690            ],
2691        );
2692
2693        let edge = store.get_edge(edge_id).unwrap();
2694        assert_eq!(
2695            edge.get_property("since").and_then(|v| v.as_int64()),
2696            Some(2020)
2697        );
2698        assert_eq!(
2699            edge.get_property("weight").and_then(|v| v.as_float64()),
2700            Some(1.0)
2701        );
2702    }
2703
2704    #[test]
2705    fn test_delete_node_edges() {
2706        let store = LpgStore::new();
2707
2708        let a = store.create_node(&["Person"]);
2709        let b = store.create_node(&["Person"]);
2710        let c = store.create_node(&["Person"]);
2711
2712        store.create_edge(a, b, "KNOWS"); // a -> b
2713        store.create_edge(c, a, "KNOWS"); // c -> a
2714
2715        assert_eq!(store.edge_count(), 2);
2716
2717        // Delete all edges connected to a
2718        store.delete_node_edges(a);
2719
2720        assert_eq!(store.edge_count(), 0);
2721    }
2722
2723    #[test]
2724    fn test_neighbors_both_directions() {
2725        let store = LpgStore::new();
2726
2727        let a = store.create_node(&["Person"]);
2728        let b = store.create_node(&["Person"]);
2729        let c = store.create_node(&["Person"]);
2730
2731        store.create_edge(a, b, "KNOWS"); // a -> b
2732        store.create_edge(c, a, "KNOWS"); // c -> a
2733
2734        // Direction::Both for node a
2735        let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
2736        assert_eq!(neighbors.len(), 2);
2737        assert!(neighbors.contains(&b)); // outgoing
2738        assert!(neighbors.contains(&c)); // incoming
2739    }
2740
2741    #[test]
2742    fn test_edges_from() {
2743        let store = LpgStore::new();
2744
2745        let a = store.create_node(&["Person"]);
2746        let b = store.create_node(&["Person"]);
2747        let c = store.create_node(&["Person"]);
2748
2749        let e1 = store.create_edge(a, b, "KNOWS");
2750        let e2 = store.create_edge(a, c, "KNOWS");
2751
2752        let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
2753        assert_eq!(edges.len(), 2);
2754        assert!(edges.iter().any(|(_, e)| *e == e1));
2755        assert!(edges.iter().any(|(_, e)| *e == e2));
2756
2757        // Incoming edges to b
2758        let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
2759        assert_eq!(incoming.len(), 1);
2760        assert_eq!(incoming[0].1, e1);
2761    }
2762
2763    #[test]
2764    fn test_edges_to() {
2765        let store = LpgStore::new();
2766
2767        let a = store.create_node(&["Person"]);
2768        let b = store.create_node(&["Person"]);
2769        let c = store.create_node(&["Person"]);
2770
2771        let e1 = store.create_edge(a, b, "KNOWS");
2772        let e2 = store.create_edge(c, b, "KNOWS");
2773
2774        // Edges pointing TO b
2775        let to_b = store.edges_to(b);
2776        assert_eq!(to_b.len(), 2);
2777        assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
2778        assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
2779    }
2780
2781    #[test]
2782    fn test_out_degree_in_degree() {
2783        let store = LpgStore::new();
2784
2785        let a = store.create_node(&["Person"]);
2786        let b = store.create_node(&["Person"]);
2787        let c = store.create_node(&["Person"]);
2788
2789        store.create_edge(a, b, "KNOWS");
2790        store.create_edge(a, c, "KNOWS");
2791        store.create_edge(c, b, "KNOWS");
2792
2793        assert_eq!(store.out_degree(a), 2);
2794        assert_eq!(store.out_degree(b), 0);
2795        assert_eq!(store.out_degree(c), 1);
2796
2797        assert_eq!(store.in_degree(a), 0);
2798        assert_eq!(store.in_degree(b), 2);
2799        assert_eq!(store.in_degree(c), 1);
2800    }
2801
2802    #[test]
2803    fn test_edge_type() {
2804        let store = LpgStore::new();
2805
2806        let a = store.create_node(&["Person"]);
2807        let b = store.create_node(&["Person"]);
2808        let edge_id = store.create_edge(a, b, "KNOWS");
2809
2810        let edge_type = store.edge_type(edge_id);
2811        assert_eq!(edge_type.as_deref(), Some("KNOWS"));
2812
2813        // Non-existent edge
2814        let fake_id = EdgeId::new(999);
2815        assert!(store.edge_type(fake_id).is_none());
2816    }
2817
2818    #[test]
2819    fn test_count_methods() {
2820        let store = LpgStore::new();
2821
2822        assert_eq!(store.label_count(), 0);
2823        assert_eq!(store.edge_type_count(), 0);
2824        assert_eq!(store.property_key_count(), 0);
2825
2826        let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
2827        let b = store.create_node(&["Company"]);
2828        store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
2829
2830        assert_eq!(store.label_count(), 2); // Person, Company
2831        assert_eq!(store.edge_type_count(), 1); // WORKS_AT
2832        assert_eq!(store.property_key_count(), 2); // age, since
2833    }
2834
2835    #[test]
2836    fn test_all_nodes_and_edges() {
2837        let store = LpgStore::new();
2838
2839        let a = store.create_node(&["Person"]);
2840        let b = store.create_node(&["Person"]);
2841        store.create_edge(a, b, "KNOWS");
2842
2843        let nodes: Vec<_> = store.all_nodes().collect();
2844        assert_eq!(nodes.len(), 2);
2845
2846        let edges: Vec<_> = store.all_edges().collect();
2847        assert_eq!(edges.len(), 1);
2848    }
2849
2850    #[test]
2851    fn test_all_labels_and_edge_types() {
2852        let store = LpgStore::new();
2853
2854        store.create_node(&["Person"]);
2855        store.create_node(&["Company"]);
2856        let a = store.create_node(&["Animal"]);
2857        let b = store.create_node(&["Animal"]);
2858        store.create_edge(a, b, "EATS");
2859
2860        let labels = store.all_labels();
2861        assert_eq!(labels.len(), 3);
2862        assert!(labels.contains(&"Person".to_string()));
2863        assert!(labels.contains(&"Company".to_string()));
2864        assert!(labels.contains(&"Animal".to_string()));
2865
2866        let edge_types = store.all_edge_types();
2867        assert_eq!(edge_types.len(), 1);
2868        assert!(edge_types.contains(&"EATS".to_string()));
2869    }
2870
2871    #[test]
2872    fn test_all_property_keys() {
2873        let store = LpgStore::new();
2874
2875        let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
2876        let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
2877        store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
2878
2879        let keys = store.all_property_keys();
2880        assert!(keys.contains(&"name".to_string()));
2881        assert!(keys.contains(&"age".to_string()));
2882        assert!(keys.contains(&"since".to_string()));
2883    }
2884
2885    #[test]
2886    fn test_nodes_with_label() {
2887        let store = LpgStore::new();
2888
2889        store.create_node(&["Person"]);
2890        store.create_node(&["Person"]);
2891        store.create_node(&["Company"]);
2892
2893        let persons: Vec<_> = store.nodes_with_label("Person").collect();
2894        assert_eq!(persons.len(), 2);
2895
2896        let companies: Vec<_> = store.nodes_with_label("Company").collect();
2897        assert_eq!(companies.len(), 1);
2898
2899        let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
2900        assert_eq!(none.len(), 0);
2901    }
2902
2903    #[test]
2904    fn test_edges_with_type() {
2905        let store = LpgStore::new();
2906
2907        let a = store.create_node(&["Person"]);
2908        let b = store.create_node(&["Person"]);
2909        let c = store.create_node(&["Company"]);
2910
2911        store.create_edge(a, b, "KNOWS");
2912        store.create_edge(a, c, "WORKS_AT");
2913
2914        let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
2915        assert_eq!(knows.len(), 1);
2916
2917        let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
2918        assert_eq!(works_at.len(), 1);
2919
2920        let none: Vec<_> = store.edges_with_type("NonExistent").collect();
2921        assert_eq!(none.len(), 0);
2922    }
2923
2924    #[test]
2925    fn test_nodes_by_label_nonexistent() {
2926        let store = LpgStore::new();
2927        store.create_node(&["Person"]);
2928
2929        let empty = store.nodes_by_label("NonExistent");
2930        assert!(empty.is_empty());
2931    }
2932
2933    #[test]
2934    fn test_statistics() {
2935        let store = LpgStore::new();
2936
2937        let a = store.create_node(&["Person"]);
2938        let b = store.create_node(&["Person"]);
2939        let c = store.create_node(&["Company"]);
2940
2941        store.create_edge(a, b, "KNOWS");
2942        store.create_edge(a, c, "WORKS_AT");
2943
2944        store.compute_statistics();
2945        let stats = store.statistics();
2946
2947        assert_eq!(stats.total_nodes, 3);
2948        assert_eq!(stats.total_edges, 2);
2949
2950        // Estimates
2951        let person_card = store.estimate_label_cardinality("Person");
2952        assert!(person_card > 0.0);
2953
2954        let avg_degree = store.estimate_avg_degree("KNOWS", true);
2955        assert!(avg_degree >= 0.0);
2956    }
2957
2958    #[test]
2959    fn test_zone_maps() {
2960        let store = LpgStore::new();
2961
2962        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
2963        store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
2964
2965        // Zone map should indicate possible matches (30 is within [25, 35] range)
2966        let might_match =
2967            store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
2968        // Zone maps return true conservatively when value is within min/max range
2969        assert!(might_match);
2970
2971        let zone = store.node_property_zone_map(&"age".into());
2972        assert!(zone.is_some());
2973
2974        // Non-existent property
2975        let no_zone = store.node_property_zone_map(&"nonexistent".into());
2976        assert!(no_zone.is_none());
2977
2978        // Edge zone maps
2979        let a = store.create_node(&["A"]);
2980        let b = store.create_node(&["B"]);
2981        store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
2982
2983        let edge_zone = store.edge_property_zone_map(&"weight".into());
2984        assert!(edge_zone.is_some());
2985    }
2986
2987    #[test]
2988    fn test_rebuild_zone_maps() {
2989        let store = LpgStore::new();
2990        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
2991
2992        // Should not panic
2993        store.rebuild_zone_maps();
2994    }
2995
2996    #[test]
2997    fn test_create_node_with_id() {
2998        let store = LpgStore::new();
2999
3000        let specific_id = NodeId::new(100);
3001        store.create_node_with_id(specific_id, &["Person", "Employee"]);
3002
3003        let node = store.get_node(specific_id).unwrap();
3004        assert!(node.has_label("Person"));
3005        assert!(node.has_label("Employee"));
3006
3007        // Next auto-generated ID should be > 100
3008        let next = store.create_node(&["Other"]);
3009        assert!(next.as_u64() > 100);
3010    }
3011
3012    #[test]
3013    fn test_create_edge_with_id() {
3014        let store = LpgStore::new();
3015
3016        let a = store.create_node(&["A"]);
3017        let b = store.create_node(&["B"]);
3018
3019        let specific_id = EdgeId::new(500);
3020        store.create_edge_with_id(specific_id, a, b, "REL");
3021
3022        let edge = store.get_edge(specific_id).unwrap();
3023        assert_eq!(edge.src, a);
3024        assert_eq!(edge.dst, b);
3025        assert_eq!(edge.edge_type.as_ref(), "REL");
3026
3027        // Next auto-generated ID should be > 500
3028        let next = store.create_edge(a, b, "OTHER");
3029        assert!(next.as_u64() > 500);
3030    }
3031
3032    #[test]
3033    fn test_set_epoch() {
3034        let store = LpgStore::new();
3035
3036        assert_eq!(store.current_epoch().as_u64(), 0);
3037
3038        store.set_epoch(EpochId::new(42));
3039        assert_eq!(store.current_epoch().as_u64(), 42);
3040    }
3041
3042    #[test]
3043    fn test_get_node_nonexistent() {
3044        let store = LpgStore::new();
3045        let fake_id = NodeId::new(999);
3046        assert!(store.get_node(fake_id).is_none());
3047    }
3048
3049    #[test]
3050    fn test_get_edge_nonexistent() {
3051        let store = LpgStore::new();
3052        let fake_id = EdgeId::new(999);
3053        assert!(store.get_edge(fake_id).is_none());
3054    }
3055
3056    #[test]
3057    fn test_multiple_labels() {
3058        let store = LpgStore::new();
3059
3060        let id = store.create_node(&["Person", "Employee", "Manager"]);
3061        let node = store.get_node(id).unwrap();
3062
3063        assert!(node.has_label("Person"));
3064        assert!(node.has_label("Employee"));
3065        assert!(node.has_label("Manager"));
3066        assert!(!node.has_label("Other"));
3067    }
3068
3069    #[test]
3070    fn test_default_impl() {
3071        let store: LpgStore = Default::default();
3072        assert_eq!(store.node_count(), 0);
3073        assert_eq!(store.edge_count(), 0);
3074    }
3075
3076    #[test]
3077    fn test_edges_from_both_directions() {
3078        let store = LpgStore::new();
3079
3080        let a = store.create_node(&["A"]);
3081        let b = store.create_node(&["B"]);
3082        let c = store.create_node(&["C"]);
3083
3084        let e1 = store.create_edge(a, b, "R1"); // a -> b
3085        let e2 = store.create_edge(c, a, "R2"); // c -> a
3086
3087        // Both directions from a
3088        let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3089        assert_eq!(edges.len(), 2);
3090        assert!(edges.iter().any(|(_, e)| *e == e1)); // outgoing
3091        assert!(edges.iter().any(|(_, e)| *e == e2)); // incoming
3092    }
3093
3094    #[test]
3095    fn test_no_backward_adj_in_degree() {
3096        let config = LpgStoreConfig {
3097            backward_edges: false,
3098            initial_node_capacity: 10,
3099            initial_edge_capacity: 10,
3100        };
3101        let store = LpgStore::with_config(config);
3102
3103        let a = store.create_node(&["A"]);
3104        let b = store.create_node(&["B"]);
3105        store.create_edge(a, b, "R");
3106
3107        // in_degree should still work (falls back to scanning)
3108        let degree = store.in_degree(b);
3109        assert_eq!(degree, 1);
3110    }
3111
3112    #[test]
3113    fn test_no_backward_adj_edges_to() {
3114        let config = LpgStoreConfig {
3115            backward_edges: false,
3116            initial_node_capacity: 10,
3117            initial_edge_capacity: 10,
3118        };
3119        let store = LpgStore::with_config(config);
3120
3121        let a = store.create_node(&["A"]);
3122        let b = store.create_node(&["B"]);
3123        let e = store.create_edge(a, b, "R");
3124
3125        // edges_to should still work (falls back to scanning)
3126        let edges = store.edges_to(b);
3127        assert_eq!(edges.len(), 1);
3128        assert_eq!(edges[0].1, e);
3129    }
3130
3131    #[test]
3132    fn test_node_versioned_creation() {
3133        let store = LpgStore::new();
3134
3135        let epoch = store.new_epoch();
3136        let tx_id = TxId::new(1);
3137
3138        let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3139        assert!(store.get_node(id).is_some());
3140    }
3141
3142    #[test]
3143    fn test_edge_versioned_creation() {
3144        let store = LpgStore::new();
3145
3146        let a = store.create_node(&["A"]);
3147        let b = store.create_node(&["B"]);
3148
3149        let epoch = store.new_epoch();
3150        let tx_id = TxId::new(1);
3151
3152        let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3153        assert!(store.get_edge(edge_id).is_some());
3154    }
3155
3156    #[test]
3157    fn test_node_with_props_versioned() {
3158        let store = LpgStore::new();
3159
3160        let epoch = store.new_epoch();
3161        let tx_id = TxId::new(1);
3162
3163        let id = store.create_node_with_props_versioned(
3164            &["Person"],
3165            [("name", Value::from("Alice"))],
3166            epoch,
3167            tx_id,
3168        );
3169
3170        let node = store.get_node(id).unwrap();
3171        assert_eq!(
3172            node.get_property("name").and_then(|v| v.as_str()),
3173            Some("Alice")
3174        );
3175    }
3176
3177    #[test]
3178    fn test_discard_uncommitted_versions() {
3179        let store = LpgStore::new();
3180
3181        let epoch = store.new_epoch();
3182        let tx_id = TxId::new(42);
3183
3184        // Create node with specific tx
3185        let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3186        assert!(store.get_node(node_id).is_some());
3187
3188        // Discard uncommitted versions for this tx
3189        store.discard_uncommitted_versions(tx_id);
3190
3191        // Node should be gone (version chain was removed)
3192        assert!(store.get_node(node_id).is_none());
3193    }
3194}