Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(feature = "tiered-storage", feature = "vector-index"))]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
29
30#[cfg(feature = "vector-index")]
31use crate::index::vector::HnswIndex;
32
33/// Compares two values for ordering (used for range checks).
34fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
35    match (a, b) {
36        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
37        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
38        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
39        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
40        _ => None,
41    }
42}
43
44/// Checks if a value is within a range.
45fn value_in_range(
46    value: &Value,
47    min: Option<&Value>,
48    max: Option<&Value>,
49    min_inclusive: bool,
50    max_inclusive: bool,
51) -> bool {
52    // Check lower bound
53    if let Some(min_val) = min {
54        match compare_values_for_range(value, min_val) {
55            Some(CmpOrdering::Less) => return false,
56            Some(CmpOrdering::Equal) if !min_inclusive => return false,
57            None => return false, // Can't compare
58            _ => {}
59        }
60    }
61
62    // Check upper bound
63    if let Some(max_val) = max {
64        match compare_values_for_range(value, max_val) {
65            Some(CmpOrdering::Greater) => return false,
66            Some(CmpOrdering::Equal) if !max_inclusive => return false,
67            None => return false,
68            _ => {}
69        }
70    }
71
72    true
73}
74
75// Tiered storage imports
76#[cfg(feature = "tiered-storage")]
77use crate::storage::EpochStore;
78#[cfg(feature = "tiered-storage")]
79use grafeo_common::memory::arena::ArenaAllocator;
80#[cfg(feature = "tiered-storage")]
81use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
82
/// Tunable settings for an `LpgStore`.
///
/// The default values are a reasonable starting point. Disable
/// `backward_edges` when traversal only ever follows outgoing edges (this
/// saves memory), and raise the capacities when the eventual graph size is
/// known ahead of time (this avoids reallocations).
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// Whether to keep a backward adjacency index for incoming-edge queries.
    /// Switching it off saves roughly half of the adjacency memory when only
    /// outgoing edges are traversed.
    pub backward_edges: bool,
    /// Starting capacity for node storage, to avoid early reallocations.
    pub initial_node_capacity: usize,
    /// Starting capacity for edge storage, to avoid early reallocations.
    pub initial_edge_capacity: usize,
}
98
99impl Default for LpgStoreConfig {
100    fn default() -> Self {
101        Self {
102            backward_edges: true,
103            initial_node_capacity: 1024,
104            initial_edge_capacity: 4096,
105        }
106    }
107}
108
109/// The core in-memory graph storage.
110///
111/// Everything lives here: nodes, edges, properties, adjacency indexes, and
112/// version chains for MVCC. Concurrent reads never block each other.
113///
114/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
115/// adds transaction management and query execution. Use `LpgStore` directly
116/// when you need raw performance for algorithm implementations.
117///
118/// # Example
119///
120/// ```
121/// use grafeo_core::graph::lpg::LpgStore;
122/// use grafeo_core::graph::Direction;
123///
124/// let store = LpgStore::new();
125///
126/// // Create a small social network
127/// let alice = store.create_node(&["Person"]);
128/// let bob = store.create_node(&["Person"]);
129/// store.create_edge(alice, bob, "KNOWS");
130///
131/// // Traverse outgoing edges
132/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
133///     println!("Alice knows node {:?}", neighbor);
134/// }
135/// ```
136///
137/// # Lock Ordering
138///
139/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
140/// consistent order to prevent deadlocks. Always acquire locks in this order:
141///
142/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
143/// 1. `nodes` / `node_versions`
144/// 2. `edges` / `edge_versions`
145///
146/// ## Level 2 - Catalogs (acquire as pairs when writing)
147/// 3. `label_to_id` + `id_to_label`
148/// 4. `edge_type_to_id` + `id_to_edge_type`
149///
150/// ## Level 3 - Indexes
151/// 5. `label_index`
152/// 6. `node_labels`
153/// 7. `property_indexes`
154///
155/// ## Level 4 - Statistics
156/// 8. `statistics`
157///
158/// ## Level 5 - Nested Locks (internal to other structs)
159/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
160/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
161///
162/// ## Rules
163/// - Catalog pairs must be acquired together when writing.
164/// - Never hold entity locks while acquiring catalog locks in a different scope.
165/// - Statistics lock is always last.
166/// - Read locks are generally safe, but avoid read-to-write upgrades.
167pub struct LpgStore {
168    /// Configuration.
169    #[allow(dead_code)]
170    config: LpgStoreConfig,
171
172    /// Node records indexed by NodeId, with version chains for MVCC.
173    /// Used when `tiered-storage` feature is disabled.
174    /// Lock order: 1
175    #[cfg(not(feature = "tiered-storage"))]
176    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
177
178    /// Edge records indexed by EdgeId, with version chains for MVCC.
179    /// Used when `tiered-storage` feature is disabled.
180    /// Lock order: 2
181    #[cfg(not(feature = "tiered-storage"))]
182    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
183
184    // === Tiered Storage Fields (feature-gated) ===
185    //
186    // Lock ordering for arena access:
187    //   version_lock (read/write) → arena read lock (via arena_allocator.arena())
188    //
189    // Rules:
190    // - Acquire arena read lock *after* version locks, never before.
191    // - Multiple threads may call arena.read_at() concurrently (shared refs only).
192    // - Never acquire arena write lock (alloc_new_chunk) while holding version locks.
193    // - freeze_epoch order: node_versions.read() → arena.read_at(),
194    //   then edge_versions.read() → arena.read_at().
195    /// Arena allocator for hot data storage.
196    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
197    #[cfg(feature = "tiered-storage")]
198    arena_allocator: Arc<ArenaAllocator>,
199
200    /// Node version indexes - store metadata and arena offsets.
201    /// The actual NodeRecord data is stored in the arena.
202    /// Lock order: 1
203    #[cfg(feature = "tiered-storage")]
204    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
205
206    /// Edge version indexes - store metadata and arena offsets.
207    /// The actual EdgeRecord data is stored in the arena.
208    /// Lock order: 2
209    #[cfg(feature = "tiered-storage")]
210    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
211
212    /// Cold storage for frozen epochs.
213    /// Contains compressed epoch blocks for historical data.
214    #[cfg(feature = "tiered-storage")]
215    epoch_store: Arc<EpochStore>,
216
217    /// Property storage for nodes.
218    node_properties: PropertyStorage<NodeId>,
219
220    /// Property storage for edges.
221    edge_properties: PropertyStorage<EdgeId>,
222
223    /// Label name to ID mapping.
224    /// Lock order: 3 (acquire with id_to_label)
225    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
226
227    /// Label ID to name mapping.
228    /// Lock order: 3 (acquire with label_to_id)
229    id_to_label: RwLock<Vec<ArcStr>>,
230
231    /// Edge type name to ID mapping.
232    /// Lock order: 4 (acquire with id_to_edge_type)
233    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
234
235    /// Edge type ID to name mapping.
236    /// Lock order: 4 (acquire with edge_type_to_id)
237    id_to_edge_type: RwLock<Vec<ArcStr>>,
238
239    /// Forward adjacency lists (outgoing edges).
240    forward_adj: ChunkedAdjacency,
241
242    /// Backward adjacency lists (incoming edges).
243    /// Only populated if config.backward_edges is true.
244    backward_adj: Option<ChunkedAdjacency>,
245
246    /// Label index: label_id -> set of node IDs.
247    /// Lock order: 5
248    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
249
250    /// Node labels: node_id -> set of label IDs.
251    /// Reverse mapping to efficiently get labels for a node.
252    /// Lock order: 6
253    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
254
255    /// Property indexes: property_key -> (value -> set of node IDs).
256    ///
257    /// When a property is indexed, lookups by value are O(1) instead of O(n).
258    /// Use [`create_property_index`] to enable indexing for a property.
259    /// Lock order: 7
260    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
261
262    /// Vector indexes: "label:property" -> HNSW index.
263    ///
264    /// Created via [`GrafeoDB::create_vector_index`](grafeo_engine::GrafeoDB::create_vector_index).
265    /// Lock order: 7 (same level as property_indexes, disjoint keys)
266    #[cfg(feature = "vector-index")]
267    vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
268
269    /// Next node ID.
270    next_node_id: AtomicU64,
271
272    /// Next edge ID.
273    next_edge_id: AtomicU64,
274
275    /// Current epoch.
276    current_epoch: AtomicU64,
277
278    /// Statistics for cost-based optimization.
279    /// Lock order: 8 (always last)
280    statistics: RwLock<Statistics>,
281
282    /// Whether statistics need recomputation after mutations.
283    needs_stats_recompute: AtomicBool,
284}
285
286impl LpgStore {
287    /// Creates a new LPG store with default configuration.
288    #[must_use]
289    pub fn new() -> Self {
290        Self::with_config(LpgStoreConfig::default())
291    }
292
293    /// Creates a new LPG store with custom configuration.
294    #[must_use]
295    pub fn with_config(config: LpgStoreConfig) -> Self {
296        let backward_adj = if config.backward_edges {
297            Some(ChunkedAdjacency::new())
298        } else {
299            None
300        };
301
302        Self {
303            #[cfg(not(feature = "tiered-storage"))]
304            nodes: RwLock::new(FxHashMap::default()),
305            #[cfg(not(feature = "tiered-storage"))]
306            edges: RwLock::new(FxHashMap::default()),
307            #[cfg(feature = "tiered-storage")]
308            arena_allocator: Arc::new(ArenaAllocator::new()),
309            #[cfg(feature = "tiered-storage")]
310            node_versions: RwLock::new(FxHashMap::default()),
311            #[cfg(feature = "tiered-storage")]
312            edge_versions: RwLock::new(FxHashMap::default()),
313            #[cfg(feature = "tiered-storage")]
314            epoch_store: Arc::new(EpochStore::new()),
315            node_properties: PropertyStorage::new(),
316            edge_properties: PropertyStorage::new(),
317            label_to_id: RwLock::new(FxHashMap::default()),
318            id_to_label: RwLock::new(Vec::new()),
319            edge_type_to_id: RwLock::new(FxHashMap::default()),
320            id_to_edge_type: RwLock::new(Vec::new()),
321            forward_adj: ChunkedAdjacency::new(),
322            backward_adj,
323            label_index: RwLock::new(Vec::new()),
324            node_labels: RwLock::new(FxHashMap::default()),
325            property_indexes: RwLock::new(FxHashMap::default()),
326            #[cfg(feature = "vector-index")]
327            vector_indexes: RwLock::new(FxHashMap::default()),
328            next_node_id: AtomicU64::new(0),
329            next_edge_id: AtomicU64::new(0),
330            current_epoch: AtomicU64::new(0),
331            statistics: RwLock::new(Statistics::new()),
332            needs_stats_recompute: AtomicBool::new(true),
333            config,
334        }
335    }
336
337    /// Returns the current epoch.
338    #[must_use]
339    pub fn current_epoch(&self) -> EpochId {
340        EpochId::new(self.current_epoch.load(Ordering::Acquire))
341    }
342
343    /// Creates a new epoch.
344    pub fn new_epoch(&self) -> EpochId {
345        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
346        EpochId::new(id)
347    }
348
349    // === Node Operations ===
350
351    /// Creates a new node with the given labels.
352    ///
353    /// Uses the system transaction for non-transactional operations.
354    pub fn create_node(&self, labels: &[&str]) -> NodeId {
355        self.needs_stats_recompute.store(true, Ordering::Relaxed);
356        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
357    }
358
359    /// Creates a new node with the given labels within a transaction context.
360    #[cfg(not(feature = "tiered-storage"))]
361    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
362        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
363
364        let mut record = NodeRecord::new(id, epoch);
365        record.set_label_count(labels.len() as u16);
366
367        // Store labels in node_labels map and label_index
368        let mut node_label_set = FxHashSet::default();
369        for label in labels {
370            let label_id = self.get_or_create_label_id(*label);
371            node_label_set.insert(label_id);
372
373            // Update label index
374            let mut index = self.label_index.write();
375            while index.len() <= label_id as usize {
376                index.push(FxHashMap::default());
377            }
378            index[label_id as usize].insert(id, ());
379        }
380
381        // Store node's labels
382        self.node_labels.write().insert(id, node_label_set);
383
384        // Create version chain with initial version
385        let chain = VersionChain::with_initial(record, epoch, tx_id);
386        self.nodes.write().insert(id, chain);
387        id
388    }
389
390    /// Creates a new node with the given labels within a transaction context.
391    /// (Tiered storage version: stores data in arena, metadata in VersionIndex)
392    #[cfg(feature = "tiered-storage")]
393    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
394        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
395
396        let mut record = NodeRecord::new(id, epoch);
397        record.set_label_count(labels.len() as u16);
398
399        // Store labels in node_labels map and label_index
400        let mut node_label_set = FxHashSet::default();
401        for label in labels {
402            let label_id = self.get_or_create_label_id(*label);
403            node_label_set.insert(label_id);
404
405            // Update label index
406            let mut index = self.label_index.write();
407            while index.len() <= label_id as usize {
408                index.push(FxHashMap::default());
409            }
410            index[label_id as usize].insert(id, ());
411        }
412
413        // Store node's labels
414        self.node_labels.write().insert(id, node_label_set);
415
416        // Allocate record in arena and get offset (create epoch if needed)
417        let arena = self.arena_allocator.arena_or_create(epoch);
418        let (offset, _stored) = arena.alloc_value_with_offset(record);
419
420        // Create HotVersionRef pointing to arena data
421        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
422
423        // Create or update version index
424        let mut versions = self.node_versions.write();
425        if let Some(index) = versions.get_mut(&id) {
426            index.add_hot(hot_ref);
427        } else {
428            versions.insert(id, VersionIndex::with_initial(hot_ref));
429        }
430
431        id
432    }
433
434    /// Creates a new node with labels and properties.
435    pub fn create_node_with_props(
436        &self,
437        labels: &[&str],
438        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
439    ) -> NodeId {
440        self.create_node_with_props_versioned(
441            labels,
442            properties,
443            self.current_epoch(),
444            TxId::SYSTEM,
445        )
446    }
447
448    /// Creates a new node with labels and properties within a transaction context.
449    #[cfg(not(feature = "tiered-storage"))]
450    pub fn create_node_with_props_versioned(
451        &self,
452        labels: &[&str],
453        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
454        epoch: EpochId,
455        tx_id: TxId,
456    ) -> NodeId {
457        let id = self.create_node_versioned(labels, epoch, tx_id);
458
459        for (key, value) in properties {
460            let prop_key: PropertyKey = key.into();
461            let prop_value: Value = value.into();
462            // Update property index before setting the property
463            self.update_property_index_on_set(id, &prop_key, &prop_value);
464            self.node_properties.set(id, prop_key, prop_value);
465        }
466
467        // Update props_count in record
468        let count = self.node_properties.get_all(id).len() as u16;
469        if let Some(chain) = self.nodes.write().get_mut(&id) {
470            if let Some(record) = chain.latest_mut() {
471                record.props_count = count;
472            }
473        }
474
475        id
476    }
477
478    /// Creates a new node with labels and properties within a transaction context.
479    /// (Tiered storage version)
480    #[cfg(feature = "tiered-storage")]
481    pub fn create_node_with_props_versioned(
482        &self,
483        labels: &[&str],
484        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
485        epoch: EpochId,
486        tx_id: TxId,
487    ) -> NodeId {
488        let id = self.create_node_versioned(labels, epoch, tx_id);
489
490        for (key, value) in properties {
491            let prop_key: PropertyKey = key.into();
492            let prop_value: Value = value.into();
493            // Update property index before setting the property
494            self.update_property_index_on_set(id, &prop_key, &prop_value);
495            self.node_properties.set(id, prop_key, prop_value);
496        }
497
498        // Note: props_count in record is not updated for tiered storage.
499        // The record is immutable once allocated in the arena.
500
501        id
502    }
503
504    /// Gets a node by ID (latest visible version).
505    #[must_use]
506    pub fn get_node(&self, id: NodeId) -> Option<Node> {
507        self.get_node_at_epoch(id, self.current_epoch())
508    }
509
510    /// Gets a node by ID at a specific epoch.
511    #[must_use]
512    #[cfg(not(feature = "tiered-storage"))]
513    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
514        let nodes = self.nodes.read();
515        let chain = nodes.get(&id)?;
516        let record = chain.visible_at(epoch)?;
517
518        if record.is_deleted() {
519            return None;
520        }
521
522        let mut node = Node::new(id);
523
524        // Get labels from node_labels map
525        let id_to_label = self.id_to_label.read();
526        let node_labels = self.node_labels.read();
527        if let Some(label_ids) = node_labels.get(&id) {
528            for &label_id in label_ids {
529                if let Some(label) = id_to_label.get(label_id as usize) {
530                    node.labels.push(label.clone());
531                }
532            }
533        }
534
535        // Get properties
536        node.properties = self.node_properties.get_all(id).into_iter().collect();
537
538        Some(node)
539    }
540
541    /// Gets a node by ID at a specific epoch.
542    /// (Tiered storage version: reads from arena via VersionIndex)
543    #[must_use]
544    #[cfg(feature = "tiered-storage")]
545    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
546        let versions = self.node_versions.read();
547        let index = versions.get(&id)?;
548        let version_ref = index.visible_at(epoch)?;
549
550        // Read the record from arena
551        let record = self.read_node_record(&version_ref)?;
552
553        if record.is_deleted() {
554            return None;
555        }
556
557        let mut node = Node::new(id);
558
559        // Get labels from node_labels map
560        let id_to_label = self.id_to_label.read();
561        let node_labels = self.node_labels.read();
562        if let Some(label_ids) = node_labels.get(&id) {
563            for &label_id in label_ids {
564                if let Some(label) = id_to_label.get(label_id as usize) {
565                    node.labels.push(label.clone());
566                }
567            }
568        }
569
570        // Get properties
571        node.properties = self.node_properties.get_all(id).into_iter().collect();
572
573        Some(node)
574    }
575
576    /// Gets a node visible to a specific transaction.
577    #[must_use]
578    #[cfg(not(feature = "tiered-storage"))]
579    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
580        let nodes = self.nodes.read();
581        let chain = nodes.get(&id)?;
582        let record = chain.visible_to(epoch, tx_id)?;
583
584        if record.is_deleted() {
585            return None;
586        }
587
588        let mut node = Node::new(id);
589
590        // Get labels from node_labels map
591        let id_to_label = self.id_to_label.read();
592        let node_labels = self.node_labels.read();
593        if let Some(label_ids) = node_labels.get(&id) {
594            for &label_id in label_ids {
595                if let Some(label) = id_to_label.get(label_id as usize) {
596                    node.labels.push(label.clone());
597                }
598            }
599        }
600
601        // Get properties
602        node.properties = self.node_properties.get_all(id).into_iter().collect();
603
604        Some(node)
605    }
606
607    /// Gets a node visible to a specific transaction.
608    /// (Tiered storage version: reads from arena via VersionIndex)
609    #[must_use]
610    #[cfg(feature = "tiered-storage")]
611    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
612        let versions = self.node_versions.read();
613        let index = versions.get(&id)?;
614        let version_ref = index.visible_to(epoch, tx_id)?;
615
616        // Read the record from arena
617        let record = self.read_node_record(&version_ref)?;
618
619        if record.is_deleted() {
620            return None;
621        }
622
623        let mut node = Node::new(id);
624
625        // Get labels from node_labels map
626        let id_to_label = self.id_to_label.read();
627        let node_labels = self.node_labels.read();
628        if let Some(label_ids) = node_labels.get(&id) {
629            for &label_id in label_ids {
630                if let Some(label) = id_to_label.get(label_id as usize) {
631                    node.labels.push(label.clone());
632                }
633            }
634        }
635
636        // Get properties
637        node.properties = self.node_properties.get_all(id).into_iter().collect();
638
639        Some(node)
640    }
641
642    /// Reads a NodeRecord from arena (hot) or epoch store (cold) using a VersionRef.
643    #[cfg(feature = "tiered-storage")]
644    #[allow(unsafe_code)]
645    fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
646        match version_ref {
647            VersionRef::Hot(hot_ref) => {
648                let arena = self.arena_allocator.arena(hot_ref.epoch);
649                // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
650                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
651                Some(*record)
652            }
653            VersionRef::Cold(cold_ref) => {
654                // Read from compressed epoch store
655                self.epoch_store
656                    .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
657            }
658        }
659    }
660
661    /// Deletes a node and all its edges (using latest epoch).
662    pub fn delete_node(&self, id: NodeId) -> bool {
663        self.needs_stats_recompute.store(true, Ordering::Relaxed);
664        self.delete_node_at_epoch(id, self.current_epoch())
665    }
666
667    /// Deletes a node at a specific epoch.
668    #[cfg(not(feature = "tiered-storage"))]
669    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
670        let mut nodes = self.nodes.write();
671        if let Some(chain) = nodes.get_mut(&id) {
672            // Check if visible at this epoch (not already deleted)
673            if let Some(record) = chain.visible_at(epoch) {
674                if record.is_deleted() {
675                    return false;
676                }
677            } else {
678                // Not visible at this epoch (already deleted or doesn't exist)
679                return false;
680            }
681
682            // Mark the version chain as deleted at this epoch
683            chain.mark_deleted(epoch);
684
685            // Remove from label index using node_labels map
686            let mut index = self.label_index.write();
687            let mut node_labels = self.node_labels.write();
688            if let Some(label_ids) = node_labels.remove(&id) {
689                for label_id in label_ids {
690                    if let Some(set) = index.get_mut(label_id as usize) {
691                        set.remove(&id);
692                    }
693                }
694            }
695
696            // Remove properties
697            drop(nodes); // Release lock before removing properties
698            drop(index);
699            drop(node_labels);
700            self.node_properties.remove_all(id);
701
702            // Note: Caller should use delete_node_edges() first if detach is needed
703
704            true
705        } else {
706            false
707        }
708    }
709
710    /// Deletes a node at a specific epoch.
711    /// (Tiered storage version)
712    #[cfg(feature = "tiered-storage")]
713    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
714        let mut versions = self.node_versions.write();
715        if let Some(index) = versions.get_mut(&id) {
716            // Check if visible at this epoch
717            if let Some(version_ref) = index.visible_at(epoch) {
718                if let Some(record) = self.read_node_record(&version_ref) {
719                    if record.is_deleted() {
720                        return false;
721                    }
722                } else {
723                    return false;
724                }
725            } else {
726                return false;
727            }
728
729            // Mark as deleted in version index
730            index.mark_deleted(epoch);
731
732            // Remove from label index using node_labels map
733            let mut label_index = self.label_index.write();
734            let mut node_labels = self.node_labels.write();
735            if let Some(label_ids) = node_labels.remove(&id) {
736                for label_id in label_ids {
737                    if let Some(set) = label_index.get_mut(label_id as usize) {
738                        set.remove(&id);
739                    }
740                }
741            }
742
743            // Remove properties
744            drop(versions);
745            drop(label_index);
746            drop(node_labels);
747            self.node_properties.remove_all(id);
748
749            true
750        } else {
751            false
752        }
753    }
754
755    /// Deletes all edges connected to a node (implements DETACH DELETE).
756    ///
757    /// Call this before `delete_node()` if you want to remove a node that
758    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
759    #[cfg(not(feature = "tiered-storage"))]
760    pub fn delete_node_edges(&self, node_id: NodeId) {
761        // Get outgoing edges
762        let outgoing: Vec<EdgeId> = self
763            .forward_adj
764            .edges_from(node_id)
765            .into_iter()
766            .map(|(_, edge_id)| edge_id)
767            .collect();
768
769        // Get incoming edges
770        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
771            backward
772                .edges_from(node_id)
773                .into_iter()
774                .map(|(_, edge_id)| edge_id)
775                .collect()
776        } else {
777            // No backward adjacency - scan all edges
778            let epoch = self.current_epoch();
779            self.edges
780                .read()
781                .iter()
782                .filter_map(|(id, chain)| {
783                    chain.visible_at(epoch).and_then(|r| {
784                        if !r.is_deleted() && r.dst == node_id {
785                            Some(*id)
786                        } else {
787                            None
788                        }
789                    })
790                })
791                .collect()
792        };
793
794        // Delete all edges
795        for edge_id in outgoing.into_iter().chain(incoming) {
796            self.delete_edge(edge_id);
797        }
798    }
799
800    /// Deletes all edges connected to a node (implements DETACH DELETE).
801    /// (Tiered storage version)
802    #[cfg(feature = "tiered-storage")]
803    pub fn delete_node_edges(&self, node_id: NodeId) {
804        // Get outgoing edges
805        let outgoing: Vec<EdgeId> = self
806            .forward_adj
807            .edges_from(node_id)
808            .into_iter()
809            .map(|(_, edge_id)| edge_id)
810            .collect();
811
812        // Get incoming edges
813        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
814            backward
815                .edges_from(node_id)
816                .into_iter()
817                .map(|(_, edge_id)| edge_id)
818                .collect()
819        } else {
820            // No backward adjacency - scan all edges
821            let epoch = self.current_epoch();
822            let versions = self.edge_versions.read();
823            versions
824                .iter()
825                .filter_map(|(id, index)| {
826                    index.visible_at(epoch).and_then(|vref| {
827                        self.read_edge_record(&vref).and_then(|r| {
828                            if !r.is_deleted() && r.dst == node_id {
829                                Some(*id)
830                            } else {
831                                None
832                            }
833                        })
834                    })
835                })
836                .collect()
837        };
838
839        // Delete all edges
840        for edge_id in outgoing.into_iter().chain(incoming) {
841            self.delete_edge(edge_id);
842        }
843    }
844
845    /// Sets a property on a node.
846    #[cfg(not(feature = "tiered-storage"))]
847    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
848        let prop_key: PropertyKey = key.into();
849
850        // Update property index before setting the property (needs to read old value)
851        self.update_property_index_on_set(id, &prop_key, &value);
852
853        self.node_properties.set(id, prop_key, value);
854
855        // Update props_count in record
856        let count = self.node_properties.get_all(id).len() as u16;
857        if let Some(chain) = self.nodes.write().get_mut(&id) {
858            if let Some(record) = chain.latest_mut() {
859                record.props_count = count;
860            }
861        }
862    }
863
864    /// Sets a property on a node.
865    /// (Tiered storage version: properties stored separately, record is immutable)
866    #[cfg(feature = "tiered-storage")]
867    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
868        let prop_key: PropertyKey = key.into();
869
870        // Update property index before setting the property (needs to read old value)
871        self.update_property_index_on_set(id, &prop_key, &value);
872
873        self.node_properties.set(id, prop_key, value);
874        // Note: props_count in record is not updated for tiered storage.
875        // The record is immutable once allocated in the arena.
876        // Property count can be derived from PropertyStorage if needed.
877    }
878
879    /// Sets a property on an edge.
880    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
881        self.edge_properties.set(id, key.into(), value);
882    }
883
884    /// Removes a property from a node.
885    ///
886    /// Returns the previous value if it existed, or None if the property didn't exist.
887    #[cfg(not(feature = "tiered-storage"))]
888    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
889        let prop_key: PropertyKey = key.into();
890
891        // Update property index before removing (needs to read old value)
892        self.update_property_index_on_remove(id, &prop_key);
893
894        let result = self.node_properties.remove(id, &prop_key);
895
896        // Update props_count in record
897        let count = self.node_properties.get_all(id).len() as u16;
898        if let Some(chain) = self.nodes.write().get_mut(&id) {
899            if let Some(record) = chain.latest_mut() {
900                record.props_count = count;
901            }
902        }
903
904        result
905    }
906
907    /// Removes a property from a node.
908    /// (Tiered storage version)
909    #[cfg(feature = "tiered-storage")]
910    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
911        let prop_key: PropertyKey = key.into();
912
913        // Update property index before removing (needs to read old value)
914        self.update_property_index_on_remove(id, &prop_key);
915
916        self.node_properties.remove(id, &prop_key)
917        // Note: props_count in record is not updated for tiered storage.
918    }
919
920    /// Removes a property from an edge.
921    ///
922    /// Returns the previous value if it existed, or None if the property didn't exist.
923    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
924        self.edge_properties.remove(id, &key.into())
925    }
926
927    /// Gets a single property from a node without loading all properties.
928    ///
929    /// This is O(1) vs O(properties) for `get_node().get_property()`.
930    /// Use this for filter predicates where you only need one property value.
931    ///
932    /// # Example
933    ///
934    /// ```ignore
935    /// // Fast: Direct single-property lookup
936    /// let age = store.get_node_property(node_id, "age");
937    ///
938    /// // Slow: Loads all properties, then extracts one
939    /// let age = store.get_node(node_id).and_then(|n| n.get_property("age").cloned());
940    /// ```
941    #[must_use]
942    pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
943        self.node_properties.get(id, key)
944    }
945
946    /// Gets a single property from an edge without loading all properties.
947    ///
948    /// This is O(1) vs O(properties) for `get_edge().get_property()`.
949    #[must_use]
950    pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
951        self.edge_properties.get(id, key)
952    }
953
954    // === Batch Property Operations ===
955
956    /// Gets a property for multiple nodes in a single batch operation.
957    ///
958    /// More efficient than calling [`Self::get_node_property`] in a loop because it
959    /// reduces lock overhead and enables better cache utilization.
960    ///
961    /// # Example
962    ///
963    /// ```
964    /// use grafeo_core::graph::lpg::LpgStore;
965    /// use grafeo_common::types::{NodeId, PropertyKey, Value};
966    ///
967    /// let store = LpgStore::new();
968    /// let n1 = store.create_node(&["Person"]);
969    /// let n2 = store.create_node(&["Person"]);
970    /// store.set_node_property(n1, "age", Value::from(25i64));
971    /// store.set_node_property(n2, "age", Value::from(30i64));
972    ///
973    /// let ages = store.get_node_property_batch(&[n1, n2], &PropertyKey::new("age"));
974    /// assert_eq!(ages, vec![Some(Value::from(25i64)), Some(Value::from(30i64))]);
975    /// ```
976    #[must_use]
977    pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
978        self.node_properties.get_batch(ids, key)
979    }
980
981    /// Gets all properties for multiple nodes in a single batch operation.
982    ///
983    /// Returns a vector of property maps, one per node ID (empty map if no properties).
984    /// More efficient than calling [`Self::get_node`] in a loop.
985    #[must_use]
986    pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
987        self.node_properties.get_all_batch(ids)
988    }
989
990    /// Gets selected properties for multiple nodes (projection pushdown).
991    ///
992    /// This is more efficient than [`Self::get_nodes_properties_batch`] when you only
993    /// need a subset of properties. It only iterates the requested columns instead of
994    /// all columns.
995    ///
996    /// **Use this for**: Queries with explicit projections like `RETURN n.name, n.age`
997    /// instead of `RETURN n` (which requires all properties).
998    ///
999    /// # Example
1000    ///
1001    /// ```
1002    /// use grafeo_core::graph::lpg::LpgStore;
1003    /// use grafeo_common::types::{PropertyKey, Value};
1004    ///
1005    /// let store = LpgStore::new();
1006    /// let n1 = store.create_node(&["Person"]);
1007    /// store.set_node_property(n1, "name", Value::from("Alice"));
1008    /// store.set_node_property(n1, "age", Value::from(30i64));
1009    /// store.set_node_property(n1, "email", Value::from("alice@example.com"));
1010    ///
1011    /// // Only fetch name and age (faster than get_nodes_properties_batch)
1012    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
1013    /// let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
1014    ///
1015    /// assert_eq!(props[0].len(), 2); // Only name and age, not email
1016    /// ```
1017    #[must_use]
1018    pub fn get_nodes_properties_selective_batch(
1019        &self,
1020        ids: &[NodeId],
1021        keys: &[PropertyKey],
1022    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1023        self.node_properties.get_selective_batch(ids, keys)
1024    }
1025
1026    /// Gets selected properties for multiple edges (projection pushdown).
1027    ///
1028    /// Edge-property version of [`Self::get_nodes_properties_selective_batch`].
1029    #[must_use]
1030    pub fn get_edges_properties_selective_batch(
1031        &self,
1032        ids: &[EdgeId],
1033        keys: &[PropertyKey],
1034    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1035        self.edge_properties.get_selective_batch(ids, keys)
1036    }
1037
1038    /// Finds nodes where a property value is in a range.
1039    ///
1040    /// This is useful for queries like `n.age > 30` or `n.price BETWEEN 10 AND 100`.
1041    /// Uses zone maps to skip scanning when the range definitely doesn't match.
1042    ///
1043    /// # Arguments
1044    ///
1045    /// * `property` - The property to check
1046    /// * `min` - Optional lower bound (None for unbounded)
1047    /// * `max` - Optional upper bound (None for unbounded)
1048    /// * `min_inclusive` - Whether lower bound is inclusive (>= vs >)
1049    /// * `max_inclusive` - Whether upper bound is inclusive (<= vs <)
1050    ///
1051    /// # Example
1052    ///
1053    /// ```
1054    /// use grafeo_core::graph::lpg::LpgStore;
1055    /// use grafeo_common::types::Value;
1056    ///
1057    /// let store = LpgStore::new();
1058    /// let n1 = store.create_node(&["Person"]);
1059    /// let n2 = store.create_node(&["Person"]);
1060    /// store.set_node_property(n1, "age", Value::from(25i64));
1061    /// store.set_node_property(n2, "age", Value::from(35i64));
1062    ///
1063    /// // Find nodes where age > 30
1064    /// let result = store.find_nodes_in_range(
1065    ///     "age",
1066    ///     Some(&Value::from(30i64)),
1067    ///     None,
1068    ///     false, // exclusive lower bound
1069    ///     true,  // inclusive upper bound (doesn't matter since None)
1070    /// );
1071    /// assert_eq!(result.len(), 1); // Only n2 matches
1072    /// ```
1073    #[must_use]
1074    pub fn find_nodes_in_range(
1075        &self,
1076        property: &str,
1077        min: Option<&Value>,
1078        max: Option<&Value>,
1079        min_inclusive: bool,
1080        max_inclusive: bool,
1081    ) -> Vec<NodeId> {
1082        let key = PropertyKey::new(property);
1083
1084        // Check zone map first - if no values could match, return empty
1085        if !self
1086            .node_properties
1087            .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1088        {
1089            return Vec::new();
1090        }
1091
1092        // Scan all nodes and filter by range
1093        self.node_ids()
1094            .into_iter()
1095            .filter(|&node_id| {
1096                self.node_properties
1097                    .get(node_id, &key)
1098                    .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1099            })
1100            .collect()
1101    }
1102
1103    /// Finds nodes matching multiple property equality conditions.
1104    ///
1105    /// This is more efficient than intersecting multiple single-property lookups
1106    /// because it can use indexes when available and short-circuits on the first
1107    /// miss.
1108    ///
1109    /// # Example
1110    ///
1111    /// ```
1112    /// use grafeo_core::graph::lpg::LpgStore;
1113    /// use grafeo_common::types::Value;
1114    ///
1115    /// let store = LpgStore::new();
1116    /// let alice = store.create_node(&["Person"]);
1117    /// store.set_node_property(alice, "name", Value::from("Alice"));
1118    /// store.set_node_property(alice, "city", Value::from("NYC"));
1119    ///
1120    /// // Find nodes where name = "Alice" AND city = "NYC"
1121    /// let matches = store.find_nodes_by_properties(&[
1122    ///     ("name", Value::from("Alice")),
1123    ///     ("city", Value::from("NYC")),
1124    /// ]);
1125    /// assert!(matches.contains(&alice));
1126    /// ```
1127    #[must_use]
1128    pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1129        if conditions.is_empty() {
1130            return self.node_ids();
1131        }
1132
1133        // Find the most selective condition (smallest result set) to start
1134        // If any condition has an index, use that first
1135        let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1136        let indexes = self.property_indexes.read();
1137
1138        for (i, (prop, value)) in conditions.iter().enumerate() {
1139            let key = PropertyKey::new(*prop);
1140            let hv = HashableValue::new(value.clone());
1141
1142            if let Some(index) = indexes.get(&key) {
1143                let matches: Vec<NodeId> = index
1144                    .get(&hv)
1145                    .map(|nodes| nodes.iter().copied().collect())
1146                    .unwrap_or_default();
1147
1148                // Short-circuit if any indexed condition has no matches
1149                if matches.is_empty() {
1150                    return Vec::new();
1151                }
1152
1153                // Use smallest indexed result as starting point
1154                if best_start
1155                    .as_ref()
1156                    .is_none_or(|(_, best)| matches.len() < best.len())
1157                {
1158                    best_start = Some((i, matches));
1159                }
1160            }
1161        }
1162        drop(indexes);
1163
1164        // Start from best indexed result or fall back to full node scan
1165        let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1166            // No indexes available, start with first condition via full scan
1167            let (prop, value) = &conditions[0];
1168            (0, self.find_nodes_by_property(prop, value))
1169        });
1170
1171        // Filter candidates through remaining conditions
1172        for (i, (prop, value)) in conditions.iter().enumerate() {
1173            if i == start_idx {
1174                continue;
1175            }
1176
1177            let key = PropertyKey::new(*prop);
1178            candidates.retain(|&node_id| {
1179                self.node_properties
1180                    .get(node_id, &key)
1181                    .is_some_and(|v| v == *value)
1182            });
1183
1184            // Short-circuit if no candidates remain
1185            if candidates.is_empty() {
1186                return Vec::new();
1187            }
1188        }
1189
1190        candidates
1191    }
1192
1193    // === Property Index Operations ===
1194
1195    /// Creates an index on a node property for O(1) lookups by value.
1196    ///
1197    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1198    /// O(1) instead of O(n) for this property. The index is automatically
1199    /// maintained when properties are set or removed.
1200    ///
1201    /// # Example
1202    ///
1203    /// ```
1204    /// use grafeo_core::graph::lpg::LpgStore;
1205    /// use grafeo_common::types::Value;
1206    ///
1207    /// let store = LpgStore::new();
1208    ///
1209    /// // Create nodes with an 'id' property
1210    /// let alice = store.create_node(&["Person"]);
1211    /// store.set_node_property(alice, "id", Value::from("alice_123"));
1212    ///
1213    /// // Create an index on the 'id' property
1214    /// store.create_property_index("id");
1215    ///
1216    /// // Now lookups by 'id' are O(1)
1217    /// let found = store.find_nodes_by_property("id", &Value::from("alice_123"));
1218    /// assert!(found.contains(&alice));
1219    /// ```
1220    pub fn create_property_index(&self, property: &str) {
1221        let key = PropertyKey::new(property);
1222
1223        let mut indexes = self.property_indexes.write();
1224        if indexes.contains_key(&key) {
1225            return; // Already indexed
1226        }
1227
1228        // Create the index and populate it with existing data
1229        let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1230
1231        // Scan all nodes to build the index
1232        for node_id in self.node_ids() {
1233            if let Some(value) = self.node_properties.get(node_id, &key) {
1234                let hv = HashableValue::new(value);
1235                index
1236                    .entry(hv)
1237                    .or_insert_with(FxHashSet::default)
1238                    .insert(node_id);
1239            }
1240        }
1241
1242        indexes.insert(key, index);
1243    }
1244
1245    /// Drops an index on a node property.
1246    ///
1247    /// Returns `true` if the index existed and was removed.
1248    pub fn drop_property_index(&self, property: &str) -> bool {
1249        let key = PropertyKey::new(property);
1250        self.property_indexes.write().remove(&key).is_some()
1251    }
1252
1253    /// Returns `true` if the property has an index.
1254    #[must_use]
1255    pub fn has_property_index(&self, property: &str) -> bool {
1256        let key = PropertyKey::new(property);
1257        self.property_indexes.read().contains_key(&key)
1258    }
1259
1260    /// Stores a vector index for a label+property pair.
1261    #[cfg(feature = "vector-index")]
1262    pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1263        let key = format!("{label}:{property}");
1264        self.vector_indexes.write().insert(key, index);
1265    }
1266
1267    /// Retrieves the vector index for a label+property pair.
1268    #[cfg(feature = "vector-index")]
1269    #[must_use]
1270    pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1271        let key = format!("{label}:{property}");
1272        self.vector_indexes.read().get(&key).cloned()
1273    }
1274
1275    /// Finds all nodes that have a specific property value.
1276    ///
1277    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1278    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1279    ///
1280    /// # Example
1281    ///
1282    /// ```
1283    /// use grafeo_core::graph::lpg::LpgStore;
1284    /// use grafeo_common::types::Value;
1285    ///
1286    /// let store = LpgStore::new();
1287    /// store.create_property_index("city"); // Optional but makes lookups fast
1288    ///
1289    /// let alice = store.create_node(&["Person"]);
1290    /// let bob = store.create_node(&["Person"]);
1291    /// store.set_node_property(alice, "city", Value::from("NYC"));
1292    /// store.set_node_property(bob, "city", Value::from("NYC"));
1293    ///
1294    /// let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
1295    /// assert_eq!(nyc_people.len(), 2);
1296    /// ```
1297    #[must_use]
1298    pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1299        let key = PropertyKey::new(property);
1300        let hv = HashableValue::new(value.clone());
1301
1302        // Try indexed lookup first
1303        let indexes = self.property_indexes.read();
1304        if let Some(index) = indexes.get(&key) {
1305            if let Some(nodes) = index.get(&hv) {
1306                return nodes.iter().copied().collect();
1307            }
1308            return Vec::new();
1309        }
1310        drop(indexes);
1311
1312        // Fall back to full scan
1313        self.node_ids()
1314            .into_iter()
1315            .filter(|&node_id| {
1316                self.node_properties
1317                    .get(node_id, &key)
1318                    .is_some_and(|v| v == *value)
1319            })
1320            .collect()
1321    }
1322
1323    /// Updates property indexes when a property is set.
1324    fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1325        let indexes = self.property_indexes.read();
1326        if let Some(index) = indexes.get(key) {
1327            // Get old value to remove from index
1328            if let Some(old_value) = self.node_properties.get(node_id, key) {
1329                let old_hv = HashableValue::new(old_value);
1330                if let Some(mut nodes) = index.get_mut(&old_hv) {
1331                    nodes.remove(&node_id);
1332                    if nodes.is_empty() {
1333                        drop(nodes);
1334                        index.remove(&old_hv);
1335                    }
1336                }
1337            }
1338
1339            // Add new value to index
1340            let new_hv = HashableValue::new(new_value.clone());
1341            index
1342                .entry(new_hv)
1343                .or_insert_with(FxHashSet::default)
1344                .insert(node_id);
1345        }
1346    }
1347
1348    /// Updates property indexes when a property is removed.
1349    fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1350        let indexes = self.property_indexes.read();
1351        if let Some(index) = indexes.get(key) {
1352            // Get old value to remove from index
1353            if let Some(old_value) = self.node_properties.get(node_id, key) {
1354                let old_hv = HashableValue::new(old_value);
1355                if let Some(mut nodes) = index.get_mut(&old_hv) {
1356                    nodes.remove(&node_id);
1357                    if nodes.is_empty() {
1358                        drop(nodes);
1359                        index.remove(&old_hv);
1360                    }
1361                }
1362            }
1363        }
1364    }
1365
1366    /// Adds a label to a node.
1367    ///
1368    /// Returns true if the label was added, false if the node doesn't exist
1369    /// or already has the label.
1370    #[cfg(not(feature = "tiered-storage"))]
1371    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1372        let epoch = self.current_epoch();
1373
1374        // Check if node exists
1375        let nodes = self.nodes.read();
1376        if let Some(chain) = nodes.get(&node_id) {
1377            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1378                return false;
1379            }
1380        } else {
1381            return false;
1382        }
1383        drop(nodes);
1384
1385        // Get or create label ID
1386        let label_id = self.get_or_create_label_id(label);
1387
1388        // Add to node_labels map
1389        let mut node_labels = self.node_labels.write();
1390        let label_set = node_labels
1391            .entry(node_id)
1392            .or_insert_with(FxHashSet::default);
1393
1394        if label_set.contains(&label_id) {
1395            return false; // Already has this label
1396        }
1397
1398        label_set.insert(label_id);
1399        drop(node_labels);
1400
1401        // Add to label_index
1402        let mut index = self.label_index.write();
1403        if (label_id as usize) >= index.len() {
1404            index.resize(label_id as usize + 1, FxHashMap::default());
1405        }
1406        index[label_id as usize].insert(node_id, ());
1407
1408        // Update label count in node record
1409        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1410            if let Some(record) = chain.latest_mut() {
1411                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1412                record.set_label_count(count as u16);
1413            }
1414        }
1415
1416        true
1417    }
1418
1419    /// Adds a label to a node.
1420    /// (Tiered storage version)
1421    #[cfg(feature = "tiered-storage")]
1422    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1423        let epoch = self.current_epoch();
1424
1425        // Check if node exists
1426        let versions = self.node_versions.read();
1427        if let Some(index) = versions.get(&node_id) {
1428            if let Some(vref) = index.visible_at(epoch) {
1429                if let Some(record) = self.read_node_record(&vref) {
1430                    if record.is_deleted() {
1431                        return false;
1432                    }
1433                } else {
1434                    return false;
1435                }
1436            } else {
1437                return false;
1438            }
1439        } else {
1440            return false;
1441        }
1442        drop(versions);
1443
1444        // Get or create label ID
1445        let label_id = self.get_or_create_label_id(label);
1446
1447        // Add to node_labels map
1448        let mut node_labels = self.node_labels.write();
1449        let label_set = node_labels
1450            .entry(node_id)
1451            .or_insert_with(FxHashSet::default);
1452
1453        if label_set.contains(&label_id) {
1454            return false; // Already has this label
1455        }
1456
1457        label_set.insert(label_id);
1458        drop(node_labels);
1459
1460        // Add to label_index
1461        let mut index = self.label_index.write();
1462        if (label_id as usize) >= index.len() {
1463            index.resize(label_id as usize + 1, FxHashMap::default());
1464        }
1465        index[label_id as usize].insert(node_id, ());
1466
1467        // Note: label_count in record is not updated for tiered storage.
1468        // The record is immutable once allocated in the arena.
1469
1470        true
1471    }
1472
1473    /// Removes a label from a node.
1474    ///
1475    /// Returns true if the label was removed, false if the node doesn't exist
1476    /// or doesn't have the label.
1477    #[cfg(not(feature = "tiered-storage"))]
1478    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1479        let epoch = self.current_epoch();
1480
1481        // Check if node exists
1482        let nodes = self.nodes.read();
1483        if let Some(chain) = nodes.get(&node_id) {
1484            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1485                return false;
1486            }
1487        } else {
1488            return false;
1489        }
1490        drop(nodes);
1491
1492        // Get label ID
1493        let label_id = {
1494            let label_ids = self.label_to_id.read();
1495            match label_ids.get(label) {
1496                Some(&id) => id,
1497                None => return false, // Label doesn't exist
1498            }
1499        };
1500
1501        // Remove from node_labels map
1502        let mut node_labels = self.node_labels.write();
1503        if let Some(label_set) = node_labels.get_mut(&node_id) {
1504            if !label_set.remove(&label_id) {
1505                return false; // Node doesn't have this label
1506            }
1507        } else {
1508            return false;
1509        }
1510        drop(node_labels);
1511
1512        // Remove from label_index
1513        let mut index = self.label_index.write();
1514        if (label_id as usize) < index.len() {
1515            index[label_id as usize].remove(&node_id);
1516        }
1517
1518        // Update label count in node record
1519        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1520            if let Some(record) = chain.latest_mut() {
1521                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1522                record.set_label_count(count as u16);
1523            }
1524        }
1525
1526        true
1527    }
1528
1529    /// Removes a label from a node.
1530    /// (Tiered storage version)
1531    #[cfg(feature = "tiered-storage")]
1532    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1533        let epoch = self.current_epoch();
1534
1535        // Check if node exists
1536        let versions = self.node_versions.read();
1537        if let Some(index) = versions.get(&node_id) {
1538            if let Some(vref) = index.visible_at(epoch) {
1539                if let Some(record) = self.read_node_record(&vref) {
1540                    if record.is_deleted() {
1541                        return false;
1542                    }
1543                } else {
1544                    return false;
1545                }
1546            } else {
1547                return false;
1548            }
1549        } else {
1550            return false;
1551        }
1552        drop(versions);
1553
1554        // Get label ID
1555        let label_id = {
1556            let label_ids = self.label_to_id.read();
1557            match label_ids.get(label) {
1558                Some(&id) => id,
1559                None => return false, // Label doesn't exist
1560            }
1561        };
1562
1563        // Remove from node_labels map
1564        let mut node_labels = self.node_labels.write();
1565        if let Some(label_set) = node_labels.get_mut(&node_id) {
1566            if !label_set.remove(&label_id) {
1567                return false; // Node doesn't have this label
1568            }
1569        } else {
1570            return false;
1571        }
1572        drop(node_labels);
1573
1574        // Remove from label_index
1575        let mut index = self.label_index.write();
1576        if (label_id as usize) < index.len() {
1577            index[label_id as usize].remove(&node_id);
1578        }
1579
1580        // Note: label_count in record is not updated for tiered storage.
1581
1582        true
1583    }
1584
1585    /// Returns the number of nodes (non-deleted at current epoch).
1586    #[must_use]
1587    #[cfg(not(feature = "tiered-storage"))]
1588    pub fn node_count(&self) -> usize {
1589        let epoch = self.current_epoch();
1590        self.nodes
1591            .read()
1592            .values()
1593            .filter_map(|chain| chain.visible_at(epoch))
1594            .filter(|r| !r.is_deleted())
1595            .count()
1596    }
1597
1598    /// Returns the number of nodes (non-deleted at current epoch).
1599    /// (Tiered storage version)
1600    #[must_use]
1601    #[cfg(feature = "tiered-storage")]
1602    pub fn node_count(&self) -> usize {
1603        let epoch = self.current_epoch();
1604        let versions = self.node_versions.read();
1605        versions
1606            .iter()
1607            .filter(|(_, index)| {
1608                index.visible_at(epoch).map_or(false, |vref| {
1609                    self.read_node_record(&vref)
1610                        .map_or(false, |r| !r.is_deleted())
1611                })
1612            })
1613            .count()
1614    }
1615
1616    /// Returns all node IDs in the store.
1617    ///
1618    /// This returns a snapshot of current node IDs. The returned vector
1619    /// excludes deleted nodes. Results are sorted by NodeId for deterministic
1620    /// iteration order.
1621    #[must_use]
1622    #[cfg(not(feature = "tiered-storage"))]
1623    pub fn node_ids(&self) -> Vec<NodeId> {
1624        let epoch = self.current_epoch();
1625        let mut ids: Vec<NodeId> = self
1626            .nodes
1627            .read()
1628            .iter()
1629            .filter_map(|(id, chain)| {
1630                chain
1631                    .visible_at(epoch)
1632                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1633            })
1634            .collect();
1635        ids.sort_unstable();
1636        ids
1637    }
1638
1639    /// Returns all node IDs in the store.
1640    /// (Tiered storage version)
1641    #[must_use]
1642    #[cfg(feature = "tiered-storage")]
1643    pub fn node_ids(&self) -> Vec<NodeId> {
1644        let epoch = self.current_epoch();
1645        let versions = self.node_versions.read();
1646        let mut ids: Vec<NodeId> = versions
1647            .iter()
1648            .filter_map(|(id, index)| {
1649                index.visible_at(epoch).and_then(|vref| {
1650                    self.read_node_record(&vref)
1651                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1652                })
1653            })
1654            .collect();
1655        ids.sort_unstable();
1656        ids
1657    }
1658
1659    // === Edge Operations ===
1660
1661    /// Creates a new edge.
1662    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1663        self.needs_stats_recompute.store(true, Ordering::Relaxed);
1664        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1665    }
1666
1667    /// Creates a new edge within a transaction context.
1668    #[cfg(not(feature = "tiered-storage"))]
1669    pub fn create_edge_versioned(
1670        &self,
1671        src: NodeId,
1672        dst: NodeId,
1673        edge_type: &str,
1674        epoch: EpochId,
1675        tx_id: TxId,
1676    ) -> EdgeId {
1677        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1678        let type_id = self.get_or_create_edge_type_id(edge_type);
1679
1680        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1681        let chain = VersionChain::with_initial(record, epoch, tx_id);
1682        self.edges.write().insert(id, chain);
1683
1684        // Update adjacency
1685        self.forward_adj.add_edge(src, dst, id);
1686        if let Some(ref backward) = self.backward_adj {
1687            backward.add_edge(dst, src, id);
1688        }
1689
1690        id
1691    }
1692
1693    /// Creates a new edge within a transaction context.
1694    /// (Tiered storage version)
1695    #[cfg(feature = "tiered-storage")]
1696    pub fn create_edge_versioned(
1697        &self,
1698        src: NodeId,
1699        dst: NodeId,
1700        edge_type: &str,
1701        epoch: EpochId,
1702        tx_id: TxId,
1703    ) -> EdgeId {
1704        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1705        let type_id = self.get_or_create_edge_type_id(edge_type);
1706
1707        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1708
1709        // Allocate record in arena and get offset (create epoch if needed)
1710        let arena = self.arena_allocator.arena_or_create(epoch);
1711        let (offset, _stored) = arena.alloc_value_with_offset(record);
1712
1713        // Create HotVersionRef pointing to arena data
1714        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1715
1716        // Create or update version index
1717        let mut versions = self.edge_versions.write();
1718        if let Some(index) = versions.get_mut(&id) {
1719            index.add_hot(hot_ref);
1720        } else {
1721            versions.insert(id, VersionIndex::with_initial(hot_ref));
1722        }
1723
1724        // Update adjacency
1725        self.forward_adj.add_edge(src, dst, id);
1726        if let Some(ref backward) = self.backward_adj {
1727            backward.add_edge(dst, src, id);
1728        }
1729
1730        id
1731    }
1732
1733    /// Creates a new edge with properties.
1734    pub fn create_edge_with_props(
1735        &self,
1736        src: NodeId,
1737        dst: NodeId,
1738        edge_type: &str,
1739        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1740    ) -> EdgeId {
1741        let id = self.create_edge(src, dst, edge_type);
1742
1743        for (key, value) in properties {
1744            self.edge_properties.set(id, key.into(), value.into());
1745        }
1746
1747        id
1748    }
1749
1750    /// Gets an edge by ID (latest visible version).
1751    #[must_use]
1752    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1753        self.get_edge_at_epoch(id, self.current_epoch())
1754    }
1755
1756    /// Gets an edge by ID at a specific epoch.
1757    #[must_use]
1758    #[cfg(not(feature = "tiered-storage"))]
1759    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1760        let edges = self.edges.read();
1761        let chain = edges.get(&id)?;
1762        let record = chain.visible_at(epoch)?;
1763
1764        if record.is_deleted() {
1765            return None;
1766        }
1767
1768        let edge_type = {
1769            let id_to_type = self.id_to_edge_type.read();
1770            id_to_type.get(record.type_id as usize)?.clone()
1771        };
1772
1773        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1774
1775        // Get properties
1776        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1777
1778        Some(edge)
1779    }
1780
1781    /// Gets an edge by ID at a specific epoch.
1782    /// (Tiered storage version)
1783    #[must_use]
1784    #[cfg(feature = "tiered-storage")]
1785    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1786        let versions = self.edge_versions.read();
1787        let index = versions.get(&id)?;
1788        let version_ref = index.visible_at(epoch)?;
1789
1790        let record = self.read_edge_record(&version_ref)?;
1791
1792        if record.is_deleted() {
1793            return None;
1794        }
1795
1796        let edge_type = {
1797            let id_to_type = self.id_to_edge_type.read();
1798            id_to_type.get(record.type_id as usize)?.clone()
1799        };
1800
1801        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1802
1803        // Get properties
1804        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1805
1806        Some(edge)
1807    }
1808
1809    /// Gets an edge visible to a specific transaction.
1810    #[must_use]
1811    #[cfg(not(feature = "tiered-storage"))]
1812    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1813        let edges = self.edges.read();
1814        let chain = edges.get(&id)?;
1815        let record = chain.visible_to(epoch, tx_id)?;
1816
1817        if record.is_deleted() {
1818            return None;
1819        }
1820
1821        let edge_type = {
1822            let id_to_type = self.id_to_edge_type.read();
1823            id_to_type.get(record.type_id as usize)?.clone()
1824        };
1825
1826        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1827
1828        // Get properties
1829        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1830
1831        Some(edge)
1832    }
1833
1834    /// Gets an edge visible to a specific transaction.
1835    /// (Tiered storage version)
1836    #[must_use]
1837    #[cfg(feature = "tiered-storage")]
1838    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1839        let versions = self.edge_versions.read();
1840        let index = versions.get(&id)?;
1841        let version_ref = index.visible_to(epoch, tx_id)?;
1842
1843        let record = self.read_edge_record(&version_ref)?;
1844
1845        if record.is_deleted() {
1846            return None;
1847        }
1848
1849        let edge_type = {
1850            let id_to_type = self.id_to_edge_type.read();
1851            id_to_type.get(record.type_id as usize)?.clone()
1852        };
1853
1854        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1855
1856        // Get properties
1857        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1858
1859        Some(edge)
1860    }
1861
1862    /// Reads an EdgeRecord from arena using a VersionRef.
1863    #[cfg(feature = "tiered-storage")]
1864    #[allow(unsafe_code)]
1865    fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1866        match version_ref {
1867            VersionRef::Hot(hot_ref) => {
1868                let arena = self.arena_allocator.arena(hot_ref.epoch);
1869                // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
1870                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1871                Some(*record)
1872            }
1873            VersionRef::Cold(cold_ref) => {
1874                // Read from compressed epoch store
1875                self.epoch_store
1876                    .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1877            }
1878        }
1879    }
1880
1881    /// Deletes an edge (using latest epoch).
1882    pub fn delete_edge(&self, id: EdgeId) -> bool {
1883        self.needs_stats_recompute.store(true, Ordering::Relaxed);
1884        self.delete_edge_at_epoch(id, self.current_epoch())
1885    }
1886
1887    /// Deletes an edge at a specific epoch.
1888    #[cfg(not(feature = "tiered-storage"))]
1889    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1890        let mut edges = self.edges.write();
1891        if let Some(chain) = edges.get_mut(&id) {
1892            // Get the visible record to check if deleted and get src/dst
1893            let (src, dst) = {
1894                match chain.visible_at(epoch) {
1895                    Some(record) => {
1896                        if record.is_deleted() {
1897                            return false;
1898                        }
1899                        (record.src, record.dst)
1900                    }
1901                    None => return false, // Not visible at this epoch (already deleted)
1902                }
1903            };
1904
1905            // Mark the version chain as deleted
1906            chain.mark_deleted(epoch);
1907
1908            drop(edges); // Release lock
1909
1910            // Mark as deleted in adjacency (soft delete)
1911            self.forward_adj.mark_deleted(src, id);
1912            if let Some(ref backward) = self.backward_adj {
1913                backward.mark_deleted(dst, id);
1914            }
1915
1916            // Remove properties
1917            self.edge_properties.remove_all(id);
1918
1919            true
1920        } else {
1921            false
1922        }
1923    }
1924
1925    /// Deletes an edge at a specific epoch.
1926    /// (Tiered storage version)
1927    #[cfg(feature = "tiered-storage")]
1928    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1929        let mut versions = self.edge_versions.write();
1930        if let Some(index) = versions.get_mut(&id) {
1931            // Get the visible record to check if deleted and get src/dst
1932            let (src, dst) = {
1933                match index.visible_at(epoch) {
1934                    Some(version_ref) => {
1935                        if let Some(record) = self.read_edge_record(&version_ref) {
1936                            if record.is_deleted() {
1937                                return false;
1938                            }
1939                            (record.src, record.dst)
1940                        } else {
1941                            return false;
1942                        }
1943                    }
1944                    None => return false,
1945                }
1946            };
1947
1948            // Mark as deleted in version index
1949            index.mark_deleted(epoch);
1950
1951            drop(versions); // Release lock
1952
1953            // Mark as deleted in adjacency (soft delete)
1954            self.forward_adj.mark_deleted(src, id);
1955            if let Some(ref backward) = self.backward_adj {
1956                backward.mark_deleted(dst, id);
1957            }
1958
1959            // Remove properties
1960            self.edge_properties.remove_all(id);
1961
1962            true
1963        } else {
1964            false
1965        }
1966    }
1967
1968    /// Returns the number of edges (non-deleted at current epoch).
1969    #[must_use]
1970    #[cfg(not(feature = "tiered-storage"))]
1971    pub fn edge_count(&self) -> usize {
1972        let epoch = self.current_epoch();
1973        self.edges
1974            .read()
1975            .values()
1976            .filter_map(|chain| chain.visible_at(epoch))
1977            .filter(|r| !r.is_deleted())
1978            .count()
1979    }
1980
1981    /// Returns the number of edges (non-deleted at current epoch).
1982    /// (Tiered storage version)
1983    #[must_use]
1984    #[cfg(feature = "tiered-storage")]
1985    pub fn edge_count(&self) -> usize {
1986        let epoch = self.current_epoch();
1987        let versions = self.edge_versions.read();
1988        versions
1989            .iter()
1990            .filter(|(_, index)| {
1991                index.visible_at(epoch).map_or(false, |vref| {
1992                    self.read_edge_record(&vref)
1993                        .map_or(false, |r| !r.is_deleted())
1994                })
1995            })
1996            .count()
1997    }
1998
1999    /// Discards all uncommitted versions created by a transaction.
2000    ///
2001    /// This is called during transaction rollback to clean up uncommitted changes.
2002    /// The method removes version chain entries created by the specified transaction.
2003    #[cfg(not(feature = "tiered-storage"))]
2004    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2005        // Remove uncommitted node versions
2006        {
2007            let mut nodes = self.nodes.write();
2008            for chain in nodes.values_mut() {
2009                chain.remove_versions_by(tx_id);
2010            }
2011            // Remove completely empty chains (no versions left)
2012            nodes.retain(|_, chain| !chain.is_empty());
2013        }
2014
2015        // Remove uncommitted edge versions
2016        {
2017            let mut edges = self.edges.write();
2018            for chain in edges.values_mut() {
2019                chain.remove_versions_by(tx_id);
2020            }
2021            // Remove completely empty chains (no versions left)
2022            edges.retain(|_, chain| !chain.is_empty());
2023        }
2024    }
2025
2026    /// Discards all uncommitted versions created by a transaction.
2027    /// (Tiered storage version)
2028    #[cfg(feature = "tiered-storage")]
2029    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2030        // Remove uncommitted node versions
2031        {
2032            let mut versions = self.node_versions.write();
2033            for index in versions.values_mut() {
2034                index.remove_versions_by(tx_id);
2035            }
2036            // Remove completely empty indexes (no versions left)
2037            versions.retain(|_, index| !index.is_empty());
2038        }
2039
2040        // Remove uncommitted edge versions
2041        {
2042            let mut versions = self.edge_versions.write();
2043            for index in versions.values_mut() {
2044                index.remove_versions_by(tx_id);
2045            }
2046            // Remove completely empty indexes (no versions left)
2047            versions.retain(|_, index| !index.is_empty());
2048        }
2049    }
2050
2051    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
2052    ///
2053    /// This is called by the transaction manager when an epoch becomes eligible
2054    /// for freezing (no active transactions can see it). The freeze process:
2055    ///
2056    /// 1. Collects all hot version refs for the epoch
2057    /// 2. Reads the corresponding records from arena
2058    /// 3. Compresses them into a `CompressedEpochBlock`
2059    /// 4. Updates `VersionIndex` entries to point to cold storage
2060    /// 5. The arena can be deallocated after all epochs in it are frozen
2061    ///
2062    /// # Arguments
2063    ///
2064    /// * `epoch` - The epoch to freeze
2065    ///
2066    /// # Returns
2067    ///
2068    /// The number of records frozen (nodes + edges).
2069    #[cfg(feature = "tiered-storage")]
2070    #[allow(unsafe_code)]
2071    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
2072        // Collect node records to freeze
2073        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
2074        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
2075
2076        {
2077            let versions = self.node_versions.read();
2078            for (node_id, index) in versions.iter() {
2079                for hot_ref in index.hot_refs_for_epoch(epoch) {
2080                    let arena = self.arena_allocator.arena(hot_ref.epoch);
2081                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
2082                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2083                    node_records.push((node_id.as_u64(), *record));
2084                    node_hot_refs.push((*node_id, *hot_ref));
2085                }
2086            }
2087        }
2088
2089        // Collect edge records to freeze
2090        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
2091        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
2092
2093        {
2094            let versions = self.edge_versions.read();
2095            for (edge_id, index) in versions.iter() {
2096                for hot_ref in index.hot_refs_for_epoch(epoch) {
2097                    let arena = self.arena_allocator.arena(hot_ref.epoch);
2098                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
2099                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2100                    edge_records.push((edge_id.as_u64(), *record));
2101                    edge_hot_refs.push((*edge_id, *hot_ref));
2102                }
2103            }
2104        }
2105
2106        let total_frozen = node_records.len() + edge_records.len();
2107
2108        if total_frozen == 0 {
2109            return 0;
2110        }
2111
2112        // Freeze to compressed storage
2113        let (node_entries, edge_entries) =
2114            self.epoch_store
2115                .freeze_epoch(epoch, node_records, edge_records);
2116
2117        // Build lookup maps for index entries
2118        let node_entry_map: FxHashMap<u64, _> = node_entries
2119            .iter()
2120            .map(|e| (e.entity_id, (e.offset, e.length)))
2121            .collect();
2122        let edge_entry_map: FxHashMap<u64, _> = edge_entries
2123            .iter()
2124            .map(|e| (e.entity_id, (e.offset, e.length)))
2125            .collect();
2126
2127        // Update version indexes to use cold refs
2128        {
2129            let mut versions = self.node_versions.write();
2130            for (node_id, hot_ref) in &node_hot_refs {
2131                if let Some(index) = versions.get_mut(node_id)
2132                    && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
2133                {
2134                    let cold_ref = ColdVersionRef {
2135                        epoch,
2136                        block_offset: offset,
2137                        length,
2138                        created_by: hot_ref.created_by,
2139                        deleted_epoch: hot_ref.deleted_epoch,
2140                    };
2141                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
2142                }
2143            }
2144        }
2145
2146        {
2147            let mut versions = self.edge_versions.write();
2148            for (edge_id, hot_ref) in &edge_hot_refs {
2149                if let Some(index) = versions.get_mut(edge_id)
2150                    && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
2151                {
2152                    let cold_ref = ColdVersionRef {
2153                        epoch,
2154                        block_offset: offset,
2155                        length,
2156                        created_by: hot_ref.created_by,
2157                        deleted_epoch: hot_ref.deleted_epoch,
2158                    };
2159                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
2160                }
2161            }
2162        }
2163
2164        total_frozen
2165    }
2166
2167    /// Returns the epoch store for cold storage statistics.
2168    #[cfg(feature = "tiered-storage")]
2169    #[must_use]
2170    pub fn epoch_store(&self) -> &EpochStore {
2171        &self.epoch_store
2172    }
2173
2174    /// Returns the number of distinct labels in the store.
2175    #[must_use]
2176    pub fn label_count(&self) -> usize {
2177        self.id_to_label.read().len()
2178    }
2179
2180    /// Returns the number of distinct property keys in the store.
2181    ///
2182    /// This counts unique property keys across both nodes and edges.
2183    #[must_use]
2184    pub fn property_key_count(&self) -> usize {
2185        let node_keys = self.node_properties.column_count();
2186        let edge_keys = self.edge_properties.column_count();
2187        // Note: This may count some keys twice if the same key is used
2188        // for both nodes and edges. A more precise count would require
2189        // tracking unique keys across both storages.
2190        node_keys + edge_keys
2191    }
2192
2193    /// Returns the number of distinct edge types in the store.
2194    #[must_use]
2195    pub fn edge_type_count(&self) -> usize {
2196        self.id_to_edge_type.read().len()
2197    }
2198
2199    // === Traversal ===
2200
2201    /// Iterates over neighbors of a node in the specified direction.
2202    ///
2203    /// This is the fast path for graph traversal - goes straight to the
2204    /// adjacency index without loading full node data.
2205    pub fn neighbors(
2206        &self,
2207        node: NodeId,
2208        direction: Direction,
2209    ) -> impl Iterator<Item = NodeId> + '_ {
2210        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2211            Direction::Outgoing | Direction::Both => {
2212                Box::new(self.forward_adj.neighbors(node).into_iter())
2213            }
2214            Direction::Incoming => Box::new(std::iter::empty()),
2215        };
2216
2217        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2218            Direction::Incoming | Direction::Both => {
2219                if let Some(ref adj) = self.backward_adj {
2220                    Box::new(adj.neighbors(node).into_iter())
2221                } else {
2222                    Box::new(std::iter::empty())
2223                }
2224            }
2225            Direction::Outgoing => Box::new(std::iter::empty()),
2226        };
2227
2228        forward.chain(backward)
2229    }
2230
2231    /// Returns edges from a node with their targets.
2232    ///
2233    /// Returns an iterator of (target_node, edge_id) pairs.
2234    pub fn edges_from(
2235        &self,
2236        node: NodeId,
2237        direction: Direction,
2238    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2239        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2240            Direction::Outgoing | Direction::Both => {
2241                Box::new(self.forward_adj.edges_from(node).into_iter())
2242            }
2243            Direction::Incoming => Box::new(std::iter::empty()),
2244        };
2245
2246        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2247            Direction::Incoming | Direction::Both => {
2248                if let Some(ref adj) = self.backward_adj {
2249                    Box::new(adj.edges_from(node).into_iter())
2250                } else {
2251                    Box::new(std::iter::empty())
2252                }
2253            }
2254            Direction::Outgoing => Box::new(std::iter::empty()),
2255        };
2256
2257        forward.chain(backward)
2258    }
2259
2260    /// Returns edges to a node (where the node is the destination).
2261    ///
2262    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
2263    /// Uses the backward adjacency index for O(degree) lookup.
2264    ///
2265    /// # Example
2266    ///
2267    /// ```ignore
2268    /// // For edges: A->B, C->B
2269    /// let incoming = store.edges_to(B);
2270    /// // Returns: [(A, edge1), (C, edge2)]
2271    /// ```
2272    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2273        if let Some(ref backward) = self.backward_adj {
2274            backward.edges_from(node)
2275        } else {
2276            // Fallback: scan all edges (slow but correct)
2277            self.all_edges()
2278                .filter_map(|edge| {
2279                    if edge.dst == node {
2280                        Some((edge.src, edge.id))
2281                    } else {
2282                        None
2283                    }
2284                })
2285                .collect()
2286        }
2287    }
2288
2289    /// Returns the out-degree of a node (number of outgoing edges).
2290    ///
2291    /// Uses the forward adjacency index for O(1) lookup.
2292    #[must_use]
2293    pub fn out_degree(&self, node: NodeId) -> usize {
2294        self.forward_adj.out_degree(node)
2295    }
2296
2297    /// Returns the in-degree of a node (number of incoming edges).
2298    ///
2299    /// Uses the backward adjacency index for O(1) lookup if available,
2300    /// otherwise falls back to scanning edges.
2301    #[must_use]
2302    pub fn in_degree(&self, node: NodeId) -> usize {
2303        if let Some(ref backward) = self.backward_adj {
2304            backward.in_degree(node)
2305        } else {
2306            // Fallback: count edges (slow)
2307            self.all_edges().filter(|edge| edge.dst == node).count()
2308        }
2309    }
2310
2311    /// Gets the type of an edge by ID.
2312    #[must_use]
2313    #[cfg(not(feature = "tiered-storage"))]
2314    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2315        let edges = self.edges.read();
2316        let chain = edges.get(&id)?;
2317        let epoch = self.current_epoch();
2318        let record = chain.visible_at(epoch)?;
2319        let id_to_type = self.id_to_edge_type.read();
2320        id_to_type.get(record.type_id as usize).cloned()
2321    }
2322
2323    /// Gets the type of an edge by ID.
2324    /// (Tiered storage version)
2325    #[must_use]
2326    #[cfg(feature = "tiered-storage")]
2327    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2328        let versions = self.edge_versions.read();
2329        let index = versions.get(&id)?;
2330        let epoch = self.current_epoch();
2331        let vref = index.visible_at(epoch)?;
2332        let record = self.read_edge_record(&vref)?;
2333        let id_to_type = self.id_to_edge_type.read();
2334        id_to_type.get(record.type_id as usize).cloned()
2335    }
2336
2337    /// Returns all nodes with a specific label.
2338    ///
2339    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
2340    /// concurrent modifications won't affect the returned vector. Results are
2341    /// sorted by NodeId for deterministic iteration order.
2342    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2343        let label_to_id = self.label_to_id.read();
2344        if let Some(&label_id) = label_to_id.get(label) {
2345            let index = self.label_index.read();
2346            if let Some(set) = index.get(label_id as usize) {
2347                let mut ids: Vec<NodeId> = set.keys().copied().collect();
2348                ids.sort_unstable();
2349                return ids;
2350            }
2351        }
2352        Vec::new()
2353    }
2354
2355    // === Admin API: Iteration ===
2356
2357    /// Returns an iterator over all nodes in the database.
2358    ///
2359    /// This creates a snapshot of all visible nodes at the current epoch.
2360    /// Useful for dump/export operations.
2361    #[cfg(not(feature = "tiered-storage"))]
2362    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2363        let epoch = self.current_epoch();
2364        let node_ids: Vec<NodeId> = self
2365            .nodes
2366            .read()
2367            .iter()
2368            .filter_map(|(id, chain)| {
2369                chain
2370                    .visible_at(epoch)
2371                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2372            })
2373            .collect();
2374
2375        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2376    }
2377
2378    /// Returns an iterator over all nodes in the database.
2379    /// (Tiered storage version)
2380    #[cfg(feature = "tiered-storage")]
2381    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2382        let node_ids = self.node_ids();
2383        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2384    }
2385
2386    /// Returns an iterator over all edges in the database.
2387    ///
2388    /// This creates a snapshot of all visible edges at the current epoch.
2389    /// Useful for dump/export operations.
2390    #[cfg(not(feature = "tiered-storage"))]
2391    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2392        let epoch = self.current_epoch();
2393        let edge_ids: Vec<EdgeId> = self
2394            .edges
2395            .read()
2396            .iter()
2397            .filter_map(|(id, chain)| {
2398                chain
2399                    .visible_at(epoch)
2400                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2401            })
2402            .collect();
2403
2404        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2405    }
2406
2407    /// Returns an iterator over all edges in the database.
2408    /// (Tiered storage version)
2409    #[cfg(feature = "tiered-storage")]
2410    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2411        let epoch = self.current_epoch();
2412        let versions = self.edge_versions.read();
2413        let edge_ids: Vec<EdgeId> = versions
2414            .iter()
2415            .filter_map(|(id, index)| {
2416                index.visible_at(epoch).and_then(|vref| {
2417                    self.read_edge_record(&vref)
2418                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2419                })
2420            })
2421            .collect();
2422
2423        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2424    }
2425
2426    /// Returns all label names in the database.
2427    pub fn all_labels(&self) -> Vec<String> {
2428        self.id_to_label
2429            .read()
2430            .iter()
2431            .map(|s| s.to_string())
2432            .collect()
2433    }
2434
2435    /// Returns all edge type names in the database.
2436    pub fn all_edge_types(&self) -> Vec<String> {
2437        self.id_to_edge_type
2438            .read()
2439            .iter()
2440            .map(|s| s.to_string())
2441            .collect()
2442    }
2443
2444    /// Returns all property keys used in the database.
2445    pub fn all_property_keys(&self) -> Vec<String> {
2446        let mut keys = std::collections::HashSet::new();
2447        for key in self.node_properties.keys() {
2448            keys.insert(key.to_string());
2449        }
2450        for key in self.edge_properties.keys() {
2451            keys.insert(key.to_string());
2452        }
2453        keys.into_iter().collect()
2454    }
2455
2456    /// Returns an iterator over nodes with a specific label.
2457    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2458        let node_ids = self.nodes_by_label(label);
2459        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2460    }
2461
2462    /// Returns an iterator over edges with a specific type.
2463    #[cfg(not(feature = "tiered-storage"))]
2464    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2465        let epoch = self.current_epoch();
2466        let type_to_id = self.edge_type_to_id.read();
2467
2468        if let Some(&type_id) = type_to_id.get(edge_type) {
2469            let edge_ids: Vec<EdgeId> = self
2470                .edges
2471                .read()
2472                .iter()
2473                .filter_map(|(id, chain)| {
2474                    chain.visible_at(epoch).and_then(|r| {
2475                        if !r.is_deleted() && r.type_id == type_id {
2476                            Some(*id)
2477                        } else {
2478                            None
2479                        }
2480                    })
2481                })
2482                .collect();
2483
2484            // Return a boxed iterator for the found edges
2485            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2486                as Box<dyn Iterator<Item = Edge> + 'a>
2487        } else {
2488            // Return empty iterator
2489            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2490        }
2491    }
2492
2493    /// Returns an iterator over edges with a specific type.
2494    /// (Tiered storage version)
2495    #[cfg(feature = "tiered-storage")]
2496    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2497        let epoch = self.current_epoch();
2498        let type_to_id = self.edge_type_to_id.read();
2499
2500        if let Some(&type_id) = type_to_id.get(edge_type) {
2501            let versions = self.edge_versions.read();
2502            let edge_ids: Vec<EdgeId> = versions
2503                .iter()
2504                .filter_map(|(id, index)| {
2505                    index.visible_at(epoch).and_then(|vref| {
2506                        self.read_edge_record(&vref).and_then(|r| {
2507                            if !r.is_deleted() && r.type_id == type_id {
2508                                Some(*id)
2509                            } else {
2510                                None
2511                            }
2512                        })
2513                    })
2514                })
2515                .collect();
2516
2517            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2518                as Box<dyn Iterator<Item = Edge> + 'a>
2519        } else {
2520            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2521        }
2522    }
2523
2524    // === Zone Map Support ===
2525
2526    /// Checks if a node property predicate might match any nodes.
2527    ///
2528    /// Uses zone maps for early filtering. Returns `true` if there might be
2529    /// matching nodes, `false` if there definitely aren't.
2530    #[must_use]
2531    pub fn node_property_might_match(
2532        &self,
2533        property: &PropertyKey,
2534        op: CompareOp,
2535        value: &Value,
2536    ) -> bool {
2537        self.node_properties.might_match(property, op, value)
2538    }
2539
2540    /// Checks if an edge property predicate might match any edges.
2541    #[must_use]
2542    pub fn edge_property_might_match(
2543        &self,
2544        property: &PropertyKey,
2545        op: CompareOp,
2546        value: &Value,
2547    ) -> bool {
2548        self.edge_properties.might_match(property, op, value)
2549    }
2550
2551    /// Gets the zone map for a node property.
2552    #[must_use]
2553    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2554        self.node_properties.zone_map(property)
2555    }
2556
2557    /// Gets the zone map for an edge property.
2558    #[must_use]
2559    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2560        self.edge_properties.zone_map(property)
2561    }
2562
2563    /// Rebuilds zone maps for all properties.
2564    pub fn rebuild_zone_maps(&self) {
2565        self.node_properties.rebuild_zone_maps();
2566        self.edge_properties.rebuild_zone_maps();
2567    }
2568
2569    // === Statistics ===
2570
2571    /// Returns the current statistics.
2572    #[must_use]
2573    pub fn statistics(&self) -> Statistics {
2574        self.statistics.read().clone()
2575    }
2576
2577    /// Recomputes statistics if they are stale (i.e., after mutations).
2578    ///
2579    /// Call this before reading statistics for query optimization.
2580    /// Avoids redundant recomputation if no mutations occurred.
2581    pub fn ensure_statistics_fresh(&self) {
2582        if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2583            self.compute_statistics();
2584        }
2585    }
2586
2587    /// Recomputes statistics from current data.
2588    ///
2589    /// Scans all labels and edge types to build cardinality estimates for the
2590    /// query optimizer. Call this periodically or after bulk data loads.
2591    #[cfg(not(feature = "tiered-storage"))]
2592    pub fn compute_statistics(&self) {
2593        let mut stats = Statistics::new();
2594
2595        // Compute total counts
2596        stats.total_nodes = self.node_count() as u64;
2597        stats.total_edges = self.edge_count() as u64;
2598
2599        // Compute per-label statistics
2600        let id_to_label = self.id_to_label.read();
2601        let label_index = self.label_index.read();
2602
2603        for (label_id, label_name) in id_to_label.iter().enumerate() {
2604            let node_count = label_index
2605                .get(label_id)
2606                .map(|set| set.len() as u64)
2607                .unwrap_or(0);
2608
2609            if node_count > 0 {
2610                // Estimate average degree
2611                let avg_out_degree = if stats.total_nodes > 0 {
2612                    stats.total_edges as f64 / stats.total_nodes as f64
2613                } else {
2614                    0.0
2615                };
2616
2617                let label_stats =
2618                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2619
2620                stats.update_label(label_name.as_ref(), label_stats);
2621            }
2622        }
2623
2624        // Compute per-edge-type statistics
2625        let id_to_edge_type = self.id_to_edge_type.read();
2626        let edges = self.edges.read();
2627        let epoch = self.current_epoch();
2628
2629        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2630        for chain in edges.values() {
2631            if let Some(record) = chain.visible_at(epoch) {
2632                if !record.is_deleted() {
2633                    *edge_type_counts.entry(record.type_id).or_default() += 1;
2634                }
2635            }
2636        }
2637
2638        for (type_id, count) in edge_type_counts {
2639            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2640                let avg_degree = if stats.total_nodes > 0 {
2641                    count as f64 / stats.total_nodes as f64
2642                } else {
2643                    0.0
2644                };
2645
2646                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2647                stats.update_edge_type(type_name.as_ref(), edge_stats);
2648            }
2649        }
2650
2651        *self.statistics.write() = stats;
2652    }
2653
2654    /// Recomputes statistics from current data.
2655    /// (Tiered storage version)
2656    #[cfg(feature = "tiered-storage")]
2657    pub fn compute_statistics(&self) {
2658        let mut stats = Statistics::new();
2659
2660        // Compute total counts
2661        stats.total_nodes = self.node_count() as u64;
2662        stats.total_edges = self.edge_count() as u64;
2663
2664        // Compute per-label statistics
2665        let id_to_label = self.id_to_label.read();
2666        let label_index = self.label_index.read();
2667
2668        for (label_id, label_name) in id_to_label.iter().enumerate() {
2669            let node_count = label_index
2670                .get(label_id)
2671                .map(|set| set.len() as u64)
2672                .unwrap_or(0);
2673
2674            if node_count > 0 {
2675                let avg_out_degree = if stats.total_nodes > 0 {
2676                    stats.total_edges as f64 / stats.total_nodes as f64
2677                } else {
2678                    0.0
2679                };
2680
2681                let label_stats =
2682                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2683
2684                stats.update_label(label_name.as_ref(), label_stats);
2685            }
2686        }
2687
2688        // Compute per-edge-type statistics
2689        let id_to_edge_type = self.id_to_edge_type.read();
2690        let versions = self.edge_versions.read();
2691        let epoch = self.current_epoch();
2692
2693        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2694        for index in versions.values() {
2695            if let Some(vref) = index.visible_at(epoch)
2696                && let Some(record) = self.read_edge_record(&vref)
2697                && !record.is_deleted()
2698            {
2699                *edge_type_counts.entry(record.type_id).or_default() += 1;
2700            }
2701        }
2702
2703        for (type_id, count) in edge_type_counts {
2704            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2705                let avg_degree = if stats.total_nodes > 0 {
2706                    count as f64 / stats.total_nodes as f64
2707                } else {
2708                    0.0
2709                };
2710
2711                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2712                stats.update_edge_type(type_name.as_ref(), edge_stats);
2713            }
2714        }
2715
2716        *self.statistics.write() = stats;
2717    }
2718
2719    /// Estimates cardinality for a label scan.
2720    #[must_use]
2721    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2722        self.statistics.read().estimate_label_cardinality(label)
2723    }
2724
2725    /// Estimates average degree for an edge type.
2726    #[must_use]
2727    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2728        self.statistics
2729            .read()
2730            .estimate_avg_degree(edge_type, outgoing)
2731    }
2732
2733    // === Internal Helpers ===
2734
2735    fn get_or_create_label_id(&self, label: &str) -> u32 {
2736        {
2737            let label_to_id = self.label_to_id.read();
2738            if let Some(&id) = label_to_id.get(label) {
2739                return id;
2740            }
2741        }
2742
2743        let mut label_to_id = self.label_to_id.write();
2744        let mut id_to_label = self.id_to_label.write();
2745
2746        // Double-check after acquiring write lock
2747        if let Some(&id) = label_to_id.get(label) {
2748            return id;
2749        }
2750
2751        let id = id_to_label.len() as u32;
2752
2753        let label: ArcStr = label.into();
2754        label_to_id.insert(label.clone(), id);
2755        id_to_label.push(label);
2756
2757        id
2758    }
2759
2760    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2761        {
2762            let type_to_id = self.edge_type_to_id.read();
2763            if let Some(&id) = type_to_id.get(edge_type) {
2764                return id;
2765            }
2766        }
2767
2768        let mut type_to_id = self.edge_type_to_id.write();
2769        let mut id_to_type = self.id_to_edge_type.write();
2770
2771        // Double-check
2772        if let Some(&id) = type_to_id.get(edge_type) {
2773            return id;
2774        }
2775
2776        let id = id_to_type.len() as u32;
2777        let edge_type: ArcStr = edge_type.into();
2778        type_to_id.insert(edge_type.clone(), id);
2779        id_to_type.push(edge_type);
2780
2781        id
2782    }
2783
2784    // === Recovery Support ===
2785
2786    /// Creates a node with a specific ID during recovery.
2787    ///
2788    /// This is used for WAL recovery to restore nodes with their original IDs.
2789    /// The caller must ensure IDs don't conflict with existing nodes.
2790    #[cfg(not(feature = "tiered-storage"))]
2791    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2792        let epoch = self.current_epoch();
2793        let mut record = NodeRecord::new(id, epoch);
2794        record.set_label_count(labels.len() as u16);
2795
2796        // Store labels in node_labels map and label_index
2797        let mut node_label_set = FxHashSet::default();
2798        for label in labels {
2799            let label_id = self.get_or_create_label_id(*label);
2800            node_label_set.insert(label_id);
2801
2802            // Update label index
2803            let mut index = self.label_index.write();
2804            while index.len() <= label_id as usize {
2805                index.push(FxHashMap::default());
2806            }
2807            index[label_id as usize].insert(id, ());
2808        }
2809
2810        // Store node's labels
2811        self.node_labels.write().insert(id, node_label_set);
2812
2813        // Create version chain with initial version (using SYSTEM tx for recovery)
2814        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2815        self.nodes.write().insert(id, chain);
2816
2817        // Update next_node_id if necessary to avoid future collisions
2818        let id_val = id.as_u64();
2819        let _ = self
2820            .next_node_id
2821            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2822                if id_val >= current {
2823                    Some(id_val + 1)
2824                } else {
2825                    None
2826                }
2827            });
2828    }
2829
2830    /// Creates a node with a specific ID during recovery.
2831    /// (Tiered storage version)
2832    #[cfg(feature = "tiered-storage")]
2833    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2834        let epoch = self.current_epoch();
2835        let mut record = NodeRecord::new(id, epoch);
2836        record.set_label_count(labels.len() as u16);
2837
2838        // Store labels in node_labels map and label_index
2839        let mut node_label_set = FxHashSet::default();
2840        for label in labels {
2841            let label_id = self.get_or_create_label_id(*label);
2842            node_label_set.insert(label_id);
2843
2844            // Update label index
2845            let mut index = self.label_index.write();
2846            while index.len() <= label_id as usize {
2847                index.push(FxHashMap::default());
2848            }
2849            index[label_id as usize].insert(id, ());
2850        }
2851
2852        // Store node's labels
2853        self.node_labels.write().insert(id, node_label_set);
2854
2855        // Allocate record in arena and get offset (create epoch if needed)
2856        let arena = self.arena_allocator.arena_or_create(epoch);
2857        let (offset, _stored) = arena.alloc_value_with_offset(record);
2858
2859        // Create HotVersionRef (using SYSTEM tx for recovery)
2860        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2861        let mut versions = self.node_versions.write();
2862        versions.insert(id, VersionIndex::with_initial(hot_ref));
2863
2864        // Update next_node_id if necessary to avoid future collisions
2865        let id_val = id.as_u64();
2866        let _ = self
2867            .next_node_id
2868            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2869                if id_val >= current {
2870                    Some(id_val + 1)
2871                } else {
2872                    None
2873                }
2874            });
2875    }
2876
2877    /// Creates an edge with a specific ID during recovery.
2878    ///
2879    /// This is used for WAL recovery to restore edges with their original IDs.
2880    #[cfg(not(feature = "tiered-storage"))]
2881    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2882        let epoch = self.current_epoch();
2883        let type_id = self.get_or_create_edge_type_id(edge_type);
2884
2885        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2886        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2887        self.edges.write().insert(id, chain);
2888
2889        // Update adjacency
2890        self.forward_adj.add_edge(src, dst, id);
2891        if let Some(ref backward) = self.backward_adj {
2892            backward.add_edge(dst, src, id);
2893        }
2894
2895        // Update next_edge_id if necessary
2896        let id_val = id.as_u64();
2897        let _ = self
2898            .next_edge_id
2899            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2900                if id_val >= current {
2901                    Some(id_val + 1)
2902                } else {
2903                    None
2904                }
2905            });
2906    }
2907
2908    /// Creates an edge with a specific ID during recovery.
2909    /// (Tiered storage version)
2910    #[cfg(feature = "tiered-storage")]
2911    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2912        let epoch = self.current_epoch();
2913        let type_id = self.get_or_create_edge_type_id(edge_type);
2914
2915        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2916
2917        // Allocate record in arena and get offset (create epoch if needed)
2918        let arena = self.arena_allocator.arena_or_create(epoch);
2919        let (offset, _stored) = arena.alloc_value_with_offset(record);
2920
2921        // Create HotVersionRef (using SYSTEM tx for recovery)
2922        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2923        let mut versions = self.edge_versions.write();
2924        versions.insert(id, VersionIndex::with_initial(hot_ref));
2925
2926        // Update adjacency
2927        self.forward_adj.add_edge(src, dst, id);
2928        if let Some(ref backward) = self.backward_adj {
2929            backward.add_edge(dst, src, id);
2930        }
2931
2932        // Update next_edge_id if necessary
2933        let id_val = id.as_u64();
2934        let _ = self
2935            .next_edge_id
2936            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2937                if id_val >= current {
2938                    Some(id_val + 1)
2939                } else {
2940                    None
2941                }
2942            });
2943    }
2944
2945    /// Sets the current epoch during recovery.
2946    pub fn set_epoch(&self, epoch: EpochId) {
2947        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2948    }
2949}
2950
2951impl Default for LpgStore {
2952    fn default() -> Self {
2953        Self::new()
2954    }
2955}
2956
2957#[cfg(test)]
2958mod tests {
2959    use super::*;
2960
2961    #[test]
2962    fn test_create_node() {
2963        let store = LpgStore::new();
2964
2965        let id = store.create_node(&["Person"]);
2966        assert!(id.is_valid());
2967
2968        let node = store.get_node(id).unwrap();
2969        assert!(node.has_label("Person"));
2970        assert!(!node.has_label("Animal"));
2971    }
2972
2973    #[test]
2974    fn test_create_node_with_props() {
2975        let store = LpgStore::new();
2976
2977        let id = store.create_node_with_props(
2978            &["Person"],
2979            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
2980        );
2981
2982        let node = store.get_node(id).unwrap();
2983        assert_eq!(
2984            node.get_property("name").and_then(|v| v.as_str()),
2985            Some("Alice")
2986        );
2987        assert_eq!(
2988            node.get_property("age").and_then(|v| v.as_int64()),
2989            Some(30)
2990        );
2991    }
2992
2993    #[test]
2994    fn test_delete_node() {
2995        let store = LpgStore::new();
2996
2997        let id = store.create_node(&["Person"]);
2998        assert_eq!(store.node_count(), 1);
2999
3000        assert!(store.delete_node(id));
3001        assert_eq!(store.node_count(), 0);
3002        assert!(store.get_node(id).is_none());
3003
3004        // Double delete should return false
3005        assert!(!store.delete_node(id));
3006    }
3007
3008    #[test]
3009    fn test_create_edge() {
3010        let store = LpgStore::new();
3011
3012        let alice = store.create_node(&["Person"]);
3013        let bob = store.create_node(&["Person"]);
3014
3015        let edge_id = store.create_edge(alice, bob, "KNOWS");
3016        assert!(edge_id.is_valid());
3017
3018        let edge = store.get_edge(edge_id).unwrap();
3019        assert_eq!(edge.src, alice);
3020        assert_eq!(edge.dst, bob);
3021        assert_eq!(edge.edge_type.as_str(), "KNOWS");
3022    }
3023
3024    #[test]
3025    fn test_neighbors() {
3026        let store = LpgStore::new();
3027
3028        let a = store.create_node(&["Person"]);
3029        let b = store.create_node(&["Person"]);
3030        let c = store.create_node(&["Person"]);
3031
3032        store.create_edge(a, b, "KNOWS");
3033        store.create_edge(a, c, "KNOWS");
3034
3035        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3036        assert_eq!(outgoing.len(), 2);
3037        assert!(outgoing.contains(&b));
3038        assert!(outgoing.contains(&c));
3039
3040        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3041        assert_eq!(incoming.len(), 1);
3042        assert!(incoming.contains(&a));
3043    }
3044
3045    #[test]
3046    fn test_nodes_by_label() {
3047        let store = LpgStore::new();
3048
3049        let p1 = store.create_node(&["Person"]);
3050        let p2 = store.create_node(&["Person"]);
3051        let _a = store.create_node(&["Animal"]);
3052
3053        let persons = store.nodes_by_label("Person");
3054        assert_eq!(persons.len(), 2);
3055        assert!(persons.contains(&p1));
3056        assert!(persons.contains(&p2));
3057
3058        let animals = store.nodes_by_label("Animal");
3059        assert_eq!(animals.len(), 1);
3060    }
3061
3062    #[test]
3063    fn test_delete_edge() {
3064        let store = LpgStore::new();
3065
3066        let a = store.create_node(&["Person"]);
3067        let b = store.create_node(&["Person"]);
3068        let edge_id = store.create_edge(a, b, "KNOWS");
3069
3070        assert_eq!(store.edge_count(), 1);
3071
3072        assert!(store.delete_edge(edge_id));
3073        assert_eq!(store.edge_count(), 0);
3074        assert!(store.get_edge(edge_id).is_none());
3075    }
3076
3077    // === New tests for improved coverage ===
3078
3079    #[test]
3080    fn test_lpg_store_config() {
3081        // Test with_config
3082        let config = LpgStoreConfig {
3083            backward_edges: false,
3084            initial_node_capacity: 100,
3085            initial_edge_capacity: 200,
3086        };
3087        let store = LpgStore::with_config(config);
3088
3089        // Store should work but without backward adjacency
3090        let a = store.create_node(&["Person"]);
3091        let b = store.create_node(&["Person"]);
3092        store.create_edge(a, b, "KNOWS");
3093
3094        // Outgoing should work
3095        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3096        assert_eq!(outgoing.len(), 1);
3097
3098        // Incoming should be empty (no backward adjacency)
3099        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3100        assert_eq!(incoming.len(), 0);
3101    }
3102
3103    #[test]
3104    fn test_epoch_management() {
3105        let store = LpgStore::new();
3106
3107        let epoch0 = store.current_epoch();
3108        assert_eq!(epoch0.as_u64(), 0);
3109
3110        let epoch1 = store.new_epoch();
3111        assert_eq!(epoch1.as_u64(), 1);
3112
3113        let current = store.current_epoch();
3114        assert_eq!(current.as_u64(), 1);
3115    }
3116
3117    #[test]
3118    fn test_node_properties() {
3119        let store = LpgStore::new();
3120        let id = store.create_node(&["Person"]);
3121
3122        // Set and get property
3123        store.set_node_property(id, "name", Value::from("Alice"));
3124        let name = store.get_node_property(id, &"name".into());
3125        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3126
3127        // Update property
3128        store.set_node_property(id, "name", Value::from("Bob"));
3129        let name = store.get_node_property(id, &"name".into());
3130        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3131
3132        // Remove property
3133        let old = store.remove_node_property(id, "name");
3134        assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3135
3136        // Property should be gone
3137        let name = store.get_node_property(id, &"name".into());
3138        assert!(name.is_none());
3139
3140        // Remove non-existent property
3141        let none = store.remove_node_property(id, "nonexistent");
3142        assert!(none.is_none());
3143    }
3144
3145    #[test]
3146    fn test_edge_properties() {
3147        let store = LpgStore::new();
3148        let a = store.create_node(&["Person"]);
3149        let b = store.create_node(&["Person"]);
3150        let edge_id = store.create_edge(a, b, "KNOWS");
3151
3152        // Set and get property
3153        store.set_edge_property(edge_id, "since", Value::from(2020i64));
3154        let since = store.get_edge_property(edge_id, &"since".into());
3155        assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3156
3157        // Remove property
3158        let old = store.remove_edge_property(edge_id, "since");
3159        assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3160
3161        let since = store.get_edge_property(edge_id, &"since".into());
3162        assert!(since.is_none());
3163    }
3164
3165    #[test]
3166    fn test_add_remove_label() {
3167        let store = LpgStore::new();
3168        let id = store.create_node(&["Person"]);
3169
3170        // Add new label
3171        assert!(store.add_label(id, "Employee"));
3172
3173        let node = store.get_node(id).unwrap();
3174        assert!(node.has_label("Person"));
3175        assert!(node.has_label("Employee"));
3176
3177        // Adding same label again should fail
3178        assert!(!store.add_label(id, "Employee"));
3179
3180        // Remove label
3181        assert!(store.remove_label(id, "Employee"));
3182
3183        let node = store.get_node(id).unwrap();
3184        assert!(node.has_label("Person"));
3185        assert!(!node.has_label("Employee"));
3186
3187        // Removing non-existent label should fail
3188        assert!(!store.remove_label(id, "Employee"));
3189        assert!(!store.remove_label(id, "NonExistent"));
3190    }
3191
3192    #[test]
3193    fn test_add_label_to_nonexistent_node() {
3194        let store = LpgStore::new();
3195        let fake_id = NodeId::new(999);
3196        assert!(!store.add_label(fake_id, "Label"));
3197    }
3198
3199    #[test]
3200    fn test_remove_label_from_nonexistent_node() {
3201        let store = LpgStore::new();
3202        let fake_id = NodeId::new(999);
3203        assert!(!store.remove_label(fake_id, "Label"));
3204    }
3205
3206    #[test]
3207    fn test_node_ids() {
3208        let store = LpgStore::new();
3209
3210        let n1 = store.create_node(&["Person"]);
3211        let n2 = store.create_node(&["Person"]);
3212        let n3 = store.create_node(&["Person"]);
3213
3214        let ids = store.node_ids();
3215        assert_eq!(ids.len(), 3);
3216        assert!(ids.contains(&n1));
3217        assert!(ids.contains(&n2));
3218        assert!(ids.contains(&n3));
3219
3220        // Delete one
3221        store.delete_node(n2);
3222        let ids = store.node_ids();
3223        assert_eq!(ids.len(), 2);
3224        assert!(!ids.contains(&n2));
3225    }
3226
3227    #[test]
3228    fn test_delete_node_nonexistent() {
3229        let store = LpgStore::new();
3230        let fake_id = NodeId::new(999);
3231        assert!(!store.delete_node(fake_id));
3232    }
3233
3234    #[test]
3235    fn test_delete_edge_nonexistent() {
3236        let store = LpgStore::new();
3237        let fake_id = EdgeId::new(999);
3238        assert!(!store.delete_edge(fake_id));
3239    }
3240
3241    #[test]
3242    fn test_delete_edge_double() {
3243        let store = LpgStore::new();
3244        let a = store.create_node(&["Person"]);
3245        let b = store.create_node(&["Person"]);
3246        let edge_id = store.create_edge(a, b, "KNOWS");
3247
3248        assert!(store.delete_edge(edge_id));
3249        assert!(!store.delete_edge(edge_id)); // Double delete
3250    }
3251
3252    #[test]
3253    fn test_create_edge_with_props() {
3254        let store = LpgStore::new();
3255        let a = store.create_node(&["Person"]);
3256        let b = store.create_node(&["Person"]);
3257
3258        let edge_id = store.create_edge_with_props(
3259            a,
3260            b,
3261            "KNOWS",
3262            [
3263                ("since", Value::from(2020i64)),
3264                ("weight", Value::from(1.0)),
3265            ],
3266        );
3267
3268        let edge = store.get_edge(edge_id).unwrap();
3269        assert_eq!(
3270            edge.get_property("since").and_then(|v| v.as_int64()),
3271            Some(2020)
3272        );
3273        assert_eq!(
3274            edge.get_property("weight").and_then(|v| v.as_float64()),
3275            Some(1.0)
3276        );
3277    }
3278
3279    #[test]
3280    fn test_delete_node_edges() {
3281        let store = LpgStore::new();
3282
3283        let a = store.create_node(&["Person"]);
3284        let b = store.create_node(&["Person"]);
3285        let c = store.create_node(&["Person"]);
3286
3287        store.create_edge(a, b, "KNOWS"); // a -> b
3288        store.create_edge(c, a, "KNOWS"); // c -> a
3289
3290        assert_eq!(store.edge_count(), 2);
3291
3292        // Delete all edges connected to a
3293        store.delete_node_edges(a);
3294
3295        assert_eq!(store.edge_count(), 0);
3296    }
3297
3298    #[test]
3299    fn test_neighbors_both_directions() {
3300        let store = LpgStore::new();
3301
3302        let a = store.create_node(&["Person"]);
3303        let b = store.create_node(&["Person"]);
3304        let c = store.create_node(&["Person"]);
3305
3306        store.create_edge(a, b, "KNOWS"); // a -> b
3307        store.create_edge(c, a, "KNOWS"); // c -> a
3308
3309        // Direction::Both for node a
3310        let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3311        assert_eq!(neighbors.len(), 2);
3312        assert!(neighbors.contains(&b)); // outgoing
3313        assert!(neighbors.contains(&c)); // incoming
3314    }
3315
3316    #[test]
3317    fn test_edges_from() {
3318        let store = LpgStore::new();
3319
3320        let a = store.create_node(&["Person"]);
3321        let b = store.create_node(&["Person"]);
3322        let c = store.create_node(&["Person"]);
3323
3324        let e1 = store.create_edge(a, b, "KNOWS");
3325        let e2 = store.create_edge(a, c, "KNOWS");
3326
3327        let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3328        assert_eq!(edges.len(), 2);
3329        assert!(edges.iter().any(|(_, e)| *e == e1));
3330        assert!(edges.iter().any(|(_, e)| *e == e2));
3331
3332        // Incoming edges to b
3333        let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3334        assert_eq!(incoming.len(), 1);
3335        assert_eq!(incoming[0].1, e1);
3336    }
3337
3338    #[test]
3339    fn test_edges_to() {
3340        let store = LpgStore::new();
3341
3342        let a = store.create_node(&["Person"]);
3343        let b = store.create_node(&["Person"]);
3344        let c = store.create_node(&["Person"]);
3345
3346        let e1 = store.create_edge(a, b, "KNOWS");
3347        let e2 = store.create_edge(c, b, "KNOWS");
3348
3349        // Edges pointing TO b
3350        let to_b = store.edges_to(b);
3351        assert_eq!(to_b.len(), 2);
3352        assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3353        assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3354    }
3355
3356    #[test]
3357    fn test_out_degree_in_degree() {
3358        let store = LpgStore::new();
3359
3360        let a = store.create_node(&["Person"]);
3361        let b = store.create_node(&["Person"]);
3362        let c = store.create_node(&["Person"]);
3363
3364        store.create_edge(a, b, "KNOWS");
3365        store.create_edge(a, c, "KNOWS");
3366        store.create_edge(c, b, "KNOWS");
3367
3368        assert_eq!(store.out_degree(a), 2);
3369        assert_eq!(store.out_degree(b), 0);
3370        assert_eq!(store.out_degree(c), 1);
3371
3372        assert_eq!(store.in_degree(a), 0);
3373        assert_eq!(store.in_degree(b), 2);
3374        assert_eq!(store.in_degree(c), 1);
3375    }
3376
3377    #[test]
3378    fn test_edge_type() {
3379        let store = LpgStore::new();
3380
3381        let a = store.create_node(&["Person"]);
3382        let b = store.create_node(&["Person"]);
3383        let edge_id = store.create_edge(a, b, "KNOWS");
3384
3385        let edge_type = store.edge_type(edge_id);
3386        assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3387
3388        // Non-existent edge
3389        let fake_id = EdgeId::new(999);
3390        assert!(store.edge_type(fake_id).is_none());
3391    }
3392
3393    #[test]
3394    fn test_count_methods() {
3395        let store = LpgStore::new();
3396
3397        assert_eq!(store.label_count(), 0);
3398        assert_eq!(store.edge_type_count(), 0);
3399        assert_eq!(store.property_key_count(), 0);
3400
3401        let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3402        let b = store.create_node(&["Company"]);
3403        store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3404
3405        assert_eq!(store.label_count(), 2); // Person, Company
3406        assert_eq!(store.edge_type_count(), 1); // WORKS_AT
3407        assert_eq!(store.property_key_count(), 2); // age, since
3408    }
3409
3410    #[test]
3411    fn test_all_nodes_and_edges() {
3412        let store = LpgStore::new();
3413
3414        let a = store.create_node(&["Person"]);
3415        let b = store.create_node(&["Person"]);
3416        store.create_edge(a, b, "KNOWS");
3417
3418        let nodes: Vec<_> = store.all_nodes().collect();
3419        assert_eq!(nodes.len(), 2);
3420
3421        let edges: Vec<_> = store.all_edges().collect();
3422        assert_eq!(edges.len(), 1);
3423    }
3424
3425    #[test]
3426    fn test_all_labels_and_edge_types() {
3427        let store = LpgStore::new();
3428
3429        store.create_node(&["Person"]);
3430        store.create_node(&["Company"]);
3431        let a = store.create_node(&["Animal"]);
3432        let b = store.create_node(&["Animal"]);
3433        store.create_edge(a, b, "EATS");
3434
3435        let labels = store.all_labels();
3436        assert_eq!(labels.len(), 3);
3437        assert!(labels.contains(&"Person".to_string()));
3438        assert!(labels.contains(&"Company".to_string()));
3439        assert!(labels.contains(&"Animal".to_string()));
3440
3441        let edge_types = store.all_edge_types();
3442        assert_eq!(edge_types.len(), 1);
3443        assert!(edge_types.contains(&"EATS".to_string()));
3444    }
3445
3446    #[test]
3447    fn test_all_property_keys() {
3448        let store = LpgStore::new();
3449
3450        let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3451        let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3452        store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3453
3454        let keys = store.all_property_keys();
3455        assert!(keys.contains(&"name".to_string()));
3456        assert!(keys.contains(&"age".to_string()));
3457        assert!(keys.contains(&"since".to_string()));
3458    }
3459
3460    #[test]
3461    fn test_nodes_with_label() {
3462        let store = LpgStore::new();
3463
3464        store.create_node(&["Person"]);
3465        store.create_node(&["Person"]);
3466        store.create_node(&["Company"]);
3467
3468        let persons: Vec<_> = store.nodes_with_label("Person").collect();
3469        assert_eq!(persons.len(), 2);
3470
3471        let companies: Vec<_> = store.nodes_with_label("Company").collect();
3472        assert_eq!(companies.len(), 1);
3473
3474        let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3475        assert_eq!(none.len(), 0);
3476    }
3477
3478    #[test]
3479    fn test_edges_with_type() {
3480        let store = LpgStore::new();
3481
3482        let a = store.create_node(&["Person"]);
3483        let b = store.create_node(&["Person"]);
3484        let c = store.create_node(&["Company"]);
3485
3486        store.create_edge(a, b, "KNOWS");
3487        store.create_edge(a, c, "WORKS_AT");
3488
3489        let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3490        assert_eq!(knows.len(), 1);
3491
3492        let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3493        assert_eq!(works_at.len(), 1);
3494
3495        let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3496        assert_eq!(none.len(), 0);
3497    }
3498
3499    #[test]
3500    fn test_nodes_by_label_nonexistent() {
3501        let store = LpgStore::new();
3502        store.create_node(&["Person"]);
3503
3504        let empty = store.nodes_by_label("NonExistent");
3505        assert!(empty.is_empty());
3506    }
3507
3508    #[test]
3509    fn test_statistics() {
3510        let store = LpgStore::new();
3511
3512        let a = store.create_node(&["Person"]);
3513        let b = store.create_node(&["Person"]);
3514        let c = store.create_node(&["Company"]);
3515
3516        store.create_edge(a, b, "KNOWS");
3517        store.create_edge(a, c, "WORKS_AT");
3518
3519        store.compute_statistics();
3520        let stats = store.statistics();
3521
3522        assert_eq!(stats.total_nodes, 3);
3523        assert_eq!(stats.total_edges, 2);
3524
3525        // Estimates
3526        let person_card = store.estimate_label_cardinality("Person");
3527        assert!(person_card > 0.0);
3528
3529        let avg_degree = store.estimate_avg_degree("KNOWS", true);
3530        assert!(avg_degree >= 0.0);
3531    }
3532
3533    #[test]
3534    fn test_zone_maps() {
3535        let store = LpgStore::new();
3536
3537        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3538        store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3539
3540        // Zone map should indicate possible matches (30 is within [25, 35] range)
3541        let might_match =
3542            store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3543        // Zone maps return true conservatively when value is within min/max range
3544        assert!(might_match);
3545
3546        let zone = store.node_property_zone_map(&"age".into());
3547        assert!(zone.is_some());
3548
3549        // Non-existent property
3550        let no_zone = store.node_property_zone_map(&"nonexistent".into());
3551        assert!(no_zone.is_none());
3552
3553        // Edge zone maps
3554        let a = store.create_node(&["A"]);
3555        let b = store.create_node(&["B"]);
3556        store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3557
3558        let edge_zone = store.edge_property_zone_map(&"weight".into());
3559        assert!(edge_zone.is_some());
3560    }
3561
3562    #[test]
3563    fn test_rebuild_zone_maps() {
3564        let store = LpgStore::new();
3565        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3566
3567        // Should not panic
3568        store.rebuild_zone_maps();
3569    }
3570
3571    #[test]
3572    fn test_create_node_with_id() {
3573        let store = LpgStore::new();
3574
3575        let specific_id = NodeId::new(100);
3576        store.create_node_with_id(specific_id, &["Person", "Employee"]);
3577
3578        let node = store.get_node(specific_id).unwrap();
3579        assert!(node.has_label("Person"));
3580        assert!(node.has_label("Employee"));
3581
3582        // Next auto-generated ID should be > 100
3583        let next = store.create_node(&["Other"]);
3584        assert!(next.as_u64() > 100);
3585    }
3586
3587    #[test]
3588    fn test_create_edge_with_id() {
3589        let store = LpgStore::new();
3590
3591        let a = store.create_node(&["A"]);
3592        let b = store.create_node(&["B"]);
3593
3594        let specific_id = EdgeId::new(500);
3595        store.create_edge_with_id(specific_id, a, b, "REL");
3596
3597        let edge = store.get_edge(specific_id).unwrap();
3598        assert_eq!(edge.src, a);
3599        assert_eq!(edge.dst, b);
3600        assert_eq!(edge.edge_type.as_str(), "REL");
3601
3602        // Next auto-generated ID should be > 500
3603        let next = store.create_edge(a, b, "OTHER");
3604        assert!(next.as_u64() > 500);
3605    }
3606
3607    #[test]
3608    fn test_set_epoch() {
3609        let store = LpgStore::new();
3610
3611        assert_eq!(store.current_epoch().as_u64(), 0);
3612
3613        store.set_epoch(EpochId::new(42));
3614        assert_eq!(store.current_epoch().as_u64(), 42);
3615    }
3616
3617    #[test]
3618    fn test_get_node_nonexistent() {
3619        let store = LpgStore::new();
3620        let fake_id = NodeId::new(999);
3621        assert!(store.get_node(fake_id).is_none());
3622    }
3623
3624    #[test]
3625    fn test_get_edge_nonexistent() {
3626        let store = LpgStore::new();
3627        let fake_id = EdgeId::new(999);
3628        assert!(store.get_edge(fake_id).is_none());
3629    }
3630
3631    #[test]
3632    fn test_multiple_labels() {
3633        let store = LpgStore::new();
3634
3635        let id = store.create_node(&["Person", "Employee", "Manager"]);
3636        let node = store.get_node(id).unwrap();
3637
3638        assert!(node.has_label("Person"));
3639        assert!(node.has_label("Employee"));
3640        assert!(node.has_label("Manager"));
3641        assert!(!node.has_label("Other"));
3642    }
3643
3644    #[test]
3645    fn test_default_impl() {
3646        let store: LpgStore = Default::default();
3647        assert_eq!(store.node_count(), 0);
3648        assert_eq!(store.edge_count(), 0);
3649    }
3650
3651    #[test]
3652    fn test_edges_from_both_directions() {
3653        let store = LpgStore::new();
3654
3655        let a = store.create_node(&["A"]);
3656        let b = store.create_node(&["B"]);
3657        let c = store.create_node(&["C"]);
3658
3659        let e1 = store.create_edge(a, b, "R1"); // a -> b
3660        let e2 = store.create_edge(c, a, "R2"); // c -> a
3661
3662        // Both directions from a
3663        let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3664        assert_eq!(edges.len(), 2);
3665        assert!(edges.iter().any(|(_, e)| *e == e1)); // outgoing
3666        assert!(edges.iter().any(|(_, e)| *e == e2)); // incoming
3667    }
3668
3669    #[test]
3670    fn test_no_backward_adj_in_degree() {
3671        let config = LpgStoreConfig {
3672            backward_edges: false,
3673            initial_node_capacity: 10,
3674            initial_edge_capacity: 10,
3675        };
3676        let store = LpgStore::with_config(config);
3677
3678        let a = store.create_node(&["A"]);
3679        let b = store.create_node(&["B"]);
3680        store.create_edge(a, b, "R");
3681
3682        // in_degree should still work (falls back to scanning)
3683        let degree = store.in_degree(b);
3684        assert_eq!(degree, 1);
3685    }
3686
3687    #[test]
3688    fn test_no_backward_adj_edges_to() {
3689        let config = LpgStoreConfig {
3690            backward_edges: false,
3691            initial_node_capacity: 10,
3692            initial_edge_capacity: 10,
3693        };
3694        let store = LpgStore::with_config(config);
3695
3696        let a = store.create_node(&["A"]);
3697        let b = store.create_node(&["B"]);
3698        let e = store.create_edge(a, b, "R");
3699
3700        // edges_to should still work (falls back to scanning)
3701        let edges = store.edges_to(b);
3702        assert_eq!(edges.len(), 1);
3703        assert_eq!(edges[0].1, e);
3704    }
3705
3706    #[test]
3707    fn test_node_versioned_creation() {
3708        let store = LpgStore::new();
3709
3710        let epoch = store.new_epoch();
3711        let tx_id = TxId::new(1);
3712
3713        let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3714        assert!(store.get_node(id).is_some());
3715    }
3716
3717    #[test]
3718    fn test_edge_versioned_creation() {
3719        let store = LpgStore::new();
3720
3721        let a = store.create_node(&["A"]);
3722        let b = store.create_node(&["B"]);
3723
3724        let epoch = store.new_epoch();
3725        let tx_id = TxId::new(1);
3726
3727        let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3728        assert!(store.get_edge(edge_id).is_some());
3729    }
3730
3731    #[test]
3732    fn test_node_with_props_versioned() {
3733        let store = LpgStore::new();
3734
3735        let epoch = store.new_epoch();
3736        let tx_id = TxId::new(1);
3737
3738        let id = store.create_node_with_props_versioned(
3739            &["Person"],
3740            [("name", Value::from("Alice"))],
3741            epoch,
3742            tx_id,
3743        );
3744
3745        let node = store.get_node(id).unwrap();
3746        assert_eq!(
3747            node.get_property("name").and_then(|v| v.as_str()),
3748            Some("Alice")
3749        );
3750    }
3751
3752    #[test]
3753    fn test_discard_uncommitted_versions() {
3754        let store = LpgStore::new();
3755
3756        let epoch = store.new_epoch();
3757        let tx_id = TxId::new(42);
3758
3759        // Create node with specific tx
3760        let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3761        assert!(store.get_node(node_id).is_some());
3762
3763        // Discard uncommitted versions for this tx
3764        store.discard_uncommitted_versions(tx_id);
3765
3766        // Node should be gone (version chain was removed)
3767        assert!(store.get_node(node_id).is_none());
3768    }
3769
3770    // === Property Index Tests ===
3771
3772    #[test]
3773    fn test_property_index_create_and_lookup() {
3774        let store = LpgStore::new();
3775
3776        // Create nodes with properties
3777        let alice = store.create_node(&["Person"]);
3778        let bob = store.create_node(&["Person"]);
3779        let charlie = store.create_node(&["Person"]);
3780
3781        store.set_node_property(alice, "city", Value::from("NYC"));
3782        store.set_node_property(bob, "city", Value::from("NYC"));
3783        store.set_node_property(charlie, "city", Value::from("LA"));
3784
3785        // Before indexing, lookup still works (via scan)
3786        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3787        assert_eq!(nyc_people.len(), 2);
3788
3789        // Create index
3790        store.create_property_index("city");
3791        assert!(store.has_property_index("city"));
3792
3793        // Indexed lookup should return same results
3794        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3795        assert_eq!(nyc_people.len(), 2);
3796        assert!(nyc_people.contains(&alice));
3797        assert!(nyc_people.contains(&bob));
3798
3799        let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3800        assert_eq!(la_people.len(), 1);
3801        assert!(la_people.contains(&charlie));
3802    }
3803
3804    #[test]
3805    fn test_property_index_maintained_on_update() {
3806        let store = LpgStore::new();
3807
3808        // Create index first
3809        store.create_property_index("status");
3810
3811        let node = store.create_node(&["Task"]);
3812        store.set_node_property(node, "status", Value::from("pending"));
3813
3814        // Should find by initial value
3815        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3816        assert_eq!(pending.len(), 1);
3817        assert!(pending.contains(&node));
3818
3819        // Update the property
3820        store.set_node_property(node, "status", Value::from("done"));
3821
3822        // Old value should not find it
3823        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3824        assert!(pending.is_empty());
3825
3826        // New value should find it
3827        let done = store.find_nodes_by_property("status", &Value::from("done"));
3828        assert_eq!(done.len(), 1);
3829        assert!(done.contains(&node));
3830    }
3831
3832    #[test]
3833    fn test_property_index_maintained_on_remove() {
3834        let store = LpgStore::new();
3835
3836        store.create_property_index("tag");
3837
3838        let node = store.create_node(&["Item"]);
3839        store.set_node_property(node, "tag", Value::from("important"));
3840
3841        // Should find it
3842        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3843        assert_eq!(found.len(), 1);
3844
3845        // Remove the property
3846        store.remove_node_property(node, "tag");
3847
3848        // Should no longer find it
3849        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3850        assert!(found.is_empty());
3851    }
3852
3853    #[test]
3854    fn test_property_index_drop() {
3855        let store = LpgStore::new();
3856
3857        store.create_property_index("key");
3858        assert!(store.has_property_index("key"));
3859
3860        assert!(store.drop_property_index("key"));
3861        assert!(!store.has_property_index("key"));
3862
3863        // Dropping non-existent index returns false
3864        assert!(!store.drop_property_index("key"));
3865    }
3866
3867    #[test]
3868    fn test_property_index_multiple_values() {
3869        let store = LpgStore::new();
3870
3871        store.create_property_index("age");
3872
3873        // Create multiple nodes with same and different ages
3874        let n1 = store.create_node(&["Person"]);
3875        let n2 = store.create_node(&["Person"]);
3876        let n3 = store.create_node(&["Person"]);
3877        let n4 = store.create_node(&["Person"]);
3878
3879        store.set_node_property(n1, "age", Value::from(25i64));
3880        store.set_node_property(n2, "age", Value::from(25i64));
3881        store.set_node_property(n3, "age", Value::from(30i64));
3882        store.set_node_property(n4, "age", Value::from(25i64));
3883
3884        let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3885        assert_eq!(age_25.len(), 3);
3886
3887        let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3888        assert_eq!(age_30.len(), 1);
3889
3890        let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3891        assert!(age_40.is_empty());
3892    }
3893
3894    #[test]
3895    fn test_property_index_builds_from_existing_data() {
3896        let store = LpgStore::new();
3897
3898        // Create nodes first
3899        let n1 = store.create_node(&["Person"]);
3900        let n2 = store.create_node(&["Person"]);
3901        store.set_node_property(n1, "email", Value::from("alice@example.com"));
3902        store.set_node_property(n2, "email", Value::from("bob@example.com"));
3903
3904        // Create index after data exists
3905        store.create_property_index("email");
3906
3907        // Index should include existing data
3908        let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3909        assert_eq!(alice.len(), 1);
3910        assert!(alice.contains(&n1));
3911
3912        let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3913        assert_eq!(bob.len(), 1);
3914        assert!(bob.contains(&n2));
3915    }
3916
3917    #[test]
3918    fn test_get_node_property_batch() {
3919        let store = LpgStore::new();
3920
3921        let n1 = store.create_node(&["Person"]);
3922        let n2 = store.create_node(&["Person"]);
3923        let n3 = store.create_node(&["Person"]);
3924
3925        store.set_node_property(n1, "age", Value::from(25i64));
3926        store.set_node_property(n2, "age", Value::from(30i64));
3927        // n3 has no age property
3928
3929        let age_key = PropertyKey::new("age");
3930        let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
3931
3932        assert_eq!(values.len(), 3);
3933        assert_eq!(values[0], Some(Value::from(25i64)));
3934        assert_eq!(values[1], Some(Value::from(30i64)));
3935        assert_eq!(values[2], None);
3936    }
3937
3938    #[test]
3939    fn test_get_node_property_batch_empty() {
3940        let store = LpgStore::new();
3941        let key = PropertyKey::new("any");
3942
3943        let values = store.get_node_property_batch(&[], &key);
3944        assert!(values.is_empty());
3945    }
3946
3947    #[test]
3948    fn test_get_nodes_properties_batch() {
3949        let store = LpgStore::new();
3950
3951        let n1 = store.create_node(&["Person"]);
3952        let n2 = store.create_node(&["Person"]);
3953        let n3 = store.create_node(&["Person"]);
3954
3955        store.set_node_property(n1, "name", Value::from("Alice"));
3956        store.set_node_property(n1, "age", Value::from(25i64));
3957        store.set_node_property(n2, "name", Value::from("Bob"));
3958        // n3 has no properties
3959
3960        let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
3961
3962        assert_eq!(all_props.len(), 3);
3963        assert_eq!(all_props[0].len(), 2); // name and age
3964        assert_eq!(all_props[1].len(), 1); // name only
3965        assert_eq!(all_props[2].len(), 0); // no properties
3966
3967        assert_eq!(
3968            all_props[0].get(&PropertyKey::new("name")),
3969            Some(&Value::from("Alice"))
3970        );
3971        assert_eq!(
3972            all_props[1].get(&PropertyKey::new("name")),
3973            Some(&Value::from("Bob"))
3974        );
3975    }
3976
3977    #[test]
3978    fn test_get_nodes_properties_batch_empty() {
3979        let store = LpgStore::new();
3980
3981        let all_props = store.get_nodes_properties_batch(&[]);
3982        assert!(all_props.is_empty());
3983    }
3984
3985    #[test]
3986    fn test_get_nodes_properties_selective_batch() {
3987        let store = LpgStore::new();
3988
3989        let n1 = store.create_node(&["Person"]);
3990        let n2 = store.create_node(&["Person"]);
3991
3992        // Set multiple properties
3993        store.set_node_property(n1, "name", Value::from("Alice"));
3994        store.set_node_property(n1, "age", Value::from(25i64));
3995        store.set_node_property(n1, "email", Value::from("alice@example.com"));
3996        store.set_node_property(n2, "name", Value::from("Bob"));
3997        store.set_node_property(n2, "age", Value::from(30i64));
3998        store.set_node_property(n2, "city", Value::from("NYC"));
3999
4000        // Request only name and age (not email or city)
4001        let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
4002        let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
4003
4004        assert_eq!(props.len(), 2);
4005
4006        // n1: should have name and age, but NOT email
4007        assert_eq!(props[0].len(), 2);
4008        assert_eq!(
4009            props[0].get(&PropertyKey::new("name")),
4010            Some(&Value::from("Alice"))
4011        );
4012        assert_eq!(
4013            props[0].get(&PropertyKey::new("age")),
4014            Some(&Value::from(25i64))
4015        );
4016        assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4017
4018        // n2: should have name and age, but NOT city
4019        assert_eq!(props[1].len(), 2);
4020        assert_eq!(
4021            props[1].get(&PropertyKey::new("name")),
4022            Some(&Value::from("Bob"))
4023        );
4024        assert_eq!(
4025            props[1].get(&PropertyKey::new("age")),
4026            Some(&Value::from(30i64))
4027        );
4028        assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4029    }
4030
4031    #[test]
4032    fn test_get_nodes_properties_selective_batch_empty_keys() {
4033        let store = LpgStore::new();
4034
4035        let n1 = store.create_node(&["Person"]);
4036        store.set_node_property(n1, "name", Value::from("Alice"));
4037
4038        // Request no properties
4039        let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4040
4041        assert_eq!(props.len(), 1);
4042        assert!(props[0].is_empty()); // Empty map when no keys requested
4043    }
4044
4045    #[test]
4046    fn test_get_nodes_properties_selective_batch_missing_keys() {
4047        let store = LpgStore::new();
4048
4049        let n1 = store.create_node(&["Person"]);
4050        store.set_node_property(n1, "name", Value::from("Alice"));
4051
4052        // Request a property that doesn't exist
4053        let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4054        let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4055
4056        assert_eq!(props.len(), 1);
4057        assert_eq!(props[0].len(), 1); // Only name exists
4058        assert_eq!(
4059            props[0].get(&PropertyKey::new("name")),
4060            Some(&Value::from("Alice"))
4061        );
4062    }
4063}