Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(feature = "tiered-storage")]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29
30/// Compares two values for ordering (used for range checks).
31fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
32    match (a, b) {
33        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
34        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
35        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
36        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
37        _ => None,
38    }
39}
40
41/// Checks if a value is within a range.
42fn value_in_range(
43    value: &Value,
44    min: Option<&Value>,
45    max: Option<&Value>,
46    min_inclusive: bool,
47    max_inclusive: bool,
48) -> bool {
49    // Check lower bound
50    if let Some(min_val) = min {
51        match compare_values_for_range(value, min_val) {
52            Some(CmpOrdering::Less) => return false,
53            Some(CmpOrdering::Equal) if !min_inclusive => return false,
54            None => return false, // Can't compare
55            _ => {}
56        }
57    }
58
59    // Check upper bound
60    if let Some(max_val) = max {
61        match compare_values_for_range(value, max_val) {
62            Some(CmpOrdering::Greater) => return false,
63            Some(CmpOrdering::Equal) if !max_inclusive => return false,
64            None => return false,
65            _ => {}
66        }
67    }
68
69    true
70}
71
72// Tiered storage imports
73#[cfg(feature = "tiered-storage")]
74use crate::storage::EpochStore;
75#[cfg(feature = "tiered-storage")]
76use grafeo_common::memory::arena::ArenaAllocator;
77#[cfg(feature = "tiered-storage")]
78use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
79
80/// Configuration for the LPG store.
81///
82/// The defaults work well for most cases. Tune `backward_edges` if you only
83/// traverse in one direction (saves memory), or adjust capacities if you know
84/// your graph size upfront (avoids reallocations).
85#[derive(Debug, Clone)]
86pub struct LpgStoreConfig {
87    /// Maintain backward adjacency for incoming edge queries. Turn off if
88    /// you only traverse outgoing edges - saves ~50% adjacency memory.
89    pub backward_edges: bool,
90    /// Initial capacity for nodes (avoids early reallocations).
91    pub initial_node_capacity: usize,
92    /// Initial capacity for edges (avoids early reallocations).
93    pub initial_edge_capacity: usize,
94}
95
96impl Default for LpgStoreConfig {
97    fn default() -> Self {
98        Self {
99            backward_edges: true,
100            initial_node_capacity: 1024,
101            initial_edge_capacity: 4096,
102        }
103    }
104}
105
106/// The core in-memory graph storage.
107///
108/// Everything lives here: nodes, edges, properties, adjacency indexes, and
109/// version chains for MVCC. Concurrent reads never block each other.
110///
111/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
112/// adds transaction management and query execution. Use `LpgStore` directly
113/// when you need raw performance for algorithm implementations.
114///
115/// # Example
116///
117/// ```
118/// use grafeo_core::graph::lpg::LpgStore;
119/// use grafeo_core::graph::Direction;
120///
121/// let store = LpgStore::new();
122///
123/// // Create a small social network
124/// let alice = store.create_node(&["Person"]);
125/// let bob = store.create_node(&["Person"]);
126/// store.create_edge(alice, bob, "KNOWS");
127///
128/// // Traverse outgoing edges
129/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
130///     println!("Alice knows node {:?}", neighbor);
131/// }
132/// ```
133///
134/// # Lock Ordering
135///
136/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
137/// consistent order to prevent deadlocks. Always acquire locks in this order:
138///
139/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
140/// 1. `nodes` / `node_versions`
141/// 2. `edges` / `edge_versions`
142///
143/// ## Level 2 - Catalogs (acquire as pairs when writing)
144/// 3. `label_to_id` + `id_to_label`
145/// 4. `edge_type_to_id` + `id_to_edge_type`
146///
147/// ## Level 3 - Indexes
148/// 5. `label_index`
149/// 6. `node_labels`
150/// 7. `property_indexes`
151///
152/// ## Level 4 - Statistics
153/// 8. `statistics`
154///
155/// ## Level 5 - Nested Locks (internal to other structs)
156/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
157/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
158///
159/// ## Rules
160/// - Catalog pairs must be acquired together when writing.
161/// - Never hold entity locks while acquiring catalog locks in a different scope.
162/// - Statistics lock is always last.
163/// - Read locks are generally safe, but avoid read-to-write upgrades.
164pub struct LpgStore {
165    /// Configuration.
166    #[allow(dead_code)]
167    config: LpgStoreConfig,
168
169    /// Node records indexed by NodeId, with version chains for MVCC.
170    /// Used when `tiered-storage` feature is disabled.
171    /// Lock order: 1
172    #[cfg(not(feature = "tiered-storage"))]
173    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
174
175    /// Edge records indexed by EdgeId, with version chains for MVCC.
176    /// Used when `tiered-storage` feature is disabled.
177    /// Lock order: 2
178    #[cfg(not(feature = "tiered-storage"))]
179    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
180
181    // === Tiered Storage Fields (feature-gated) ===
182    /// Arena allocator for hot data storage.
183    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
184    #[cfg(feature = "tiered-storage")]
185    arena_allocator: Arc<ArenaAllocator>,
186
187    /// Node version indexes - store metadata and arena offsets.
188    /// The actual NodeRecord data is stored in the arena.
189    /// Lock order: 1
190    #[cfg(feature = "tiered-storage")]
191    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
192
193    /// Edge version indexes - store metadata and arena offsets.
194    /// The actual EdgeRecord data is stored in the arena.
195    /// Lock order: 2
196    #[cfg(feature = "tiered-storage")]
197    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
198
199    /// Cold storage for frozen epochs.
200    /// Contains compressed epoch blocks for historical data.
201    #[cfg(feature = "tiered-storage")]
202    epoch_store: Arc<EpochStore>,
203
204    /// Property storage for nodes.
205    node_properties: PropertyStorage<NodeId>,
206
207    /// Property storage for edges.
208    edge_properties: PropertyStorage<EdgeId>,
209
210    /// Label name to ID mapping.
211    /// Lock order: 3 (acquire with id_to_label)
212    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
213
214    /// Label ID to name mapping.
215    /// Lock order: 3 (acquire with label_to_id)
216    id_to_label: RwLock<Vec<ArcStr>>,
217
218    /// Edge type name to ID mapping.
219    /// Lock order: 4 (acquire with id_to_edge_type)
220    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
221
222    /// Edge type ID to name mapping.
223    /// Lock order: 4 (acquire with edge_type_to_id)
224    id_to_edge_type: RwLock<Vec<ArcStr>>,
225
226    /// Forward adjacency lists (outgoing edges).
227    forward_adj: ChunkedAdjacency,
228
229    /// Backward adjacency lists (incoming edges).
230    /// Only populated if config.backward_edges is true.
231    backward_adj: Option<ChunkedAdjacency>,
232
233    /// Label index: label_id -> set of node IDs.
234    /// Lock order: 5
235    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
236
237    /// Node labels: node_id -> set of label IDs.
238    /// Reverse mapping to efficiently get labels for a node.
239    /// Lock order: 6
240    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
241
242    /// Property indexes: property_key -> (value -> set of node IDs).
243    ///
244    /// When a property is indexed, lookups by value are O(1) instead of O(n).
245    /// Use [`create_property_index`] to enable indexing for a property.
246    /// Lock order: 7
247    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
248
249    /// Next node ID.
250    next_node_id: AtomicU64,
251
252    /// Next edge ID.
253    next_edge_id: AtomicU64,
254
255    /// Current epoch.
256    current_epoch: AtomicU64,
257
258    /// Statistics for cost-based optimization.
259    /// Lock order: 8 (always last)
260    statistics: RwLock<Statistics>,
261}
262
263impl LpgStore {
264    /// Creates a new LPG store with default configuration.
265    #[must_use]
266    pub fn new() -> Self {
267        Self::with_config(LpgStoreConfig::default())
268    }
269
270    /// Creates a new LPG store with custom configuration.
271    #[must_use]
272    pub fn with_config(config: LpgStoreConfig) -> Self {
273        let backward_adj = if config.backward_edges {
274            Some(ChunkedAdjacency::new())
275        } else {
276            None
277        };
278
279        Self {
280            #[cfg(not(feature = "tiered-storage"))]
281            nodes: RwLock::new(FxHashMap::default()),
282            #[cfg(not(feature = "tiered-storage"))]
283            edges: RwLock::new(FxHashMap::default()),
284            #[cfg(feature = "tiered-storage")]
285            arena_allocator: Arc::new(ArenaAllocator::new()),
286            #[cfg(feature = "tiered-storage")]
287            node_versions: RwLock::new(FxHashMap::default()),
288            #[cfg(feature = "tiered-storage")]
289            edge_versions: RwLock::new(FxHashMap::default()),
290            #[cfg(feature = "tiered-storage")]
291            epoch_store: Arc::new(EpochStore::new()),
292            node_properties: PropertyStorage::new(),
293            edge_properties: PropertyStorage::new(),
294            label_to_id: RwLock::new(FxHashMap::default()),
295            id_to_label: RwLock::new(Vec::new()),
296            edge_type_to_id: RwLock::new(FxHashMap::default()),
297            id_to_edge_type: RwLock::new(Vec::new()),
298            forward_adj: ChunkedAdjacency::new(),
299            backward_adj,
300            label_index: RwLock::new(Vec::new()),
301            node_labels: RwLock::new(FxHashMap::default()),
302            property_indexes: RwLock::new(FxHashMap::default()),
303            next_node_id: AtomicU64::new(0),
304            next_edge_id: AtomicU64::new(0),
305            current_epoch: AtomicU64::new(0),
306            statistics: RwLock::new(Statistics::new()),
307            config,
308        }
309    }
310
311    /// Returns the current epoch.
312    #[must_use]
313    pub fn current_epoch(&self) -> EpochId {
314        EpochId::new(self.current_epoch.load(Ordering::Acquire))
315    }
316
317    /// Creates a new epoch.
318    pub fn new_epoch(&self) -> EpochId {
319        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
320        EpochId::new(id)
321    }
322
323    // === Node Operations ===
324
325    /// Creates a new node with the given labels.
326    ///
327    /// Uses the system transaction for non-transactional operations.
328    pub fn create_node(&self, labels: &[&str]) -> NodeId {
329        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
330    }
331
332    /// Creates a new node with the given labels within a transaction context.
333    #[cfg(not(feature = "tiered-storage"))]
334    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
335        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
336
337        let mut record = NodeRecord::new(id, epoch);
338        record.set_label_count(labels.len() as u16);
339
340        // Store labels in node_labels map and label_index
341        let mut node_label_set = FxHashSet::default();
342        for label in labels {
343            let label_id = self.get_or_create_label_id(*label);
344            node_label_set.insert(label_id);
345
346            // Update label index
347            let mut index = self.label_index.write();
348            while index.len() <= label_id as usize {
349                index.push(FxHashMap::default());
350            }
351            index[label_id as usize].insert(id, ());
352        }
353
354        // Store node's labels
355        self.node_labels.write().insert(id, node_label_set);
356
357        // Create version chain with initial version
358        let chain = VersionChain::with_initial(record, epoch, tx_id);
359        self.nodes.write().insert(id, chain);
360        id
361    }
362
363    /// Creates a new node with the given labels within a transaction context.
364    /// (Tiered storage version: stores data in arena, metadata in VersionIndex)
365    #[cfg(feature = "tiered-storage")]
366    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
367        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
368
369        let mut record = NodeRecord::new(id, epoch);
370        record.set_label_count(labels.len() as u16);
371
372        // Store labels in node_labels map and label_index
373        let mut node_label_set = FxHashSet::default();
374        for label in labels {
375            let label_id = self.get_or_create_label_id(*label);
376            node_label_set.insert(label_id);
377
378            // Update label index
379            let mut index = self.label_index.write();
380            while index.len() <= label_id as usize {
381                index.push(FxHashMap::default());
382            }
383            index[label_id as usize].insert(id, ());
384        }
385
386        // Store node's labels
387        self.node_labels.write().insert(id, node_label_set);
388
389        // Allocate record in arena and get offset (create epoch if needed)
390        let arena = self.arena_allocator.arena_or_create(epoch);
391        let (offset, _stored) = arena.alloc_value_with_offset(record);
392
393        // Create HotVersionRef pointing to arena data
394        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
395
396        // Create or update version index
397        let mut versions = self.node_versions.write();
398        if let Some(index) = versions.get_mut(&id) {
399            index.add_hot(hot_ref);
400        } else {
401            versions.insert(id, VersionIndex::with_initial(hot_ref));
402        }
403
404        id
405    }
406
407    /// Creates a new node with labels and properties.
408    pub fn create_node_with_props(
409        &self,
410        labels: &[&str],
411        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
412    ) -> NodeId {
413        self.create_node_with_props_versioned(
414            labels,
415            properties,
416            self.current_epoch(),
417            TxId::SYSTEM,
418        )
419    }
420
421    /// Creates a new node with labels and properties within a transaction context.
422    #[cfg(not(feature = "tiered-storage"))]
423    pub fn create_node_with_props_versioned(
424        &self,
425        labels: &[&str],
426        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
427        epoch: EpochId,
428        tx_id: TxId,
429    ) -> NodeId {
430        let id = self.create_node_versioned(labels, epoch, tx_id);
431
432        for (key, value) in properties {
433            self.node_properties.set(id, key.into(), value.into());
434        }
435
436        // Update props_count in record
437        let count = self.node_properties.get_all(id).len() as u16;
438        if let Some(chain) = self.nodes.write().get_mut(&id) {
439            if let Some(record) = chain.latest_mut() {
440                record.props_count = count;
441            }
442        }
443
444        id
445    }
446
447    /// Creates a new node with labels and properties within a transaction context.
448    /// (Tiered storage version)
449    #[cfg(feature = "tiered-storage")]
450    pub fn create_node_with_props_versioned(
451        &self,
452        labels: &[&str],
453        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
454        epoch: EpochId,
455        tx_id: TxId,
456    ) -> NodeId {
457        let id = self.create_node_versioned(labels, epoch, tx_id);
458
459        for (key, value) in properties {
460            self.node_properties.set(id, key.into(), value.into());
461        }
462
463        // Note: props_count in record is not updated for tiered storage.
464        // The record is immutable once allocated in the arena.
465
466        id
467    }
468
469    /// Gets a node by ID (latest visible version).
470    #[must_use]
471    pub fn get_node(&self, id: NodeId) -> Option<Node> {
472        self.get_node_at_epoch(id, self.current_epoch())
473    }
474
475    /// Gets a node by ID at a specific epoch.
476    #[must_use]
477    #[cfg(not(feature = "tiered-storage"))]
478    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
479        let nodes = self.nodes.read();
480        let chain = nodes.get(&id)?;
481        let record = chain.visible_at(epoch)?;
482
483        if record.is_deleted() {
484            return None;
485        }
486
487        let mut node = Node::new(id);
488
489        // Get labels from node_labels map
490        let id_to_label = self.id_to_label.read();
491        let node_labels = self.node_labels.read();
492        if let Some(label_ids) = node_labels.get(&id) {
493            for &label_id in label_ids {
494                if let Some(label) = id_to_label.get(label_id as usize) {
495                    node.labels.push(label.clone());
496                }
497            }
498        }
499
500        // Get properties
501        node.properties = self.node_properties.get_all(id).into_iter().collect();
502
503        Some(node)
504    }
505
506    /// Gets a node by ID at a specific epoch.
507    /// (Tiered storage version: reads from arena via VersionIndex)
508    #[must_use]
509    #[cfg(feature = "tiered-storage")]
510    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
511        let versions = self.node_versions.read();
512        let index = versions.get(&id)?;
513        let version_ref = index.visible_at(epoch)?;
514
515        // Read the record from arena
516        let record = self.read_node_record(&version_ref)?;
517
518        if record.is_deleted() {
519            return None;
520        }
521
522        let mut node = Node::new(id);
523
524        // Get labels from node_labels map
525        let id_to_label = self.id_to_label.read();
526        let node_labels = self.node_labels.read();
527        if let Some(label_ids) = node_labels.get(&id) {
528            for &label_id in label_ids {
529                if let Some(label) = id_to_label.get(label_id as usize) {
530                    node.labels.push(label.clone());
531                }
532            }
533        }
534
535        // Get properties
536        node.properties = self.node_properties.get_all(id).into_iter().collect();
537
538        Some(node)
539    }
540
541    /// Gets a node visible to a specific transaction.
542    #[must_use]
543    #[cfg(not(feature = "tiered-storage"))]
544    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
545        let nodes = self.nodes.read();
546        let chain = nodes.get(&id)?;
547        let record = chain.visible_to(epoch, tx_id)?;
548
549        if record.is_deleted() {
550            return None;
551        }
552
553        let mut node = Node::new(id);
554
555        // Get labels from node_labels map
556        let id_to_label = self.id_to_label.read();
557        let node_labels = self.node_labels.read();
558        if let Some(label_ids) = node_labels.get(&id) {
559            for &label_id in label_ids {
560                if let Some(label) = id_to_label.get(label_id as usize) {
561                    node.labels.push(label.clone());
562                }
563            }
564        }
565
566        // Get properties
567        node.properties = self.node_properties.get_all(id).into_iter().collect();
568
569        Some(node)
570    }
571
572    /// Gets a node visible to a specific transaction.
573    /// (Tiered storage version: reads from arena via VersionIndex)
574    #[must_use]
575    #[cfg(feature = "tiered-storage")]
576    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
577        let versions = self.node_versions.read();
578        let index = versions.get(&id)?;
579        let version_ref = index.visible_to(epoch, tx_id)?;
580
581        // Read the record from arena
582        let record = self.read_node_record(&version_ref)?;
583
584        if record.is_deleted() {
585            return None;
586        }
587
588        let mut node = Node::new(id);
589
590        // Get labels from node_labels map
591        let id_to_label = self.id_to_label.read();
592        let node_labels = self.node_labels.read();
593        if let Some(label_ids) = node_labels.get(&id) {
594            for &label_id in label_ids {
595                if let Some(label) = id_to_label.get(label_id as usize) {
596                    node.labels.push(label.clone());
597                }
598            }
599        }
600
601        // Get properties
602        node.properties = self.node_properties.get_all(id).into_iter().collect();
603
604        Some(node)
605    }
606
607    /// Reads a NodeRecord from arena (hot) or epoch store (cold) using a VersionRef.
608    #[cfg(feature = "tiered-storage")]
609    #[allow(unsafe_code)]
610    fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
611        match version_ref {
612            VersionRef::Hot(hot_ref) => {
613                let arena = self.arena_allocator.arena(hot_ref.epoch);
614                // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
615                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
616                Some(record.clone())
617            }
618            VersionRef::Cold(cold_ref) => {
619                // Read from compressed epoch store
620                self.epoch_store
621                    .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
622            }
623        }
624    }
625
626    /// Deletes a node and all its edges (using latest epoch).
627    pub fn delete_node(&self, id: NodeId) -> bool {
628        self.delete_node_at_epoch(id, self.current_epoch())
629    }
630
631    /// Deletes a node at a specific epoch.
632    #[cfg(not(feature = "tiered-storage"))]
633    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
634        let mut nodes = self.nodes.write();
635        if let Some(chain) = nodes.get_mut(&id) {
636            // Check if visible at this epoch (not already deleted)
637            if let Some(record) = chain.visible_at(epoch) {
638                if record.is_deleted() {
639                    return false;
640                }
641            } else {
642                // Not visible at this epoch (already deleted or doesn't exist)
643                return false;
644            }
645
646            // Mark the version chain as deleted at this epoch
647            chain.mark_deleted(epoch);
648
649            // Remove from label index using node_labels map
650            let mut index = self.label_index.write();
651            let mut node_labels = self.node_labels.write();
652            if let Some(label_ids) = node_labels.remove(&id) {
653                for label_id in label_ids {
654                    if let Some(set) = index.get_mut(label_id as usize) {
655                        set.remove(&id);
656                    }
657                }
658            }
659
660            // Remove properties
661            drop(nodes); // Release lock before removing properties
662            drop(index);
663            drop(node_labels);
664            self.node_properties.remove_all(id);
665
666            // Note: Caller should use delete_node_edges() first if detach is needed
667
668            true
669        } else {
670            false
671        }
672    }
673
674    /// Deletes a node at a specific epoch.
675    /// (Tiered storage version)
676    #[cfg(feature = "tiered-storage")]
677    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
678        let mut versions = self.node_versions.write();
679        if let Some(index) = versions.get_mut(&id) {
680            // Check if visible at this epoch
681            if let Some(version_ref) = index.visible_at(epoch) {
682                if let Some(record) = self.read_node_record(&version_ref) {
683                    if record.is_deleted() {
684                        return false;
685                    }
686                } else {
687                    return false;
688                }
689            } else {
690                return false;
691            }
692
693            // Mark as deleted in version index
694            index.mark_deleted(epoch);
695
696            // Remove from label index using node_labels map
697            let mut label_index = self.label_index.write();
698            let mut node_labels = self.node_labels.write();
699            if let Some(label_ids) = node_labels.remove(&id) {
700                for label_id in label_ids {
701                    if let Some(set) = label_index.get_mut(label_id as usize) {
702                        set.remove(&id);
703                    }
704                }
705            }
706
707            // Remove properties
708            drop(versions);
709            drop(label_index);
710            drop(node_labels);
711            self.node_properties.remove_all(id);
712
713            true
714        } else {
715            false
716        }
717    }
718
719    /// Deletes all edges connected to a node (implements DETACH DELETE).
720    ///
721    /// Call this before `delete_node()` if you want to remove a node that
722    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
723    #[cfg(not(feature = "tiered-storage"))]
724    pub fn delete_node_edges(&self, node_id: NodeId) {
725        // Get outgoing edges
726        let outgoing: Vec<EdgeId> = self
727            .forward_adj
728            .edges_from(node_id)
729            .into_iter()
730            .map(|(_, edge_id)| edge_id)
731            .collect();
732
733        // Get incoming edges
734        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
735            backward
736                .edges_from(node_id)
737                .into_iter()
738                .map(|(_, edge_id)| edge_id)
739                .collect()
740        } else {
741            // No backward adjacency - scan all edges
742            let epoch = self.current_epoch();
743            self.edges
744                .read()
745                .iter()
746                .filter_map(|(id, chain)| {
747                    chain.visible_at(epoch).and_then(|r| {
748                        if !r.is_deleted() && r.dst == node_id {
749                            Some(*id)
750                        } else {
751                            None
752                        }
753                    })
754                })
755                .collect()
756        };
757
758        // Delete all edges
759        for edge_id in outgoing.into_iter().chain(incoming) {
760            self.delete_edge(edge_id);
761        }
762    }
763
764    /// Deletes all edges connected to a node (implements DETACH DELETE).
765    /// (Tiered storage version)
766    #[cfg(feature = "tiered-storage")]
767    pub fn delete_node_edges(&self, node_id: NodeId) {
768        // Get outgoing edges
769        let outgoing: Vec<EdgeId> = self
770            .forward_adj
771            .edges_from(node_id)
772            .into_iter()
773            .map(|(_, edge_id)| edge_id)
774            .collect();
775
776        // Get incoming edges
777        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
778            backward
779                .edges_from(node_id)
780                .into_iter()
781                .map(|(_, edge_id)| edge_id)
782                .collect()
783        } else {
784            // No backward adjacency - scan all edges
785            let epoch = self.current_epoch();
786            let versions = self.edge_versions.read();
787            versions
788                .iter()
789                .filter_map(|(id, index)| {
790                    index.visible_at(epoch).and_then(|vref| {
791                        self.read_edge_record(&vref).and_then(|r| {
792                            if !r.is_deleted() && r.dst == node_id {
793                                Some(*id)
794                            } else {
795                                None
796                            }
797                        })
798                    })
799                })
800                .collect()
801        };
802
803        // Delete all edges
804        for edge_id in outgoing.into_iter().chain(incoming) {
805            self.delete_edge(edge_id);
806        }
807    }
808
809    /// Sets a property on a node.
810    #[cfg(not(feature = "tiered-storage"))]
811    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
812        let prop_key: PropertyKey = key.into();
813
814        // Update property index before setting the property (needs to read old value)
815        self.update_property_index_on_set(id, &prop_key, &value);
816
817        self.node_properties.set(id, prop_key, value);
818
819        // Update props_count in record
820        let count = self.node_properties.get_all(id).len() as u16;
821        if let Some(chain) = self.nodes.write().get_mut(&id) {
822            if let Some(record) = chain.latest_mut() {
823                record.props_count = count;
824            }
825        }
826    }
827
828    /// Sets a property on a node.
829    /// (Tiered storage version: properties stored separately, record is immutable)
830    #[cfg(feature = "tiered-storage")]
831    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
832        let prop_key: PropertyKey = key.into();
833
834        // Update property index before setting the property (needs to read old value)
835        self.update_property_index_on_set(id, &prop_key, &value);
836
837        self.node_properties.set(id, prop_key, value);
838        // Note: props_count in record is not updated for tiered storage.
839        // The record is immutable once allocated in the arena.
840        // Property count can be derived from PropertyStorage if needed.
841    }
842
843    /// Sets a property on an edge.
844    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
845        self.edge_properties.set(id, key.into(), value);
846    }
847
848    /// Removes a property from a node.
849    ///
850    /// Returns the previous value if it existed, or None if the property didn't exist.
851    #[cfg(not(feature = "tiered-storage"))]
852    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
853        let prop_key: PropertyKey = key.into();
854
855        // Update property index before removing (needs to read old value)
856        self.update_property_index_on_remove(id, &prop_key);
857
858        let result = self.node_properties.remove(id, &prop_key);
859
860        // Update props_count in record
861        let count = self.node_properties.get_all(id).len() as u16;
862        if let Some(chain) = self.nodes.write().get_mut(&id) {
863            if let Some(record) = chain.latest_mut() {
864                record.props_count = count;
865            }
866        }
867
868        result
869    }
870
871    /// Removes a property from a node.
872    /// (Tiered storage version)
873    #[cfg(feature = "tiered-storage")]
874    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
875        let prop_key: PropertyKey = key.into();
876
877        // Update property index before removing (needs to read old value)
878        self.update_property_index_on_remove(id, &prop_key);
879
880        self.node_properties.remove(id, &prop_key)
881        // Note: props_count in record is not updated for tiered storage.
882    }
883
884    /// Removes a property from an edge.
885    ///
886    /// Returns the previous value if it existed, or None if the property didn't exist.
887    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
888        self.edge_properties.remove(id, &key.into())
889    }
890
891    /// Gets a single property from a node without loading all properties.
892    ///
893    /// This is O(1) vs O(properties) for `get_node().get_property()`.
894    /// Use this for filter predicates where you only need one property value.
895    ///
896    /// # Example
897    ///
898    /// ```ignore
899    /// // Fast: Direct single-property lookup
900    /// let age = store.get_node_property(node_id, "age");
901    ///
902    /// // Slow: Loads all properties, then extracts one
903    /// let age = store.get_node(node_id).and_then(|n| n.get_property("age").cloned());
904    /// ```
905    #[must_use]
906    pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
907        self.node_properties.get(id, key)
908    }
909
910    /// Gets a single property from an edge without loading all properties.
911    ///
912    /// This is O(1) vs O(properties) for `get_edge().get_property()`.
913    #[must_use]
914    pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
915        self.edge_properties.get(id, key)
916    }
917
918    // === Batch Property Operations ===
919
920    /// Gets a property for multiple nodes in a single batch operation.
921    ///
922    /// More efficient than calling [`Self::get_node_property`] in a loop because it
923    /// reduces lock overhead and enables better cache utilization.
924    ///
925    /// # Example
926    ///
927    /// ```
928    /// use grafeo_core::graph::lpg::LpgStore;
929    /// use grafeo_common::types::{NodeId, PropertyKey, Value};
930    ///
931    /// let store = LpgStore::new();
932    /// let n1 = store.create_node(&["Person"]);
933    /// let n2 = store.create_node(&["Person"]);
934    /// store.set_node_property(n1, "age", Value::from(25i64));
935    /// store.set_node_property(n2, "age", Value::from(30i64));
936    ///
937    /// let ages = store.get_node_property_batch(&[n1, n2], &PropertyKey::new("age"));
938    /// assert_eq!(ages, vec![Some(Value::from(25i64)), Some(Value::from(30i64))]);
939    /// ```
940    #[must_use]
941    pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
942        self.node_properties.get_batch(ids, key)
943    }
944
945    /// Gets all properties for multiple nodes in a single batch operation.
946    ///
947    /// Returns a vector of property maps, one per node ID (empty map if no properties).
948    /// More efficient than calling [`Self::get_node`] in a loop.
949    #[must_use]
950    pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
951        self.node_properties.get_all_batch(ids)
952    }
953
954    /// Gets selected properties for multiple nodes (projection pushdown).
955    ///
956    /// This is more efficient than [`Self::get_nodes_properties_batch`] when you only
957    /// need a subset of properties. It only iterates the requested columns instead of
958    /// all columns.
959    ///
960    /// **Use this for**: Queries with explicit projections like `RETURN n.name, n.age`
961    /// instead of `RETURN n` (which requires all properties).
962    ///
963    /// # Example
964    ///
965    /// ```
966    /// use grafeo_core::graph::lpg::LpgStore;
967    /// use grafeo_common::types::{PropertyKey, Value};
968    ///
969    /// let store = LpgStore::new();
970    /// let n1 = store.create_node(&["Person"]);
971    /// store.set_node_property(n1, "name", Value::from("Alice"));
972    /// store.set_node_property(n1, "age", Value::from(30i64));
973    /// store.set_node_property(n1, "email", Value::from("alice@example.com"));
974    ///
975    /// // Only fetch name and age (faster than get_nodes_properties_batch)
976    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
977    /// let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
978    ///
979    /// assert_eq!(props[0].len(), 2); // Only name and age, not email
980    /// ```
981    #[must_use]
982    pub fn get_nodes_properties_selective_batch(
983        &self,
984        ids: &[NodeId],
985        keys: &[PropertyKey],
986    ) -> Vec<FxHashMap<PropertyKey, Value>> {
987        self.node_properties.get_selective_batch(ids, keys)
988    }
989
990    /// Gets selected properties for multiple edges (projection pushdown).
991    ///
992    /// Edge-property version of [`Self::get_nodes_properties_selective_batch`].
993    #[must_use]
994    pub fn get_edges_properties_selective_batch(
995        &self,
996        ids: &[EdgeId],
997        keys: &[PropertyKey],
998    ) -> Vec<FxHashMap<PropertyKey, Value>> {
999        self.edge_properties.get_selective_batch(ids, keys)
1000    }
1001
1002    /// Finds nodes where a property value is in a range.
1003    ///
1004    /// This is useful for queries like `n.age > 30` or `n.price BETWEEN 10 AND 100`.
1005    /// Uses zone maps to skip scanning when the range definitely doesn't match.
1006    ///
1007    /// # Arguments
1008    ///
1009    /// * `property` - The property to check
1010    /// * `min` - Optional lower bound (None for unbounded)
1011    /// * `max` - Optional upper bound (None for unbounded)
1012    /// * `min_inclusive` - Whether lower bound is inclusive (>= vs >)
1013    /// * `max_inclusive` - Whether upper bound is inclusive (<= vs <)
1014    ///
1015    /// # Example
1016    ///
1017    /// ```
1018    /// use grafeo_core::graph::lpg::LpgStore;
1019    /// use grafeo_common::types::Value;
1020    ///
1021    /// let store = LpgStore::new();
1022    /// let n1 = store.create_node(&["Person"]);
1023    /// let n2 = store.create_node(&["Person"]);
1024    /// store.set_node_property(n1, "age", Value::from(25i64));
1025    /// store.set_node_property(n2, "age", Value::from(35i64));
1026    ///
1027    /// // Find nodes where age > 30
1028    /// let result = store.find_nodes_in_range(
1029    ///     "age",
1030    ///     Some(&Value::from(30i64)),
1031    ///     None,
1032    ///     false, // exclusive lower bound
1033    ///     true,  // inclusive upper bound (doesn't matter since None)
1034    /// );
1035    /// assert_eq!(result.len(), 1); // Only n2 matches
1036    /// ```
1037    #[must_use]
1038    pub fn find_nodes_in_range(
1039        &self,
1040        property: &str,
1041        min: Option<&Value>,
1042        max: Option<&Value>,
1043        min_inclusive: bool,
1044        max_inclusive: bool,
1045    ) -> Vec<NodeId> {
1046        let key = PropertyKey::new(property);
1047
1048        // Check zone map first - if no values could match, return empty
1049        if !self
1050            .node_properties
1051            .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1052        {
1053            return Vec::new();
1054        }
1055
1056        // Scan all nodes and filter by range
1057        self.node_ids()
1058            .into_iter()
1059            .filter(|&node_id| {
1060                self.node_properties
1061                    .get(node_id, &key)
1062                    .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1063            })
1064            .collect()
1065    }
1066
1067    /// Finds nodes matching multiple property equality conditions.
1068    ///
1069    /// This is more efficient than intersecting multiple single-property lookups
1070    /// because it can use indexes when available and short-circuits on the first
1071    /// miss.
1072    ///
1073    /// # Example
1074    ///
1075    /// ```
1076    /// use grafeo_core::graph::lpg::LpgStore;
1077    /// use grafeo_common::types::Value;
1078    ///
1079    /// let store = LpgStore::new();
1080    /// let alice = store.create_node(&["Person"]);
1081    /// store.set_node_property(alice, "name", Value::from("Alice"));
1082    /// store.set_node_property(alice, "city", Value::from("NYC"));
1083    ///
1084    /// // Find nodes where name = "Alice" AND city = "NYC"
1085    /// let matches = store.find_nodes_by_properties(&[
1086    ///     ("name", Value::from("Alice")),
1087    ///     ("city", Value::from("NYC")),
1088    /// ]);
1089    /// assert!(matches.contains(&alice));
1090    /// ```
1091    #[must_use]
1092    pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1093        if conditions.is_empty() {
1094            return self.node_ids();
1095        }
1096
1097        // Find the most selective condition (smallest result set) to start
1098        // If any condition has an index, use that first
1099        let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1100        let indexes = self.property_indexes.read();
1101
1102        for (i, (prop, value)) in conditions.iter().enumerate() {
1103            let key = PropertyKey::new(*prop);
1104            let hv = HashableValue::new(value.clone());
1105
1106            if let Some(index) = indexes.get(&key) {
1107                let matches: Vec<NodeId> = index
1108                    .get(&hv)
1109                    .map(|nodes| nodes.iter().copied().collect())
1110                    .unwrap_or_default();
1111
1112                // Short-circuit if any indexed condition has no matches
1113                if matches.is_empty() {
1114                    return Vec::new();
1115                }
1116
1117                // Use smallest indexed result as starting point
1118                if best_start
1119                    .as_ref()
1120                    .is_none_or(|(_, best)| matches.len() < best.len())
1121                {
1122                    best_start = Some((i, matches));
1123                }
1124            }
1125        }
1126        drop(indexes);
1127
1128        // Start from best indexed result or fall back to full node scan
1129        let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1130            // No indexes available, start with first condition via full scan
1131            let (prop, value) = &conditions[0];
1132            (0, self.find_nodes_by_property(prop, value))
1133        });
1134
1135        // Filter candidates through remaining conditions
1136        for (i, (prop, value)) in conditions.iter().enumerate() {
1137            if i == start_idx {
1138                continue;
1139            }
1140
1141            let key = PropertyKey::new(*prop);
1142            candidates.retain(|&node_id| {
1143                self.node_properties
1144                    .get(node_id, &key)
1145                    .is_some_and(|v| v == *value)
1146            });
1147
1148            // Short-circuit if no candidates remain
1149            if candidates.is_empty() {
1150                return Vec::new();
1151            }
1152        }
1153
1154        candidates
1155    }
1156
1157    // === Property Index Operations ===
1158
1159    /// Creates an index on a node property for O(1) lookups by value.
1160    ///
1161    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1162    /// O(1) instead of O(n) for this property. The index is automatically
1163    /// maintained when properties are set or removed.
1164    ///
1165    /// # Example
1166    ///
1167    /// ```
1168    /// use grafeo_core::graph::lpg::LpgStore;
1169    /// use grafeo_common::types::Value;
1170    ///
1171    /// let store = LpgStore::new();
1172    ///
1173    /// // Create nodes with an 'id' property
1174    /// let alice = store.create_node(&["Person"]);
1175    /// store.set_node_property(alice, "id", Value::from("alice_123"));
1176    ///
1177    /// // Create an index on the 'id' property
1178    /// store.create_property_index("id");
1179    ///
1180    /// // Now lookups by 'id' are O(1)
1181    /// let found = store.find_nodes_by_property("id", &Value::from("alice_123"));
1182    /// assert!(found.contains(&alice));
1183    /// ```
1184    pub fn create_property_index(&self, property: &str) {
1185        let key = PropertyKey::new(property);
1186
1187        let mut indexes = self.property_indexes.write();
1188        if indexes.contains_key(&key) {
1189            return; // Already indexed
1190        }
1191
1192        // Create the index and populate it with existing data
1193        let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1194
1195        // Scan all nodes to build the index
1196        for node_id in self.node_ids() {
1197            if let Some(value) = self.node_properties.get(node_id, &key) {
1198                let hv = HashableValue::new(value);
1199                index
1200                    .entry(hv)
1201                    .or_insert_with(FxHashSet::default)
1202                    .insert(node_id);
1203            }
1204        }
1205
1206        indexes.insert(key, index);
1207    }
1208
1209    /// Drops an index on a node property.
1210    ///
1211    /// Returns `true` if the index existed and was removed.
1212    pub fn drop_property_index(&self, property: &str) -> bool {
1213        let key = PropertyKey::new(property);
1214        self.property_indexes.write().remove(&key).is_some()
1215    }
1216
1217    /// Returns `true` if the property has an index.
1218    #[must_use]
1219    pub fn has_property_index(&self, property: &str) -> bool {
1220        let key = PropertyKey::new(property);
1221        self.property_indexes.read().contains_key(&key)
1222    }
1223
1224    /// Finds all nodes that have a specific property value.
1225    ///
1226    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1227    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1228    ///
1229    /// # Example
1230    ///
1231    /// ```
1232    /// use grafeo_core::graph::lpg::LpgStore;
1233    /// use grafeo_common::types::Value;
1234    ///
1235    /// let store = LpgStore::new();
1236    /// store.create_property_index("city"); // Optional but makes lookups fast
1237    ///
1238    /// let alice = store.create_node(&["Person"]);
1239    /// let bob = store.create_node(&["Person"]);
1240    /// store.set_node_property(alice, "city", Value::from("NYC"));
1241    /// store.set_node_property(bob, "city", Value::from("NYC"));
1242    ///
1243    /// let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
1244    /// assert_eq!(nyc_people.len(), 2);
1245    /// ```
1246    #[must_use]
1247    pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1248        let key = PropertyKey::new(property);
1249        let hv = HashableValue::new(value.clone());
1250
1251        // Try indexed lookup first
1252        let indexes = self.property_indexes.read();
1253        if let Some(index) = indexes.get(&key) {
1254            if let Some(nodes) = index.get(&hv) {
1255                return nodes.iter().copied().collect();
1256            }
1257            return Vec::new();
1258        }
1259        drop(indexes);
1260
1261        // Fall back to full scan
1262        self.node_ids()
1263            .into_iter()
1264            .filter(|&node_id| {
1265                self.node_properties
1266                    .get(node_id, &key)
1267                    .is_some_and(|v| v == *value)
1268            })
1269            .collect()
1270    }
1271
1272    /// Updates property indexes when a property is set.
1273    fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1274        let indexes = self.property_indexes.read();
1275        if let Some(index) = indexes.get(key) {
1276            // Get old value to remove from index
1277            if let Some(old_value) = self.node_properties.get(node_id, key) {
1278                let old_hv = HashableValue::new(old_value);
1279                if let Some(mut nodes) = index.get_mut(&old_hv) {
1280                    nodes.remove(&node_id);
1281                    if nodes.is_empty() {
1282                        drop(nodes);
1283                        index.remove(&old_hv);
1284                    }
1285                }
1286            }
1287
1288            // Add new value to index
1289            let new_hv = HashableValue::new(new_value.clone());
1290            index
1291                .entry(new_hv)
1292                .or_insert_with(FxHashSet::default)
1293                .insert(node_id);
1294        }
1295    }
1296
1297    /// Updates property indexes when a property is removed.
1298    fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1299        let indexes = self.property_indexes.read();
1300        if let Some(index) = indexes.get(key) {
1301            // Get old value to remove from index
1302            if let Some(old_value) = self.node_properties.get(node_id, key) {
1303                let old_hv = HashableValue::new(old_value);
1304                if let Some(mut nodes) = index.get_mut(&old_hv) {
1305                    nodes.remove(&node_id);
1306                    if nodes.is_empty() {
1307                        drop(nodes);
1308                        index.remove(&old_hv);
1309                    }
1310                }
1311            }
1312        }
1313    }
1314
1315    /// Adds a label to a node.
1316    ///
1317    /// Returns true if the label was added, false if the node doesn't exist
1318    /// or already has the label.
1319    #[cfg(not(feature = "tiered-storage"))]
1320    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1321        let epoch = self.current_epoch();
1322
1323        // Check if node exists
1324        let nodes = self.nodes.read();
1325        if let Some(chain) = nodes.get(&node_id) {
1326            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1327                return false;
1328            }
1329        } else {
1330            return false;
1331        }
1332        drop(nodes);
1333
1334        // Get or create label ID
1335        let label_id = self.get_or_create_label_id(label);
1336
1337        // Add to node_labels map
1338        let mut node_labels = self.node_labels.write();
1339        let label_set = node_labels
1340            .entry(node_id)
1341            .or_insert_with(FxHashSet::default);
1342
1343        if label_set.contains(&label_id) {
1344            return false; // Already has this label
1345        }
1346
1347        label_set.insert(label_id);
1348        drop(node_labels);
1349
1350        // Add to label_index
1351        let mut index = self.label_index.write();
1352        if (label_id as usize) >= index.len() {
1353            index.resize(label_id as usize + 1, FxHashMap::default());
1354        }
1355        index[label_id as usize].insert(node_id, ());
1356
1357        // Update label count in node record
1358        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1359            if let Some(record) = chain.latest_mut() {
1360                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1361                record.set_label_count(count as u16);
1362            }
1363        }
1364
1365        true
1366    }
1367
1368    /// Adds a label to a node.
1369    /// (Tiered storage version)
1370    #[cfg(feature = "tiered-storage")]
1371    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1372        let epoch = self.current_epoch();
1373
1374        // Check if node exists
1375        let versions = self.node_versions.read();
1376        if let Some(index) = versions.get(&node_id) {
1377            if let Some(vref) = index.visible_at(epoch) {
1378                if let Some(record) = self.read_node_record(&vref) {
1379                    if record.is_deleted() {
1380                        return false;
1381                    }
1382                } else {
1383                    return false;
1384                }
1385            } else {
1386                return false;
1387            }
1388        } else {
1389            return false;
1390        }
1391        drop(versions);
1392
1393        // Get or create label ID
1394        let label_id = self.get_or_create_label_id(label);
1395
1396        // Add to node_labels map
1397        let mut node_labels = self.node_labels.write();
1398        let label_set = node_labels
1399            .entry(node_id)
1400            .or_insert_with(FxHashSet::default);
1401
1402        if label_set.contains(&label_id) {
1403            return false; // Already has this label
1404        }
1405
1406        label_set.insert(label_id);
1407        drop(node_labels);
1408
1409        // Add to label_index
1410        let mut index = self.label_index.write();
1411        if (label_id as usize) >= index.len() {
1412            index.resize(label_id as usize + 1, FxHashMap::default());
1413        }
1414        index[label_id as usize].insert(node_id, ());
1415
1416        // Note: label_count in record is not updated for tiered storage.
1417        // The record is immutable once allocated in the arena.
1418
1419        true
1420    }
1421
1422    /// Removes a label from a node.
1423    ///
1424    /// Returns true if the label was removed, false if the node doesn't exist
1425    /// or doesn't have the label.
1426    #[cfg(not(feature = "tiered-storage"))]
1427    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1428        let epoch = self.current_epoch();
1429
1430        // Check if node exists
1431        let nodes = self.nodes.read();
1432        if let Some(chain) = nodes.get(&node_id) {
1433            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1434                return false;
1435            }
1436        } else {
1437            return false;
1438        }
1439        drop(nodes);
1440
1441        // Get label ID
1442        let label_id = {
1443            let label_ids = self.label_to_id.read();
1444            match label_ids.get(label) {
1445                Some(&id) => id,
1446                None => return false, // Label doesn't exist
1447            }
1448        };
1449
1450        // Remove from node_labels map
1451        let mut node_labels = self.node_labels.write();
1452        if let Some(label_set) = node_labels.get_mut(&node_id) {
1453            if !label_set.remove(&label_id) {
1454                return false; // Node doesn't have this label
1455            }
1456        } else {
1457            return false;
1458        }
1459        drop(node_labels);
1460
1461        // Remove from label_index
1462        let mut index = self.label_index.write();
1463        if (label_id as usize) < index.len() {
1464            index[label_id as usize].remove(&node_id);
1465        }
1466
1467        // Update label count in node record
1468        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1469            if let Some(record) = chain.latest_mut() {
1470                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1471                record.set_label_count(count as u16);
1472            }
1473        }
1474
1475        true
1476    }
1477
1478    /// Removes a label from a node.
1479    /// (Tiered storage version)
1480    #[cfg(feature = "tiered-storage")]
1481    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1482        let epoch = self.current_epoch();
1483
1484        // Check if node exists
1485        let versions = self.node_versions.read();
1486        if let Some(index) = versions.get(&node_id) {
1487            if let Some(vref) = index.visible_at(epoch) {
1488                if let Some(record) = self.read_node_record(&vref) {
1489                    if record.is_deleted() {
1490                        return false;
1491                    }
1492                } else {
1493                    return false;
1494                }
1495            } else {
1496                return false;
1497            }
1498        } else {
1499            return false;
1500        }
1501        drop(versions);
1502
1503        // Get label ID
1504        let label_id = {
1505            let label_ids = self.label_to_id.read();
1506            match label_ids.get(label) {
1507                Some(&id) => id,
1508                None => return false, // Label doesn't exist
1509            }
1510        };
1511
1512        // Remove from node_labels map
1513        let mut node_labels = self.node_labels.write();
1514        if let Some(label_set) = node_labels.get_mut(&node_id) {
1515            if !label_set.remove(&label_id) {
1516                return false; // Node doesn't have this label
1517            }
1518        } else {
1519            return false;
1520        }
1521        drop(node_labels);
1522
1523        // Remove from label_index
1524        let mut index = self.label_index.write();
1525        if (label_id as usize) < index.len() {
1526            index[label_id as usize].remove(&node_id);
1527        }
1528
1529        // Note: label_count in record is not updated for tiered storage.
1530
1531        true
1532    }
1533
1534    /// Returns the number of nodes (non-deleted at current epoch).
1535    #[must_use]
1536    #[cfg(not(feature = "tiered-storage"))]
1537    pub fn node_count(&self) -> usize {
1538        let epoch = self.current_epoch();
1539        self.nodes
1540            .read()
1541            .values()
1542            .filter_map(|chain| chain.visible_at(epoch))
1543            .filter(|r| !r.is_deleted())
1544            .count()
1545    }
1546
1547    /// Returns the number of nodes (non-deleted at current epoch).
1548    /// (Tiered storage version)
1549    #[must_use]
1550    #[cfg(feature = "tiered-storage")]
1551    pub fn node_count(&self) -> usize {
1552        let epoch = self.current_epoch();
1553        let versions = self.node_versions.read();
1554        versions
1555            .iter()
1556            .filter(|(_, index)| {
1557                index.visible_at(epoch).map_or(false, |vref| {
1558                    self.read_node_record(&vref)
1559                        .map_or(false, |r| !r.is_deleted())
1560                })
1561            })
1562            .count()
1563    }
1564
1565    /// Returns all node IDs in the store.
1566    ///
1567    /// This returns a snapshot of current node IDs. The returned vector
1568    /// excludes deleted nodes. Results are sorted by NodeId for deterministic
1569    /// iteration order.
1570    #[must_use]
1571    #[cfg(not(feature = "tiered-storage"))]
1572    pub fn node_ids(&self) -> Vec<NodeId> {
1573        let epoch = self.current_epoch();
1574        let mut ids: Vec<NodeId> = self
1575            .nodes
1576            .read()
1577            .iter()
1578            .filter_map(|(id, chain)| {
1579                chain
1580                    .visible_at(epoch)
1581                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1582            })
1583            .collect();
1584        ids.sort_unstable();
1585        ids
1586    }
1587
1588    /// Returns all node IDs in the store.
1589    /// (Tiered storage version)
1590    #[must_use]
1591    #[cfg(feature = "tiered-storage")]
1592    pub fn node_ids(&self) -> Vec<NodeId> {
1593        let epoch = self.current_epoch();
1594        let versions = self.node_versions.read();
1595        let mut ids: Vec<NodeId> = versions
1596            .iter()
1597            .filter_map(|(id, index)| {
1598                index.visible_at(epoch).and_then(|vref| {
1599                    self.read_node_record(&vref)
1600                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1601                })
1602            })
1603            .collect();
1604        ids.sort_unstable();
1605        ids
1606    }
1607
1608    // === Edge Operations ===
1609
1610    /// Creates a new edge.
1611    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1612        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1613    }
1614
1615    /// Creates a new edge within a transaction context.
1616    #[cfg(not(feature = "tiered-storage"))]
1617    pub fn create_edge_versioned(
1618        &self,
1619        src: NodeId,
1620        dst: NodeId,
1621        edge_type: &str,
1622        epoch: EpochId,
1623        tx_id: TxId,
1624    ) -> EdgeId {
1625        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1626        let type_id = self.get_or_create_edge_type_id(edge_type);
1627
1628        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1629        let chain = VersionChain::with_initial(record, epoch, tx_id);
1630        self.edges.write().insert(id, chain);
1631
1632        // Update adjacency
1633        self.forward_adj.add_edge(src, dst, id);
1634        if let Some(ref backward) = self.backward_adj {
1635            backward.add_edge(dst, src, id);
1636        }
1637
1638        id
1639    }
1640
1641    /// Creates a new edge within a transaction context.
1642    /// (Tiered storage version)
1643    #[cfg(feature = "tiered-storage")]
1644    pub fn create_edge_versioned(
1645        &self,
1646        src: NodeId,
1647        dst: NodeId,
1648        edge_type: &str,
1649        epoch: EpochId,
1650        tx_id: TxId,
1651    ) -> EdgeId {
1652        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1653        let type_id = self.get_or_create_edge_type_id(edge_type);
1654
1655        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1656
1657        // Allocate record in arena and get offset (create epoch if needed)
1658        let arena = self.arena_allocator.arena_or_create(epoch);
1659        let (offset, _stored) = arena.alloc_value_with_offset(record);
1660
1661        // Create HotVersionRef pointing to arena data
1662        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1663
1664        // Create or update version index
1665        let mut versions = self.edge_versions.write();
1666        if let Some(index) = versions.get_mut(&id) {
1667            index.add_hot(hot_ref);
1668        } else {
1669            versions.insert(id, VersionIndex::with_initial(hot_ref));
1670        }
1671
1672        // Update adjacency
1673        self.forward_adj.add_edge(src, dst, id);
1674        if let Some(ref backward) = self.backward_adj {
1675            backward.add_edge(dst, src, id);
1676        }
1677
1678        id
1679    }
1680
1681    /// Creates a new edge with properties.
1682    pub fn create_edge_with_props(
1683        &self,
1684        src: NodeId,
1685        dst: NodeId,
1686        edge_type: &str,
1687        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1688    ) -> EdgeId {
1689        let id = self.create_edge(src, dst, edge_type);
1690
1691        for (key, value) in properties {
1692            self.edge_properties.set(id, key.into(), value.into());
1693        }
1694
1695        id
1696    }
1697
1698    /// Gets an edge by ID (latest visible version).
1699    #[must_use]
1700    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1701        self.get_edge_at_epoch(id, self.current_epoch())
1702    }
1703
1704    /// Gets an edge by ID at a specific epoch.
1705    #[must_use]
1706    #[cfg(not(feature = "tiered-storage"))]
1707    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1708        let edges = self.edges.read();
1709        let chain = edges.get(&id)?;
1710        let record = chain.visible_at(epoch)?;
1711
1712        if record.is_deleted() {
1713            return None;
1714        }
1715
1716        let edge_type = {
1717            let id_to_type = self.id_to_edge_type.read();
1718            id_to_type.get(record.type_id as usize)?.clone()
1719        };
1720
1721        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1722
1723        // Get properties
1724        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1725
1726        Some(edge)
1727    }
1728
1729    /// Gets an edge by ID at a specific epoch.
1730    /// (Tiered storage version)
1731    #[must_use]
1732    #[cfg(feature = "tiered-storage")]
1733    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1734        let versions = self.edge_versions.read();
1735        let index = versions.get(&id)?;
1736        let version_ref = index.visible_at(epoch)?;
1737
1738        let record = self.read_edge_record(&version_ref)?;
1739
1740        if record.is_deleted() {
1741            return None;
1742        }
1743
1744        let edge_type = {
1745            let id_to_type = self.id_to_edge_type.read();
1746            id_to_type.get(record.type_id as usize)?.clone()
1747        };
1748
1749        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1750
1751        // Get properties
1752        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1753
1754        Some(edge)
1755    }
1756
1757    /// Gets an edge visible to a specific transaction.
1758    #[must_use]
1759    #[cfg(not(feature = "tiered-storage"))]
1760    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1761        let edges = self.edges.read();
1762        let chain = edges.get(&id)?;
1763        let record = chain.visible_to(epoch, tx_id)?;
1764
1765        if record.is_deleted() {
1766            return None;
1767        }
1768
1769        let edge_type = {
1770            let id_to_type = self.id_to_edge_type.read();
1771            id_to_type.get(record.type_id as usize)?.clone()
1772        };
1773
1774        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1775
1776        // Get properties
1777        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1778
1779        Some(edge)
1780    }
1781
1782    /// Gets an edge visible to a specific transaction.
1783    /// (Tiered storage version)
1784    #[must_use]
1785    #[cfg(feature = "tiered-storage")]
1786    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1787        let versions = self.edge_versions.read();
1788        let index = versions.get(&id)?;
1789        let version_ref = index.visible_to(epoch, tx_id)?;
1790
1791        let record = self.read_edge_record(&version_ref)?;
1792
1793        if record.is_deleted() {
1794            return None;
1795        }
1796
1797        let edge_type = {
1798            let id_to_type = self.id_to_edge_type.read();
1799            id_to_type.get(record.type_id as usize)?.clone()
1800        };
1801
1802        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1803
1804        // Get properties
1805        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1806
1807        Some(edge)
1808    }
1809
1810    /// Reads an EdgeRecord from arena using a VersionRef.
1811    #[cfg(feature = "tiered-storage")]
1812    #[allow(unsafe_code)]
1813    fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1814        match version_ref {
1815            VersionRef::Hot(hot_ref) => {
1816                let arena = self.arena_allocator.arena(hot_ref.epoch);
1817                // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
1818                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1819                Some(record.clone())
1820            }
1821            VersionRef::Cold(cold_ref) => {
1822                // Read from compressed epoch store
1823                self.epoch_store
1824                    .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1825            }
1826        }
1827    }
1828
1829    /// Deletes an edge (using latest epoch).
1830    pub fn delete_edge(&self, id: EdgeId) -> bool {
1831        self.delete_edge_at_epoch(id, self.current_epoch())
1832    }
1833
1834    /// Deletes an edge at a specific epoch.
1835    #[cfg(not(feature = "tiered-storage"))]
1836    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1837        let mut edges = self.edges.write();
1838        if let Some(chain) = edges.get_mut(&id) {
1839            // Get the visible record to check if deleted and get src/dst
1840            let (src, dst) = {
1841                match chain.visible_at(epoch) {
1842                    Some(record) => {
1843                        if record.is_deleted() {
1844                            return false;
1845                        }
1846                        (record.src, record.dst)
1847                    }
1848                    None => return false, // Not visible at this epoch (already deleted)
1849                }
1850            };
1851
1852            // Mark the version chain as deleted
1853            chain.mark_deleted(epoch);
1854
1855            drop(edges); // Release lock
1856
1857            // Mark as deleted in adjacency (soft delete)
1858            self.forward_adj.mark_deleted(src, id);
1859            if let Some(ref backward) = self.backward_adj {
1860                backward.mark_deleted(dst, id);
1861            }
1862
1863            // Remove properties
1864            self.edge_properties.remove_all(id);
1865
1866            true
1867        } else {
1868            false
1869        }
1870    }
1871
1872    /// Deletes an edge at a specific epoch.
1873    /// (Tiered storage version)
1874    #[cfg(feature = "tiered-storage")]
1875    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1876        let mut versions = self.edge_versions.write();
1877        if let Some(index) = versions.get_mut(&id) {
1878            // Get the visible record to check if deleted and get src/dst
1879            let (src, dst) = {
1880                match index.visible_at(epoch) {
1881                    Some(version_ref) => {
1882                        if let Some(record) = self.read_edge_record(&version_ref) {
1883                            if record.is_deleted() {
1884                                return false;
1885                            }
1886                            (record.src, record.dst)
1887                        } else {
1888                            return false;
1889                        }
1890                    }
1891                    None => return false,
1892                }
1893            };
1894
1895            // Mark as deleted in version index
1896            index.mark_deleted(epoch);
1897
1898            drop(versions); // Release lock
1899
1900            // Mark as deleted in adjacency (soft delete)
1901            self.forward_adj.mark_deleted(src, id);
1902            if let Some(ref backward) = self.backward_adj {
1903                backward.mark_deleted(dst, id);
1904            }
1905
1906            // Remove properties
1907            self.edge_properties.remove_all(id);
1908
1909            true
1910        } else {
1911            false
1912        }
1913    }
1914
1915    /// Returns the number of edges (non-deleted at current epoch).
1916    #[must_use]
1917    #[cfg(not(feature = "tiered-storage"))]
1918    pub fn edge_count(&self) -> usize {
1919        let epoch = self.current_epoch();
1920        self.edges
1921            .read()
1922            .values()
1923            .filter_map(|chain| chain.visible_at(epoch))
1924            .filter(|r| !r.is_deleted())
1925            .count()
1926    }
1927
1928    /// Returns the number of edges (non-deleted at current epoch).
1929    /// (Tiered storage version)
1930    #[must_use]
1931    #[cfg(feature = "tiered-storage")]
1932    pub fn edge_count(&self) -> usize {
1933        let epoch = self.current_epoch();
1934        let versions = self.edge_versions.read();
1935        versions
1936            .iter()
1937            .filter(|(_, index)| {
1938                index.visible_at(epoch).map_or(false, |vref| {
1939                    self.read_edge_record(&vref)
1940                        .map_or(false, |r| !r.is_deleted())
1941                })
1942            })
1943            .count()
1944    }
1945
1946    /// Discards all uncommitted versions created by a transaction.
1947    ///
1948    /// This is called during transaction rollback to clean up uncommitted changes.
1949    /// The method removes version chain entries created by the specified transaction.
1950    #[cfg(not(feature = "tiered-storage"))]
1951    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1952        // Remove uncommitted node versions
1953        {
1954            let mut nodes = self.nodes.write();
1955            for chain in nodes.values_mut() {
1956                chain.remove_versions_by(tx_id);
1957            }
1958            // Remove completely empty chains (no versions left)
1959            nodes.retain(|_, chain| !chain.is_empty());
1960        }
1961
1962        // Remove uncommitted edge versions
1963        {
1964            let mut edges = self.edges.write();
1965            for chain in edges.values_mut() {
1966                chain.remove_versions_by(tx_id);
1967            }
1968            // Remove completely empty chains (no versions left)
1969            edges.retain(|_, chain| !chain.is_empty());
1970        }
1971    }
1972
1973    /// Discards all uncommitted versions created by a transaction.
1974    /// (Tiered storage version)
1975    #[cfg(feature = "tiered-storage")]
1976    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1977        // Remove uncommitted node versions
1978        {
1979            let mut versions = self.node_versions.write();
1980            for index in versions.values_mut() {
1981                index.remove_versions_by(tx_id);
1982            }
1983            // Remove completely empty indexes (no versions left)
1984            versions.retain(|_, index| !index.is_empty());
1985        }
1986
1987        // Remove uncommitted edge versions
1988        {
1989            let mut versions = self.edge_versions.write();
1990            for index in versions.values_mut() {
1991                index.remove_versions_by(tx_id);
1992            }
1993            // Remove completely empty indexes (no versions left)
1994            versions.retain(|_, index| !index.is_empty());
1995        }
1996    }
1997
1998    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
1999    ///
2000    /// This is called by the transaction manager when an epoch becomes eligible
2001    /// for freezing (no active transactions can see it). The freeze process:
2002    ///
2003    /// 1. Collects all hot version refs for the epoch
2004    /// 2. Reads the corresponding records from arena
2005    /// 3. Compresses them into a `CompressedEpochBlock`
2006    /// 4. Updates `VersionIndex` entries to point to cold storage
2007    /// 5. The arena can be deallocated after all epochs in it are frozen
2008    ///
2009    /// # Arguments
2010    ///
2011    /// * `epoch` - The epoch to freeze
2012    ///
2013    /// # Returns
2014    ///
2015    /// The number of records frozen (nodes + edges).
2016    #[cfg(feature = "tiered-storage")]
2017    #[allow(unsafe_code)]
2018    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
2019        // Collect node records to freeze
2020        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
2021        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
2022
2023        {
2024            let versions = self.node_versions.read();
2025            for (node_id, index) in versions.iter() {
2026                for hot_ref in index.hot_refs_for_epoch(epoch) {
2027                    let arena = self.arena_allocator.arena(hot_ref.epoch);
2028                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
2029                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2030                    node_records.push((node_id.as_u64(), record.clone()));
2031                    node_hot_refs.push((*node_id, *hot_ref));
2032                }
2033            }
2034        }
2035
2036        // Collect edge records to freeze
2037        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
2038        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
2039
2040        {
2041            let versions = self.edge_versions.read();
2042            for (edge_id, index) in versions.iter() {
2043                for hot_ref in index.hot_refs_for_epoch(epoch) {
2044                    let arena = self.arena_allocator.arena(hot_ref.epoch);
2045                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
2046                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2047                    edge_records.push((edge_id.as_u64(), record.clone()));
2048                    edge_hot_refs.push((*edge_id, *hot_ref));
2049                }
2050            }
2051        }
2052
2053        let total_frozen = node_records.len() + edge_records.len();
2054
2055        if total_frozen == 0 {
2056            return 0;
2057        }
2058
2059        // Freeze to compressed storage
2060        let (node_entries, edge_entries) =
2061            self.epoch_store
2062                .freeze_epoch(epoch, node_records, edge_records);
2063
2064        // Build lookup maps for index entries
2065        let node_entry_map: FxHashMap<u64, _> = node_entries
2066            .iter()
2067            .map(|e| (e.entity_id, (e.offset, e.length)))
2068            .collect();
2069        let edge_entry_map: FxHashMap<u64, _> = edge_entries
2070            .iter()
2071            .map(|e| (e.entity_id, (e.offset, e.length)))
2072            .collect();
2073
2074        // Update version indexes to use cold refs
2075        {
2076            let mut versions = self.node_versions.write();
2077            for (node_id, hot_ref) in &node_hot_refs {
2078                if let Some(index) = versions.get_mut(node_id) {
2079                    if let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64()) {
2080                        let cold_ref = ColdVersionRef {
2081                            epoch,
2082                            block_offset: offset,
2083                            length,
2084                            created_by: hot_ref.created_by,
2085                            deleted_epoch: hot_ref.deleted_epoch,
2086                        };
2087                        index.freeze_epoch(epoch, std::iter::once(cold_ref));
2088                    }
2089                }
2090            }
2091        }
2092
2093        {
2094            let mut versions = self.edge_versions.write();
2095            for (edge_id, hot_ref) in &edge_hot_refs {
2096                if let Some(index) = versions.get_mut(edge_id) {
2097                    if let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64()) {
2098                        let cold_ref = ColdVersionRef {
2099                            epoch,
2100                            block_offset: offset,
2101                            length,
2102                            created_by: hot_ref.created_by,
2103                            deleted_epoch: hot_ref.deleted_epoch,
2104                        };
2105                        index.freeze_epoch(epoch, std::iter::once(cold_ref));
2106                    }
2107                }
2108            }
2109        }
2110
2111        total_frozen
2112    }
2113
2114    /// Returns the epoch store for cold storage statistics.
2115    #[cfg(feature = "tiered-storage")]
2116    #[must_use]
2117    pub fn epoch_store(&self) -> &EpochStore {
2118        &self.epoch_store
2119    }
2120
2121    /// Returns the number of distinct labels in the store.
2122    #[must_use]
2123    pub fn label_count(&self) -> usize {
2124        self.id_to_label.read().len()
2125    }
2126
2127    /// Returns the number of distinct property keys in the store.
2128    ///
2129    /// This counts unique property keys across both nodes and edges.
2130    #[must_use]
2131    pub fn property_key_count(&self) -> usize {
2132        let node_keys = self.node_properties.column_count();
2133        let edge_keys = self.edge_properties.column_count();
2134        // Note: This may count some keys twice if the same key is used
2135        // for both nodes and edges. A more precise count would require
2136        // tracking unique keys across both storages.
2137        node_keys + edge_keys
2138    }
2139
2140    /// Returns the number of distinct edge types in the store.
2141    #[must_use]
2142    pub fn edge_type_count(&self) -> usize {
2143        self.id_to_edge_type.read().len()
2144    }
2145
2146    // === Traversal ===
2147
2148    /// Iterates over neighbors of a node in the specified direction.
2149    ///
2150    /// This is the fast path for graph traversal - goes straight to the
2151    /// adjacency index without loading full node data.
2152    pub fn neighbors(
2153        &self,
2154        node: NodeId,
2155        direction: Direction,
2156    ) -> impl Iterator<Item = NodeId> + '_ {
2157        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2158            Direction::Outgoing | Direction::Both => {
2159                Box::new(self.forward_adj.neighbors(node).into_iter())
2160            }
2161            Direction::Incoming => Box::new(std::iter::empty()),
2162        };
2163
2164        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2165            Direction::Incoming | Direction::Both => {
2166                if let Some(ref adj) = self.backward_adj {
2167                    Box::new(adj.neighbors(node).into_iter())
2168                } else {
2169                    Box::new(std::iter::empty())
2170                }
2171            }
2172            Direction::Outgoing => Box::new(std::iter::empty()),
2173        };
2174
2175        forward.chain(backward)
2176    }
2177
2178    /// Returns edges from a node with their targets.
2179    ///
2180    /// Returns an iterator of (target_node, edge_id) pairs.
2181    pub fn edges_from(
2182        &self,
2183        node: NodeId,
2184        direction: Direction,
2185    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2186        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2187            Direction::Outgoing | Direction::Both => {
2188                Box::new(self.forward_adj.edges_from(node).into_iter())
2189            }
2190            Direction::Incoming => Box::new(std::iter::empty()),
2191        };
2192
2193        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2194            Direction::Incoming | Direction::Both => {
2195                if let Some(ref adj) = self.backward_adj {
2196                    Box::new(adj.edges_from(node).into_iter())
2197                } else {
2198                    Box::new(std::iter::empty())
2199                }
2200            }
2201            Direction::Outgoing => Box::new(std::iter::empty()),
2202        };
2203
2204        forward.chain(backward)
2205    }
2206
2207    /// Returns edges to a node (where the node is the destination).
2208    ///
2209    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
2210    /// Uses the backward adjacency index for O(degree) lookup.
2211    ///
2212    /// # Example
2213    ///
2214    /// ```ignore
2215    /// // For edges: A->B, C->B
2216    /// let incoming = store.edges_to(B);
2217    /// // Returns: [(A, edge1), (C, edge2)]
2218    /// ```
2219    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2220        if let Some(ref backward) = self.backward_adj {
2221            backward.edges_from(node)
2222        } else {
2223            // Fallback: scan all edges (slow but correct)
2224            self.all_edges()
2225                .filter_map(|edge| {
2226                    if edge.dst == node {
2227                        Some((edge.src, edge.id))
2228                    } else {
2229                        None
2230                    }
2231                })
2232                .collect()
2233        }
2234    }
2235
2236    /// Returns the out-degree of a node (number of outgoing edges).
2237    ///
2238    /// Uses the forward adjacency index for O(1) lookup.
2239    #[must_use]
2240    pub fn out_degree(&self, node: NodeId) -> usize {
2241        self.forward_adj.out_degree(node)
2242    }
2243
2244    /// Returns the in-degree of a node (number of incoming edges).
2245    ///
2246    /// Uses the backward adjacency index for O(1) lookup if available,
2247    /// otherwise falls back to scanning edges.
2248    #[must_use]
2249    pub fn in_degree(&self, node: NodeId) -> usize {
2250        if let Some(ref backward) = self.backward_adj {
2251            backward.in_degree(node)
2252        } else {
2253            // Fallback: count edges (slow)
2254            self.all_edges().filter(|edge| edge.dst == node).count()
2255        }
2256    }
2257
2258    /// Gets the type of an edge by ID.
2259    #[must_use]
2260    #[cfg(not(feature = "tiered-storage"))]
2261    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2262        let edges = self.edges.read();
2263        let chain = edges.get(&id)?;
2264        let epoch = self.current_epoch();
2265        let record = chain.visible_at(epoch)?;
2266        let id_to_type = self.id_to_edge_type.read();
2267        id_to_type.get(record.type_id as usize).cloned()
2268    }
2269
2270    /// Gets the type of an edge by ID.
2271    /// (Tiered storage version)
2272    #[must_use]
2273    #[cfg(feature = "tiered-storage")]
2274    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2275        let versions = self.edge_versions.read();
2276        let index = versions.get(&id)?;
2277        let epoch = self.current_epoch();
2278        let vref = index.visible_at(epoch)?;
2279        let record = self.read_edge_record(&vref)?;
2280        let id_to_type = self.id_to_edge_type.read();
2281        id_to_type.get(record.type_id as usize).cloned()
2282    }
2283
2284    /// Returns all nodes with a specific label.
2285    ///
2286    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
2287    /// concurrent modifications won't affect the returned vector. Results are
2288    /// sorted by NodeId for deterministic iteration order.
2289    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2290        let label_to_id = self.label_to_id.read();
2291        if let Some(&label_id) = label_to_id.get(label) {
2292            let index = self.label_index.read();
2293            if let Some(set) = index.get(label_id as usize) {
2294                let mut ids: Vec<NodeId> = set.keys().copied().collect();
2295                ids.sort_unstable();
2296                return ids;
2297            }
2298        }
2299        Vec::new()
2300    }
2301
2302    // === Admin API: Iteration ===
2303
2304    /// Returns an iterator over all nodes in the database.
2305    ///
2306    /// This creates a snapshot of all visible nodes at the current epoch.
2307    /// Useful for dump/export operations.
2308    #[cfg(not(feature = "tiered-storage"))]
2309    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2310        let epoch = self.current_epoch();
2311        let node_ids: Vec<NodeId> = self
2312            .nodes
2313            .read()
2314            .iter()
2315            .filter_map(|(id, chain)| {
2316                chain
2317                    .visible_at(epoch)
2318                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2319            })
2320            .collect();
2321
2322        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2323    }
2324
2325    /// Returns an iterator over all nodes in the database.
2326    /// (Tiered storage version)
2327    #[cfg(feature = "tiered-storage")]
2328    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2329        let node_ids = self.node_ids();
2330        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2331    }
2332
2333    /// Returns an iterator over all edges in the database.
2334    ///
2335    /// This creates a snapshot of all visible edges at the current epoch.
2336    /// Useful for dump/export operations.
2337    #[cfg(not(feature = "tiered-storage"))]
2338    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2339        let epoch = self.current_epoch();
2340        let edge_ids: Vec<EdgeId> = self
2341            .edges
2342            .read()
2343            .iter()
2344            .filter_map(|(id, chain)| {
2345                chain
2346                    .visible_at(epoch)
2347                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2348            })
2349            .collect();
2350
2351        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2352    }
2353
2354    /// Returns an iterator over all edges in the database.
2355    /// (Tiered storage version)
2356    #[cfg(feature = "tiered-storage")]
2357    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2358        let epoch = self.current_epoch();
2359        let versions = self.edge_versions.read();
2360        let edge_ids: Vec<EdgeId> = versions
2361            .iter()
2362            .filter_map(|(id, index)| {
2363                index.visible_at(epoch).and_then(|vref| {
2364                    self.read_edge_record(&vref)
2365                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2366                })
2367            })
2368            .collect();
2369
2370        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2371    }
2372
2373    /// Returns all label names in the database.
2374    pub fn all_labels(&self) -> Vec<String> {
2375        self.id_to_label
2376            .read()
2377            .iter()
2378            .map(|s| s.to_string())
2379            .collect()
2380    }
2381
2382    /// Returns all edge type names in the database.
2383    pub fn all_edge_types(&self) -> Vec<String> {
2384        self.id_to_edge_type
2385            .read()
2386            .iter()
2387            .map(|s| s.to_string())
2388            .collect()
2389    }
2390
2391    /// Returns all property keys used in the database.
2392    pub fn all_property_keys(&self) -> Vec<String> {
2393        let mut keys = std::collections::HashSet::new();
2394        for key in self.node_properties.keys() {
2395            keys.insert(key.to_string());
2396        }
2397        for key in self.edge_properties.keys() {
2398            keys.insert(key.to_string());
2399        }
2400        keys.into_iter().collect()
2401    }
2402
2403    /// Returns an iterator over nodes with a specific label.
2404    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2405        let node_ids = self.nodes_by_label(label);
2406        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2407    }
2408
2409    /// Returns an iterator over edges with a specific type.
2410    #[cfg(not(feature = "tiered-storage"))]
2411    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2412        let epoch = self.current_epoch();
2413        let type_to_id = self.edge_type_to_id.read();
2414
2415        if let Some(&type_id) = type_to_id.get(edge_type) {
2416            let edge_ids: Vec<EdgeId> = self
2417                .edges
2418                .read()
2419                .iter()
2420                .filter_map(|(id, chain)| {
2421                    chain.visible_at(epoch).and_then(|r| {
2422                        if !r.is_deleted() && r.type_id == type_id {
2423                            Some(*id)
2424                        } else {
2425                            None
2426                        }
2427                    })
2428                })
2429                .collect();
2430
2431            // Return a boxed iterator for the found edges
2432            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2433                as Box<dyn Iterator<Item = Edge> + 'a>
2434        } else {
2435            // Return empty iterator
2436            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2437        }
2438    }
2439
2440    /// Returns an iterator over edges with a specific type.
2441    /// (Tiered storage version)
2442    #[cfg(feature = "tiered-storage")]
2443    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2444        let epoch = self.current_epoch();
2445        let type_to_id = self.edge_type_to_id.read();
2446
2447        if let Some(&type_id) = type_to_id.get(edge_type) {
2448            let versions = self.edge_versions.read();
2449            let edge_ids: Vec<EdgeId> = versions
2450                .iter()
2451                .filter_map(|(id, index)| {
2452                    index.visible_at(epoch).and_then(|vref| {
2453                        self.read_edge_record(&vref).and_then(|r| {
2454                            if !r.is_deleted() && r.type_id == type_id {
2455                                Some(*id)
2456                            } else {
2457                                None
2458                            }
2459                        })
2460                    })
2461                })
2462                .collect();
2463
2464            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2465                as Box<dyn Iterator<Item = Edge> + 'a>
2466        } else {
2467            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2468        }
2469    }
2470
2471    // === Zone Map Support ===
2472
2473    /// Checks if a node property predicate might match any nodes.
2474    ///
2475    /// Uses zone maps for early filtering. Returns `true` if there might be
2476    /// matching nodes, `false` if there definitely aren't.
2477    #[must_use]
2478    pub fn node_property_might_match(
2479        &self,
2480        property: &PropertyKey,
2481        op: CompareOp,
2482        value: &Value,
2483    ) -> bool {
2484        self.node_properties.might_match(property, op, value)
2485    }
2486
2487    /// Checks if an edge property predicate might match any edges.
2488    #[must_use]
2489    pub fn edge_property_might_match(
2490        &self,
2491        property: &PropertyKey,
2492        op: CompareOp,
2493        value: &Value,
2494    ) -> bool {
2495        self.edge_properties.might_match(property, op, value)
2496    }
2497
2498    /// Gets the zone map for a node property.
2499    #[must_use]
2500    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2501        self.node_properties.zone_map(property)
2502    }
2503
2504    /// Gets the zone map for an edge property.
2505    #[must_use]
2506    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2507        self.edge_properties.zone_map(property)
2508    }
2509
2510    /// Rebuilds zone maps for all properties.
2511    pub fn rebuild_zone_maps(&self) {
2512        self.node_properties.rebuild_zone_maps();
2513        self.edge_properties.rebuild_zone_maps();
2514    }
2515
2516    // === Statistics ===
2517
2518    /// Returns the current statistics.
2519    #[must_use]
2520    pub fn statistics(&self) -> Statistics {
2521        self.statistics.read().clone()
2522    }
2523
2524    /// Recomputes statistics from current data.
2525    ///
2526    /// Scans all labels and edge types to build cardinality estimates for the
2527    /// query optimizer. Call this periodically or after bulk data loads.
2528    #[cfg(not(feature = "tiered-storage"))]
2529    pub fn compute_statistics(&self) {
2530        let mut stats = Statistics::new();
2531
2532        // Compute total counts
2533        stats.total_nodes = self.node_count() as u64;
2534        stats.total_edges = self.edge_count() as u64;
2535
2536        // Compute per-label statistics
2537        let id_to_label = self.id_to_label.read();
2538        let label_index = self.label_index.read();
2539
2540        for (label_id, label_name) in id_to_label.iter().enumerate() {
2541            let node_count = label_index
2542                .get(label_id)
2543                .map(|set| set.len() as u64)
2544                .unwrap_or(0);
2545
2546            if node_count > 0 {
2547                // Estimate average degree
2548                let avg_out_degree = if stats.total_nodes > 0 {
2549                    stats.total_edges as f64 / stats.total_nodes as f64
2550                } else {
2551                    0.0
2552                };
2553
2554                let label_stats =
2555                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2556
2557                stats.update_label(label_name.as_ref(), label_stats);
2558            }
2559        }
2560
2561        // Compute per-edge-type statistics
2562        let id_to_edge_type = self.id_to_edge_type.read();
2563        let edges = self.edges.read();
2564        let epoch = self.current_epoch();
2565
2566        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2567        for chain in edges.values() {
2568            if let Some(record) = chain.visible_at(epoch) {
2569                if !record.is_deleted() {
2570                    *edge_type_counts.entry(record.type_id).or_default() += 1;
2571                }
2572            }
2573        }
2574
2575        for (type_id, count) in edge_type_counts {
2576            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2577                let avg_degree = if stats.total_nodes > 0 {
2578                    count as f64 / stats.total_nodes as f64
2579                } else {
2580                    0.0
2581                };
2582
2583                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2584                stats.update_edge_type(type_name.as_ref(), edge_stats);
2585            }
2586        }
2587
2588        *self.statistics.write() = stats;
2589    }
2590
2591    /// Recomputes statistics from current data.
2592    /// (Tiered storage version)
2593    #[cfg(feature = "tiered-storage")]
2594    pub fn compute_statistics(&self) {
2595        let mut stats = Statistics::new();
2596
2597        // Compute total counts
2598        stats.total_nodes = self.node_count() as u64;
2599        stats.total_edges = self.edge_count() as u64;
2600
2601        // Compute per-label statistics
2602        let id_to_label = self.id_to_label.read();
2603        let label_index = self.label_index.read();
2604
2605        for (label_id, label_name) in id_to_label.iter().enumerate() {
2606            let node_count = label_index
2607                .get(label_id)
2608                .map(|set| set.len() as u64)
2609                .unwrap_or(0);
2610
2611            if node_count > 0 {
2612                let avg_out_degree = if stats.total_nodes > 0 {
2613                    stats.total_edges as f64 / stats.total_nodes as f64
2614                } else {
2615                    0.0
2616                };
2617
2618                let label_stats =
2619                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2620
2621                stats.update_label(label_name.as_ref(), label_stats);
2622            }
2623        }
2624
2625        // Compute per-edge-type statistics
2626        let id_to_edge_type = self.id_to_edge_type.read();
2627        let versions = self.edge_versions.read();
2628        let epoch = self.current_epoch();
2629
2630        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2631        for index in versions.values() {
2632            if let Some(vref) = index.visible_at(epoch) {
2633                if let Some(record) = self.read_edge_record(&vref) {
2634                    if !record.is_deleted() {
2635                        *edge_type_counts.entry(record.type_id).or_default() += 1;
2636                    }
2637                }
2638            }
2639        }
2640
2641        for (type_id, count) in edge_type_counts {
2642            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2643                let avg_degree = if stats.total_nodes > 0 {
2644                    count as f64 / stats.total_nodes as f64
2645                } else {
2646                    0.0
2647                };
2648
2649                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2650                stats.update_edge_type(type_name.as_ref(), edge_stats);
2651            }
2652        }
2653
2654        *self.statistics.write() = stats;
2655    }
2656
2657    /// Estimates cardinality for a label scan.
2658    #[must_use]
2659    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2660        self.statistics.read().estimate_label_cardinality(label)
2661    }
2662
2663    /// Estimates average degree for an edge type.
2664    #[must_use]
2665    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2666        self.statistics
2667            .read()
2668            .estimate_avg_degree(edge_type, outgoing)
2669    }
2670
2671    // === Internal Helpers ===
2672
2673    fn get_or_create_label_id(&self, label: &str) -> u32 {
2674        {
2675            let label_to_id = self.label_to_id.read();
2676            if let Some(&id) = label_to_id.get(label) {
2677                return id;
2678            }
2679        }
2680
2681        let mut label_to_id = self.label_to_id.write();
2682        let mut id_to_label = self.id_to_label.write();
2683
2684        // Double-check after acquiring write lock
2685        if let Some(&id) = label_to_id.get(label) {
2686            return id;
2687        }
2688
2689        let id = id_to_label.len() as u32;
2690
2691        let label: ArcStr = label.into();
2692        label_to_id.insert(label.clone(), id);
2693        id_to_label.push(label);
2694
2695        id
2696    }
2697
2698    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2699        {
2700            let type_to_id = self.edge_type_to_id.read();
2701            if let Some(&id) = type_to_id.get(edge_type) {
2702                return id;
2703            }
2704        }
2705
2706        let mut type_to_id = self.edge_type_to_id.write();
2707        let mut id_to_type = self.id_to_edge_type.write();
2708
2709        // Double-check
2710        if let Some(&id) = type_to_id.get(edge_type) {
2711            return id;
2712        }
2713
2714        let id = id_to_type.len() as u32;
2715        let edge_type: ArcStr = edge_type.into();
2716        type_to_id.insert(edge_type.clone(), id);
2717        id_to_type.push(edge_type);
2718
2719        id
2720    }
2721
2722    // === Recovery Support ===
2723
2724    /// Creates a node with a specific ID during recovery.
2725    ///
2726    /// This is used for WAL recovery to restore nodes with their original IDs.
2727    /// The caller must ensure IDs don't conflict with existing nodes.
2728    #[cfg(not(feature = "tiered-storage"))]
2729    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2730        let epoch = self.current_epoch();
2731        let mut record = NodeRecord::new(id, epoch);
2732        record.set_label_count(labels.len() as u16);
2733
2734        // Store labels in node_labels map and label_index
2735        let mut node_label_set = FxHashSet::default();
2736        for label in labels {
2737            let label_id = self.get_or_create_label_id(*label);
2738            node_label_set.insert(label_id);
2739
2740            // Update label index
2741            let mut index = self.label_index.write();
2742            while index.len() <= label_id as usize {
2743                index.push(FxHashMap::default());
2744            }
2745            index[label_id as usize].insert(id, ());
2746        }
2747
2748        // Store node's labels
2749        self.node_labels.write().insert(id, node_label_set);
2750
2751        // Create version chain with initial version (using SYSTEM tx for recovery)
2752        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2753        self.nodes.write().insert(id, chain);
2754
2755        // Update next_node_id if necessary to avoid future collisions
2756        let id_val = id.as_u64();
2757        let _ = self
2758            .next_node_id
2759            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2760                if id_val >= current {
2761                    Some(id_val + 1)
2762                } else {
2763                    None
2764                }
2765            });
2766    }
2767
2768    /// Creates a node with a specific ID during recovery.
2769    /// (Tiered storage version)
2770    #[cfg(feature = "tiered-storage")]
2771    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2772        let epoch = self.current_epoch();
2773        let mut record = NodeRecord::new(id, epoch);
2774        record.set_label_count(labels.len() as u16);
2775
2776        // Store labels in node_labels map and label_index
2777        let mut node_label_set = FxHashSet::default();
2778        for label in labels {
2779            let label_id = self.get_or_create_label_id(*label);
2780            node_label_set.insert(label_id);
2781
2782            // Update label index
2783            let mut index = self.label_index.write();
2784            while index.len() <= label_id as usize {
2785                index.push(FxHashMap::default());
2786            }
2787            index[label_id as usize].insert(id, ());
2788        }
2789
2790        // Store node's labels
2791        self.node_labels.write().insert(id, node_label_set);
2792
2793        // Allocate record in arena and get offset (create epoch if needed)
2794        let arena = self.arena_allocator.arena_or_create(epoch);
2795        let (offset, _stored) = arena.alloc_value_with_offset(record);
2796
2797        // Create HotVersionRef (using SYSTEM tx for recovery)
2798        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2799        let mut versions = self.node_versions.write();
2800        versions.insert(id, VersionIndex::with_initial(hot_ref));
2801
2802        // Update next_node_id if necessary to avoid future collisions
2803        let id_val = id.as_u64();
2804        let _ = self
2805            .next_node_id
2806            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2807                if id_val >= current {
2808                    Some(id_val + 1)
2809                } else {
2810                    None
2811                }
2812            });
2813    }
2814
2815    /// Creates an edge with a specific ID during recovery.
2816    ///
2817    /// This is used for WAL recovery to restore edges with their original IDs.
2818    #[cfg(not(feature = "tiered-storage"))]
2819    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2820        let epoch = self.current_epoch();
2821        let type_id = self.get_or_create_edge_type_id(edge_type);
2822
2823        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2824        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2825        self.edges.write().insert(id, chain);
2826
2827        // Update adjacency
2828        self.forward_adj.add_edge(src, dst, id);
2829        if let Some(ref backward) = self.backward_adj {
2830            backward.add_edge(dst, src, id);
2831        }
2832
2833        // Update next_edge_id if necessary
2834        let id_val = id.as_u64();
2835        let _ = self
2836            .next_edge_id
2837            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2838                if id_val >= current {
2839                    Some(id_val + 1)
2840                } else {
2841                    None
2842                }
2843            });
2844    }
2845
2846    /// Creates an edge with a specific ID during recovery.
2847    /// (Tiered storage version)
2848    #[cfg(feature = "tiered-storage")]
2849    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2850        let epoch = self.current_epoch();
2851        let type_id = self.get_or_create_edge_type_id(edge_type);
2852
2853        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2854
2855        // Allocate record in arena and get offset (create epoch if needed)
2856        let arena = self.arena_allocator.arena_or_create(epoch);
2857        let (offset, _stored) = arena.alloc_value_with_offset(record);
2858
2859        // Create HotVersionRef (using SYSTEM tx for recovery)
2860        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2861        let mut versions = self.edge_versions.write();
2862        versions.insert(id, VersionIndex::with_initial(hot_ref));
2863
2864        // Update adjacency
2865        self.forward_adj.add_edge(src, dst, id);
2866        if let Some(ref backward) = self.backward_adj {
2867            backward.add_edge(dst, src, id);
2868        }
2869
2870        // Update next_edge_id if necessary
2871        let id_val = id.as_u64();
2872        let _ = self
2873            .next_edge_id
2874            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2875                if id_val >= current {
2876                    Some(id_val + 1)
2877                } else {
2878                    None
2879                }
2880            });
2881    }
2882
2883    /// Sets the current epoch during recovery.
2884    pub fn set_epoch(&self, epoch: EpochId) {
2885        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2886    }
2887}
2888
2889impl Default for LpgStore {
2890    fn default() -> Self {
2891        Self::new()
2892    }
2893}
2894
2895#[cfg(test)]
2896mod tests {
2897    use super::*;
2898
2899    #[test]
2900    fn test_create_node() {
2901        let store = LpgStore::new();
2902
2903        let id = store.create_node(&["Person"]);
2904        assert!(id.is_valid());
2905
2906        let node = store.get_node(id).unwrap();
2907        assert!(node.has_label("Person"));
2908        assert!(!node.has_label("Animal"));
2909    }
2910
2911    #[test]
2912    fn test_create_node_with_props() {
2913        let store = LpgStore::new();
2914
2915        let id = store.create_node_with_props(
2916            &["Person"],
2917            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
2918        );
2919
2920        let node = store.get_node(id).unwrap();
2921        assert_eq!(
2922            node.get_property("name").and_then(|v| v.as_str()),
2923            Some("Alice")
2924        );
2925        assert_eq!(
2926            node.get_property("age").and_then(|v| v.as_int64()),
2927            Some(30)
2928        );
2929    }
2930
2931    #[test]
2932    fn test_delete_node() {
2933        let store = LpgStore::new();
2934
2935        let id = store.create_node(&["Person"]);
2936        assert_eq!(store.node_count(), 1);
2937
2938        assert!(store.delete_node(id));
2939        assert_eq!(store.node_count(), 0);
2940        assert!(store.get_node(id).is_none());
2941
2942        // Double delete should return false
2943        assert!(!store.delete_node(id));
2944    }
2945
2946    #[test]
2947    fn test_create_edge() {
2948        let store = LpgStore::new();
2949
2950        let alice = store.create_node(&["Person"]);
2951        let bob = store.create_node(&["Person"]);
2952
2953        let edge_id = store.create_edge(alice, bob, "KNOWS");
2954        assert!(edge_id.is_valid());
2955
2956        let edge = store.get_edge(edge_id).unwrap();
2957        assert_eq!(edge.src, alice);
2958        assert_eq!(edge.dst, bob);
2959        assert_eq!(edge.edge_type.as_str(), "KNOWS");
2960    }
2961
2962    #[test]
2963    fn test_neighbors() {
2964        let store = LpgStore::new();
2965
2966        let a = store.create_node(&["Person"]);
2967        let b = store.create_node(&["Person"]);
2968        let c = store.create_node(&["Person"]);
2969
2970        store.create_edge(a, b, "KNOWS");
2971        store.create_edge(a, c, "KNOWS");
2972
2973        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
2974        assert_eq!(outgoing.len(), 2);
2975        assert!(outgoing.contains(&b));
2976        assert!(outgoing.contains(&c));
2977
2978        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
2979        assert_eq!(incoming.len(), 1);
2980        assert!(incoming.contains(&a));
2981    }
2982
2983    #[test]
2984    fn test_nodes_by_label() {
2985        let store = LpgStore::new();
2986
2987        let p1 = store.create_node(&["Person"]);
2988        let p2 = store.create_node(&["Person"]);
2989        let _a = store.create_node(&["Animal"]);
2990
2991        let persons = store.nodes_by_label("Person");
2992        assert_eq!(persons.len(), 2);
2993        assert!(persons.contains(&p1));
2994        assert!(persons.contains(&p2));
2995
2996        let animals = store.nodes_by_label("Animal");
2997        assert_eq!(animals.len(), 1);
2998    }
2999
3000    #[test]
3001    fn test_delete_edge() {
3002        let store = LpgStore::new();
3003
3004        let a = store.create_node(&["Person"]);
3005        let b = store.create_node(&["Person"]);
3006        let edge_id = store.create_edge(a, b, "KNOWS");
3007
3008        assert_eq!(store.edge_count(), 1);
3009
3010        assert!(store.delete_edge(edge_id));
3011        assert_eq!(store.edge_count(), 0);
3012        assert!(store.get_edge(edge_id).is_none());
3013    }
3014
3015    // === New tests for improved coverage ===
3016
3017    #[test]
3018    fn test_lpg_store_config() {
3019        // Test with_config
3020        let config = LpgStoreConfig {
3021            backward_edges: false,
3022            initial_node_capacity: 100,
3023            initial_edge_capacity: 200,
3024        };
3025        let store = LpgStore::with_config(config);
3026
3027        // Store should work but without backward adjacency
3028        let a = store.create_node(&["Person"]);
3029        let b = store.create_node(&["Person"]);
3030        store.create_edge(a, b, "KNOWS");
3031
3032        // Outgoing should work
3033        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3034        assert_eq!(outgoing.len(), 1);
3035
3036        // Incoming should be empty (no backward adjacency)
3037        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3038        assert_eq!(incoming.len(), 0);
3039    }
3040
3041    #[test]
3042    fn test_epoch_management() {
3043        let store = LpgStore::new();
3044
3045        let epoch0 = store.current_epoch();
3046        assert_eq!(epoch0.as_u64(), 0);
3047
3048        let epoch1 = store.new_epoch();
3049        assert_eq!(epoch1.as_u64(), 1);
3050
3051        let current = store.current_epoch();
3052        assert_eq!(current.as_u64(), 1);
3053    }
3054
3055    #[test]
3056    fn test_node_properties() {
3057        let store = LpgStore::new();
3058        let id = store.create_node(&["Person"]);
3059
3060        // Set and get property
3061        store.set_node_property(id, "name", Value::from("Alice"));
3062        let name = store.get_node_property(id, &"name".into());
3063        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3064
3065        // Update property
3066        store.set_node_property(id, "name", Value::from("Bob"));
3067        let name = store.get_node_property(id, &"name".into());
3068        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3069
3070        // Remove property
3071        let old = store.remove_node_property(id, "name");
3072        assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3073
3074        // Property should be gone
3075        let name = store.get_node_property(id, &"name".into());
3076        assert!(name.is_none());
3077
3078        // Remove non-existent property
3079        let none = store.remove_node_property(id, "nonexistent");
3080        assert!(none.is_none());
3081    }
3082
3083    #[test]
3084    fn test_edge_properties() {
3085        let store = LpgStore::new();
3086        let a = store.create_node(&["Person"]);
3087        let b = store.create_node(&["Person"]);
3088        let edge_id = store.create_edge(a, b, "KNOWS");
3089
3090        // Set and get property
3091        store.set_edge_property(edge_id, "since", Value::from(2020i64));
3092        let since = store.get_edge_property(edge_id, &"since".into());
3093        assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3094
3095        // Remove property
3096        let old = store.remove_edge_property(edge_id, "since");
3097        assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3098
3099        let since = store.get_edge_property(edge_id, &"since".into());
3100        assert!(since.is_none());
3101    }
3102
3103    #[test]
3104    fn test_add_remove_label() {
3105        let store = LpgStore::new();
3106        let id = store.create_node(&["Person"]);
3107
3108        // Add new label
3109        assert!(store.add_label(id, "Employee"));
3110
3111        let node = store.get_node(id).unwrap();
3112        assert!(node.has_label("Person"));
3113        assert!(node.has_label("Employee"));
3114
3115        // Adding same label again should fail
3116        assert!(!store.add_label(id, "Employee"));
3117
3118        // Remove label
3119        assert!(store.remove_label(id, "Employee"));
3120
3121        let node = store.get_node(id).unwrap();
3122        assert!(node.has_label("Person"));
3123        assert!(!node.has_label("Employee"));
3124
3125        // Removing non-existent label should fail
3126        assert!(!store.remove_label(id, "Employee"));
3127        assert!(!store.remove_label(id, "NonExistent"));
3128    }
3129
3130    #[test]
3131    fn test_add_label_to_nonexistent_node() {
3132        let store = LpgStore::new();
3133        let fake_id = NodeId::new(999);
3134        assert!(!store.add_label(fake_id, "Label"));
3135    }
3136
3137    #[test]
3138    fn test_remove_label_from_nonexistent_node() {
3139        let store = LpgStore::new();
3140        let fake_id = NodeId::new(999);
3141        assert!(!store.remove_label(fake_id, "Label"));
3142    }
3143
3144    #[test]
3145    fn test_node_ids() {
3146        let store = LpgStore::new();
3147
3148        let n1 = store.create_node(&["Person"]);
3149        let n2 = store.create_node(&["Person"]);
3150        let n3 = store.create_node(&["Person"]);
3151
3152        let ids = store.node_ids();
3153        assert_eq!(ids.len(), 3);
3154        assert!(ids.contains(&n1));
3155        assert!(ids.contains(&n2));
3156        assert!(ids.contains(&n3));
3157
3158        // Delete one
3159        store.delete_node(n2);
3160        let ids = store.node_ids();
3161        assert_eq!(ids.len(), 2);
3162        assert!(!ids.contains(&n2));
3163    }
3164
3165    #[test]
3166    fn test_delete_node_nonexistent() {
3167        let store = LpgStore::new();
3168        let fake_id = NodeId::new(999);
3169        assert!(!store.delete_node(fake_id));
3170    }
3171
3172    #[test]
3173    fn test_delete_edge_nonexistent() {
3174        let store = LpgStore::new();
3175        let fake_id = EdgeId::new(999);
3176        assert!(!store.delete_edge(fake_id));
3177    }
3178
3179    #[test]
3180    fn test_delete_edge_double() {
3181        let store = LpgStore::new();
3182        let a = store.create_node(&["Person"]);
3183        let b = store.create_node(&["Person"]);
3184        let edge_id = store.create_edge(a, b, "KNOWS");
3185
3186        assert!(store.delete_edge(edge_id));
3187        assert!(!store.delete_edge(edge_id)); // Double delete
3188    }
3189
3190    #[test]
3191    fn test_create_edge_with_props() {
3192        let store = LpgStore::new();
3193        let a = store.create_node(&["Person"]);
3194        let b = store.create_node(&["Person"]);
3195
3196        let edge_id = store.create_edge_with_props(
3197            a,
3198            b,
3199            "KNOWS",
3200            [
3201                ("since", Value::from(2020i64)),
3202                ("weight", Value::from(1.0)),
3203            ],
3204        );
3205
3206        let edge = store.get_edge(edge_id).unwrap();
3207        assert_eq!(
3208            edge.get_property("since").and_then(|v| v.as_int64()),
3209            Some(2020)
3210        );
3211        assert_eq!(
3212            edge.get_property("weight").and_then(|v| v.as_float64()),
3213            Some(1.0)
3214        );
3215    }
3216
3217    #[test]
3218    fn test_delete_node_edges() {
3219        let store = LpgStore::new();
3220
3221        let a = store.create_node(&["Person"]);
3222        let b = store.create_node(&["Person"]);
3223        let c = store.create_node(&["Person"]);
3224
3225        store.create_edge(a, b, "KNOWS"); // a -> b
3226        store.create_edge(c, a, "KNOWS"); // c -> a
3227
3228        assert_eq!(store.edge_count(), 2);
3229
3230        // Delete all edges connected to a
3231        store.delete_node_edges(a);
3232
3233        assert_eq!(store.edge_count(), 0);
3234    }
3235
3236    #[test]
3237    fn test_neighbors_both_directions() {
3238        let store = LpgStore::new();
3239
3240        let a = store.create_node(&["Person"]);
3241        let b = store.create_node(&["Person"]);
3242        let c = store.create_node(&["Person"]);
3243
3244        store.create_edge(a, b, "KNOWS"); // a -> b
3245        store.create_edge(c, a, "KNOWS"); // c -> a
3246
3247        // Direction::Both for node a
3248        let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3249        assert_eq!(neighbors.len(), 2);
3250        assert!(neighbors.contains(&b)); // outgoing
3251        assert!(neighbors.contains(&c)); // incoming
3252    }
3253
3254    #[test]
3255    fn test_edges_from() {
3256        let store = LpgStore::new();
3257
3258        let a = store.create_node(&["Person"]);
3259        let b = store.create_node(&["Person"]);
3260        let c = store.create_node(&["Person"]);
3261
3262        let e1 = store.create_edge(a, b, "KNOWS");
3263        let e2 = store.create_edge(a, c, "KNOWS");
3264
3265        let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3266        assert_eq!(edges.len(), 2);
3267        assert!(edges.iter().any(|(_, e)| *e == e1));
3268        assert!(edges.iter().any(|(_, e)| *e == e2));
3269
3270        // Incoming edges to b
3271        let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3272        assert_eq!(incoming.len(), 1);
3273        assert_eq!(incoming[0].1, e1);
3274    }
3275
3276    #[test]
3277    fn test_edges_to() {
3278        let store = LpgStore::new();
3279
3280        let a = store.create_node(&["Person"]);
3281        let b = store.create_node(&["Person"]);
3282        let c = store.create_node(&["Person"]);
3283
3284        let e1 = store.create_edge(a, b, "KNOWS");
3285        let e2 = store.create_edge(c, b, "KNOWS");
3286
3287        // Edges pointing TO b
3288        let to_b = store.edges_to(b);
3289        assert_eq!(to_b.len(), 2);
3290        assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3291        assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3292    }
3293
3294    #[test]
3295    fn test_out_degree_in_degree() {
3296        let store = LpgStore::new();
3297
3298        let a = store.create_node(&["Person"]);
3299        let b = store.create_node(&["Person"]);
3300        let c = store.create_node(&["Person"]);
3301
3302        store.create_edge(a, b, "KNOWS");
3303        store.create_edge(a, c, "KNOWS");
3304        store.create_edge(c, b, "KNOWS");
3305
3306        assert_eq!(store.out_degree(a), 2);
3307        assert_eq!(store.out_degree(b), 0);
3308        assert_eq!(store.out_degree(c), 1);
3309
3310        assert_eq!(store.in_degree(a), 0);
3311        assert_eq!(store.in_degree(b), 2);
3312        assert_eq!(store.in_degree(c), 1);
3313    }
3314
3315    #[test]
3316    fn test_edge_type() {
3317        let store = LpgStore::new();
3318
3319        let a = store.create_node(&["Person"]);
3320        let b = store.create_node(&["Person"]);
3321        let edge_id = store.create_edge(a, b, "KNOWS");
3322
3323        let edge_type = store.edge_type(edge_id);
3324        assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3325
3326        // Non-existent edge
3327        let fake_id = EdgeId::new(999);
3328        assert!(store.edge_type(fake_id).is_none());
3329    }
3330
3331    #[test]
3332    fn test_count_methods() {
3333        let store = LpgStore::new();
3334
3335        assert_eq!(store.label_count(), 0);
3336        assert_eq!(store.edge_type_count(), 0);
3337        assert_eq!(store.property_key_count(), 0);
3338
3339        let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3340        let b = store.create_node(&["Company"]);
3341        store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3342
3343        assert_eq!(store.label_count(), 2); // Person, Company
3344        assert_eq!(store.edge_type_count(), 1); // WORKS_AT
3345        assert_eq!(store.property_key_count(), 2); // age, since
3346    }
3347
3348    #[test]
3349    fn test_all_nodes_and_edges() {
3350        let store = LpgStore::new();
3351
3352        let a = store.create_node(&["Person"]);
3353        let b = store.create_node(&["Person"]);
3354        store.create_edge(a, b, "KNOWS");
3355
3356        let nodes: Vec<_> = store.all_nodes().collect();
3357        assert_eq!(nodes.len(), 2);
3358
3359        let edges: Vec<_> = store.all_edges().collect();
3360        assert_eq!(edges.len(), 1);
3361    }
3362
3363    #[test]
3364    fn test_all_labels_and_edge_types() {
3365        let store = LpgStore::new();
3366
3367        store.create_node(&["Person"]);
3368        store.create_node(&["Company"]);
3369        let a = store.create_node(&["Animal"]);
3370        let b = store.create_node(&["Animal"]);
3371        store.create_edge(a, b, "EATS");
3372
3373        let labels = store.all_labels();
3374        assert_eq!(labels.len(), 3);
3375        assert!(labels.contains(&"Person".to_string()));
3376        assert!(labels.contains(&"Company".to_string()));
3377        assert!(labels.contains(&"Animal".to_string()));
3378
3379        let edge_types = store.all_edge_types();
3380        assert_eq!(edge_types.len(), 1);
3381        assert!(edge_types.contains(&"EATS".to_string()));
3382    }
3383
3384    #[test]
3385    fn test_all_property_keys() {
3386        let store = LpgStore::new();
3387
3388        let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3389        let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3390        store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3391
3392        let keys = store.all_property_keys();
3393        assert!(keys.contains(&"name".to_string()));
3394        assert!(keys.contains(&"age".to_string()));
3395        assert!(keys.contains(&"since".to_string()));
3396    }
3397
3398    #[test]
3399    fn test_nodes_with_label() {
3400        let store = LpgStore::new();
3401
3402        store.create_node(&["Person"]);
3403        store.create_node(&["Person"]);
3404        store.create_node(&["Company"]);
3405
3406        let persons: Vec<_> = store.nodes_with_label("Person").collect();
3407        assert_eq!(persons.len(), 2);
3408
3409        let companies: Vec<_> = store.nodes_with_label("Company").collect();
3410        assert_eq!(companies.len(), 1);
3411
3412        let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3413        assert_eq!(none.len(), 0);
3414    }
3415
3416    #[test]
3417    fn test_edges_with_type() {
3418        let store = LpgStore::new();
3419
3420        let a = store.create_node(&["Person"]);
3421        let b = store.create_node(&["Person"]);
3422        let c = store.create_node(&["Company"]);
3423
3424        store.create_edge(a, b, "KNOWS");
3425        store.create_edge(a, c, "WORKS_AT");
3426
3427        let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3428        assert_eq!(knows.len(), 1);
3429
3430        let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3431        assert_eq!(works_at.len(), 1);
3432
3433        let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3434        assert_eq!(none.len(), 0);
3435    }
3436
3437    #[test]
3438    fn test_nodes_by_label_nonexistent() {
3439        let store = LpgStore::new();
3440        store.create_node(&["Person"]);
3441
3442        let empty = store.nodes_by_label("NonExistent");
3443        assert!(empty.is_empty());
3444    }
3445
3446    #[test]
3447    fn test_statistics() {
3448        let store = LpgStore::new();
3449
3450        let a = store.create_node(&["Person"]);
3451        let b = store.create_node(&["Person"]);
3452        let c = store.create_node(&["Company"]);
3453
3454        store.create_edge(a, b, "KNOWS");
3455        store.create_edge(a, c, "WORKS_AT");
3456
3457        store.compute_statistics();
3458        let stats = store.statistics();
3459
3460        assert_eq!(stats.total_nodes, 3);
3461        assert_eq!(stats.total_edges, 2);
3462
3463        // Estimates
3464        let person_card = store.estimate_label_cardinality("Person");
3465        assert!(person_card > 0.0);
3466
3467        let avg_degree = store.estimate_avg_degree("KNOWS", true);
3468        assert!(avg_degree >= 0.0);
3469    }
3470
3471    #[test]
3472    fn test_zone_maps() {
3473        let store = LpgStore::new();
3474
3475        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3476        store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3477
3478        // Zone map should indicate possible matches (30 is within [25, 35] range)
3479        let might_match =
3480            store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3481        // Zone maps return true conservatively when value is within min/max range
3482        assert!(might_match);
3483
3484        let zone = store.node_property_zone_map(&"age".into());
3485        assert!(zone.is_some());
3486
3487        // Non-existent property
3488        let no_zone = store.node_property_zone_map(&"nonexistent".into());
3489        assert!(no_zone.is_none());
3490
3491        // Edge zone maps
3492        let a = store.create_node(&["A"]);
3493        let b = store.create_node(&["B"]);
3494        store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3495
3496        let edge_zone = store.edge_property_zone_map(&"weight".into());
3497        assert!(edge_zone.is_some());
3498    }
3499
3500    #[test]
3501    fn test_rebuild_zone_maps() {
3502        let store = LpgStore::new();
3503        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3504
3505        // Should not panic
3506        store.rebuild_zone_maps();
3507    }
3508
3509    #[test]
3510    fn test_create_node_with_id() {
3511        let store = LpgStore::new();
3512
3513        let specific_id = NodeId::new(100);
3514        store.create_node_with_id(specific_id, &["Person", "Employee"]);
3515
3516        let node = store.get_node(specific_id).unwrap();
3517        assert!(node.has_label("Person"));
3518        assert!(node.has_label("Employee"));
3519
3520        // Next auto-generated ID should be > 100
3521        let next = store.create_node(&["Other"]);
3522        assert!(next.as_u64() > 100);
3523    }
3524
3525    #[test]
3526    fn test_create_edge_with_id() {
3527        let store = LpgStore::new();
3528
3529        let a = store.create_node(&["A"]);
3530        let b = store.create_node(&["B"]);
3531
3532        let specific_id = EdgeId::new(500);
3533        store.create_edge_with_id(specific_id, a, b, "REL");
3534
3535        let edge = store.get_edge(specific_id).unwrap();
3536        assert_eq!(edge.src, a);
3537        assert_eq!(edge.dst, b);
3538        assert_eq!(edge.edge_type.as_str(), "REL");
3539
3540        // Next auto-generated ID should be > 500
3541        let next = store.create_edge(a, b, "OTHER");
3542        assert!(next.as_u64() > 500);
3543    }
3544
3545    #[test]
3546    fn test_set_epoch() {
3547        let store = LpgStore::new();
3548
3549        assert_eq!(store.current_epoch().as_u64(), 0);
3550
3551        store.set_epoch(EpochId::new(42));
3552        assert_eq!(store.current_epoch().as_u64(), 42);
3553    }
3554
3555    #[test]
3556    fn test_get_node_nonexistent() {
3557        let store = LpgStore::new();
3558        let fake_id = NodeId::new(999);
3559        assert!(store.get_node(fake_id).is_none());
3560    }
3561
3562    #[test]
3563    fn test_get_edge_nonexistent() {
3564        let store = LpgStore::new();
3565        let fake_id = EdgeId::new(999);
3566        assert!(store.get_edge(fake_id).is_none());
3567    }
3568
3569    #[test]
3570    fn test_multiple_labels() {
3571        let store = LpgStore::new();
3572
3573        let id = store.create_node(&["Person", "Employee", "Manager"]);
3574        let node = store.get_node(id).unwrap();
3575
3576        assert!(node.has_label("Person"));
3577        assert!(node.has_label("Employee"));
3578        assert!(node.has_label("Manager"));
3579        assert!(!node.has_label("Other"));
3580    }
3581
3582    #[test]
3583    fn test_default_impl() {
3584        let store: LpgStore = Default::default();
3585        assert_eq!(store.node_count(), 0);
3586        assert_eq!(store.edge_count(), 0);
3587    }
3588
3589    #[test]
3590    fn test_edges_from_both_directions() {
3591        let store = LpgStore::new();
3592
3593        let a = store.create_node(&["A"]);
3594        let b = store.create_node(&["B"]);
3595        let c = store.create_node(&["C"]);
3596
3597        let e1 = store.create_edge(a, b, "R1"); // a -> b
3598        let e2 = store.create_edge(c, a, "R2"); // c -> a
3599
3600        // Both directions from a
3601        let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3602        assert_eq!(edges.len(), 2);
3603        assert!(edges.iter().any(|(_, e)| *e == e1)); // outgoing
3604        assert!(edges.iter().any(|(_, e)| *e == e2)); // incoming
3605    }
3606
3607    #[test]
3608    fn test_no_backward_adj_in_degree() {
3609        let config = LpgStoreConfig {
3610            backward_edges: false,
3611            initial_node_capacity: 10,
3612            initial_edge_capacity: 10,
3613        };
3614        let store = LpgStore::with_config(config);
3615
3616        let a = store.create_node(&["A"]);
3617        let b = store.create_node(&["B"]);
3618        store.create_edge(a, b, "R");
3619
3620        // in_degree should still work (falls back to scanning)
3621        let degree = store.in_degree(b);
3622        assert_eq!(degree, 1);
3623    }
3624
3625    #[test]
3626    fn test_no_backward_adj_edges_to() {
3627        let config = LpgStoreConfig {
3628            backward_edges: false,
3629            initial_node_capacity: 10,
3630            initial_edge_capacity: 10,
3631        };
3632        let store = LpgStore::with_config(config);
3633
3634        let a = store.create_node(&["A"]);
3635        let b = store.create_node(&["B"]);
3636        let e = store.create_edge(a, b, "R");
3637
3638        // edges_to should still work (falls back to scanning)
3639        let edges = store.edges_to(b);
3640        assert_eq!(edges.len(), 1);
3641        assert_eq!(edges[0].1, e);
3642    }
3643
3644    #[test]
3645    fn test_node_versioned_creation() {
3646        let store = LpgStore::new();
3647
3648        let epoch = store.new_epoch();
3649        let tx_id = TxId::new(1);
3650
3651        let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3652        assert!(store.get_node(id).is_some());
3653    }
3654
3655    #[test]
3656    fn test_edge_versioned_creation() {
3657        let store = LpgStore::new();
3658
3659        let a = store.create_node(&["A"]);
3660        let b = store.create_node(&["B"]);
3661
3662        let epoch = store.new_epoch();
3663        let tx_id = TxId::new(1);
3664
3665        let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3666        assert!(store.get_edge(edge_id).is_some());
3667    }
3668
3669    #[test]
3670    fn test_node_with_props_versioned() {
3671        let store = LpgStore::new();
3672
3673        let epoch = store.new_epoch();
3674        let tx_id = TxId::new(1);
3675
3676        let id = store.create_node_with_props_versioned(
3677            &["Person"],
3678            [("name", Value::from("Alice"))],
3679            epoch,
3680            tx_id,
3681        );
3682
3683        let node = store.get_node(id).unwrap();
3684        assert_eq!(
3685            node.get_property("name").and_then(|v| v.as_str()),
3686            Some("Alice")
3687        );
3688    }
3689
3690    #[test]
3691    fn test_discard_uncommitted_versions() {
3692        let store = LpgStore::new();
3693
3694        let epoch = store.new_epoch();
3695        let tx_id = TxId::new(42);
3696
3697        // Create node with specific tx
3698        let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3699        assert!(store.get_node(node_id).is_some());
3700
3701        // Discard uncommitted versions for this tx
3702        store.discard_uncommitted_versions(tx_id);
3703
3704        // Node should be gone (version chain was removed)
3705        assert!(store.get_node(node_id).is_none());
3706    }
3707
3708    // === Property Index Tests ===
3709
3710    #[test]
3711    fn test_property_index_create_and_lookup() {
3712        let store = LpgStore::new();
3713
3714        // Create nodes with properties
3715        let alice = store.create_node(&["Person"]);
3716        let bob = store.create_node(&["Person"]);
3717        let charlie = store.create_node(&["Person"]);
3718
3719        store.set_node_property(alice, "city", Value::from("NYC"));
3720        store.set_node_property(bob, "city", Value::from("NYC"));
3721        store.set_node_property(charlie, "city", Value::from("LA"));
3722
3723        // Before indexing, lookup still works (via scan)
3724        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3725        assert_eq!(nyc_people.len(), 2);
3726
3727        // Create index
3728        store.create_property_index("city");
3729        assert!(store.has_property_index("city"));
3730
3731        // Indexed lookup should return same results
3732        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3733        assert_eq!(nyc_people.len(), 2);
3734        assert!(nyc_people.contains(&alice));
3735        assert!(nyc_people.contains(&bob));
3736
3737        let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3738        assert_eq!(la_people.len(), 1);
3739        assert!(la_people.contains(&charlie));
3740    }
3741
3742    #[test]
3743    fn test_property_index_maintained_on_update() {
3744        let store = LpgStore::new();
3745
3746        // Create index first
3747        store.create_property_index("status");
3748
3749        let node = store.create_node(&["Task"]);
3750        store.set_node_property(node, "status", Value::from("pending"));
3751
3752        // Should find by initial value
3753        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3754        assert_eq!(pending.len(), 1);
3755        assert!(pending.contains(&node));
3756
3757        // Update the property
3758        store.set_node_property(node, "status", Value::from("done"));
3759
3760        // Old value should not find it
3761        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3762        assert!(pending.is_empty());
3763
3764        // New value should find it
3765        let done = store.find_nodes_by_property("status", &Value::from("done"));
3766        assert_eq!(done.len(), 1);
3767        assert!(done.contains(&node));
3768    }
3769
3770    #[test]
3771    fn test_property_index_maintained_on_remove() {
3772        let store = LpgStore::new();
3773
3774        store.create_property_index("tag");
3775
3776        let node = store.create_node(&["Item"]);
3777        store.set_node_property(node, "tag", Value::from("important"));
3778
3779        // Should find it
3780        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3781        assert_eq!(found.len(), 1);
3782
3783        // Remove the property
3784        store.remove_node_property(node, "tag");
3785
3786        // Should no longer find it
3787        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3788        assert!(found.is_empty());
3789    }
3790
3791    #[test]
3792    fn test_property_index_drop() {
3793        let store = LpgStore::new();
3794
3795        store.create_property_index("key");
3796        assert!(store.has_property_index("key"));
3797
3798        assert!(store.drop_property_index("key"));
3799        assert!(!store.has_property_index("key"));
3800
3801        // Dropping non-existent index returns false
3802        assert!(!store.drop_property_index("key"));
3803    }
3804
3805    #[test]
3806    fn test_property_index_multiple_values() {
3807        let store = LpgStore::new();
3808
3809        store.create_property_index("age");
3810
3811        // Create multiple nodes with same and different ages
3812        let n1 = store.create_node(&["Person"]);
3813        let n2 = store.create_node(&["Person"]);
3814        let n3 = store.create_node(&["Person"]);
3815        let n4 = store.create_node(&["Person"]);
3816
3817        store.set_node_property(n1, "age", Value::from(25i64));
3818        store.set_node_property(n2, "age", Value::from(25i64));
3819        store.set_node_property(n3, "age", Value::from(30i64));
3820        store.set_node_property(n4, "age", Value::from(25i64));
3821
3822        let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3823        assert_eq!(age_25.len(), 3);
3824
3825        let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3826        assert_eq!(age_30.len(), 1);
3827
3828        let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3829        assert!(age_40.is_empty());
3830    }
3831
3832    #[test]
3833    fn test_property_index_builds_from_existing_data() {
3834        let store = LpgStore::new();
3835
3836        // Create nodes first
3837        let n1 = store.create_node(&["Person"]);
3838        let n2 = store.create_node(&["Person"]);
3839        store.set_node_property(n1, "email", Value::from("alice@example.com"));
3840        store.set_node_property(n2, "email", Value::from("bob@example.com"));
3841
3842        // Create index after data exists
3843        store.create_property_index("email");
3844
3845        // Index should include existing data
3846        let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3847        assert_eq!(alice.len(), 1);
3848        assert!(alice.contains(&n1));
3849
3850        let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3851        assert_eq!(bob.len(), 1);
3852        assert!(bob.contains(&n2));
3853    }
3854
3855    #[test]
3856    fn test_get_node_property_batch() {
3857        let store = LpgStore::new();
3858
3859        let n1 = store.create_node(&["Person"]);
3860        let n2 = store.create_node(&["Person"]);
3861        let n3 = store.create_node(&["Person"]);
3862
3863        store.set_node_property(n1, "age", Value::from(25i64));
3864        store.set_node_property(n2, "age", Value::from(30i64));
3865        // n3 has no age property
3866
3867        let age_key = PropertyKey::new("age");
3868        let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
3869
3870        assert_eq!(values.len(), 3);
3871        assert_eq!(values[0], Some(Value::from(25i64)));
3872        assert_eq!(values[1], Some(Value::from(30i64)));
3873        assert_eq!(values[2], None);
3874    }
3875
3876    #[test]
3877    fn test_get_node_property_batch_empty() {
3878        let store = LpgStore::new();
3879        let key = PropertyKey::new("any");
3880
3881        let values = store.get_node_property_batch(&[], &key);
3882        assert!(values.is_empty());
3883    }
3884
3885    #[test]
3886    fn test_get_nodes_properties_batch() {
3887        let store = LpgStore::new();
3888
3889        let n1 = store.create_node(&["Person"]);
3890        let n2 = store.create_node(&["Person"]);
3891        let n3 = store.create_node(&["Person"]);
3892
3893        store.set_node_property(n1, "name", Value::from("Alice"));
3894        store.set_node_property(n1, "age", Value::from(25i64));
3895        store.set_node_property(n2, "name", Value::from("Bob"));
3896        // n3 has no properties
3897
3898        let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
3899
3900        assert_eq!(all_props.len(), 3);
3901        assert_eq!(all_props[0].len(), 2); // name and age
3902        assert_eq!(all_props[1].len(), 1); // name only
3903        assert_eq!(all_props[2].len(), 0); // no properties
3904
3905        assert_eq!(
3906            all_props[0].get(&PropertyKey::new("name")),
3907            Some(&Value::from("Alice"))
3908        );
3909        assert_eq!(
3910            all_props[1].get(&PropertyKey::new("name")),
3911            Some(&Value::from("Bob"))
3912        );
3913    }
3914
3915    #[test]
3916    fn test_get_nodes_properties_batch_empty() {
3917        let store = LpgStore::new();
3918
3919        let all_props = store.get_nodes_properties_batch(&[]);
3920        assert!(all_props.is_empty());
3921    }
3922
3923    #[test]
3924    fn test_get_nodes_properties_selective_batch() {
3925        let store = LpgStore::new();
3926
3927        let n1 = store.create_node(&["Person"]);
3928        let n2 = store.create_node(&["Person"]);
3929
3930        // Set multiple properties
3931        store.set_node_property(n1, "name", Value::from("Alice"));
3932        store.set_node_property(n1, "age", Value::from(25i64));
3933        store.set_node_property(n1, "email", Value::from("alice@example.com"));
3934        store.set_node_property(n2, "name", Value::from("Bob"));
3935        store.set_node_property(n2, "age", Value::from(30i64));
3936        store.set_node_property(n2, "city", Value::from("NYC"));
3937
3938        // Request only name and age (not email or city)
3939        let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
3940        let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
3941
3942        assert_eq!(props.len(), 2);
3943
3944        // n1: should have name and age, but NOT email
3945        assert_eq!(props[0].len(), 2);
3946        assert_eq!(
3947            props[0].get(&PropertyKey::new("name")),
3948            Some(&Value::from("Alice"))
3949        );
3950        assert_eq!(
3951            props[0].get(&PropertyKey::new("age")),
3952            Some(&Value::from(25i64))
3953        );
3954        assert_eq!(props[0].get(&PropertyKey::new("email")), None);
3955
3956        // n2: should have name and age, but NOT city
3957        assert_eq!(props[1].len(), 2);
3958        assert_eq!(
3959            props[1].get(&PropertyKey::new("name")),
3960            Some(&Value::from("Bob"))
3961        );
3962        assert_eq!(
3963            props[1].get(&PropertyKey::new("age")),
3964            Some(&Value::from(30i64))
3965        );
3966        assert_eq!(props[1].get(&PropertyKey::new("city")), None);
3967    }
3968
3969    #[test]
3970    fn test_get_nodes_properties_selective_batch_empty_keys() {
3971        let store = LpgStore::new();
3972
3973        let n1 = store.create_node(&["Person"]);
3974        store.set_node_property(n1, "name", Value::from("Alice"));
3975
3976        // Request no properties
3977        let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
3978
3979        assert_eq!(props.len(), 1);
3980        assert!(props[0].is_empty()); // Empty map when no keys requested
3981    }
3982
3983    #[test]
3984    fn test_get_nodes_properties_selective_batch_missing_keys() {
3985        let store = LpgStore::new();
3986
3987        let n1 = store.create_node(&["Person"]);
3988        store.set_node_property(n1, "name", Value::from("Alice"));
3989
3990        // Request a property that doesn't exist
3991        let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
3992        let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
3993
3994        assert_eq!(props.len(), 1);
3995        assert_eq!(props[0].len(), 1); // Only name exists
3996        assert_eq!(
3997            props[0].get(&PropertyKey::new("name")),
3998            Some(&Value::from("Alice"))
3999        );
4000    }
4001}