Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(feature = "tiered-storage", feature = "vector-index"))]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
29
30#[cfg(feature = "vector-index")]
31use crate::index::vector::HnswIndex;
32
33/// Compares two values for ordering (used for range checks).
34fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
35    match (a, b) {
36        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
37        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
38        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
39        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
40        _ => None,
41    }
42}
43
44/// Checks if a value is within a range.
45fn value_in_range(
46    value: &Value,
47    min: Option<&Value>,
48    max: Option<&Value>,
49    min_inclusive: bool,
50    max_inclusive: bool,
51) -> bool {
52    // Check lower bound
53    if let Some(min_val) = min {
54        match compare_values_for_range(value, min_val) {
55            Some(CmpOrdering::Less) => return false,
56            Some(CmpOrdering::Equal) if !min_inclusive => return false,
57            None => return false, // Can't compare
58            _ => {}
59        }
60    }
61
62    // Check upper bound
63    if let Some(max_val) = max {
64        match compare_values_for_range(value, max_val) {
65            Some(CmpOrdering::Greater) => return false,
66            Some(CmpOrdering::Equal) if !max_inclusive => return false,
67            None => return false,
68            _ => {}
69        }
70    }
71
72    true
73}
74
75// Tiered storage imports
76#[cfg(feature = "tiered-storage")]
77use crate::storage::EpochStore;
78#[cfg(feature = "tiered-storage")]
79use grafeo_common::memory::arena::ArenaAllocator;
80#[cfg(feature = "tiered-storage")]
81use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
82
/// Tuning knobs for the LPG store.
///
/// The defaults are a reasonable starting point. Switch `backward_edges` off
/// when the graph is only ever traversed along outgoing edges (saves memory),
/// and set the capacities when the graph size is known upfront.
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// Keep a backward adjacency index so incoming-edge queries are possible.
    /// Disabling it saves ~50% of adjacency memory.
    pub backward_edges: bool,
    /// Capacity hint: number of nodes expected initially.
    pub initial_node_capacity: usize,
    /// Capacity hint: number of edges expected initially.
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward adjacency enabled, with hints of 1024 nodes and 4096 edges.
    fn default() -> Self {
        LpgStoreConfig {
            initial_edge_capacity: 4096,
            initial_node_capacity: 1024,
            backward_edges: true,
        }
    }
}
108
/// The core in-memory graph storage.
///
/// Everything lives here: nodes, edges, properties, adjacency indexes, and
/// version chains for MVCC. Concurrent reads never block each other.
///
/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
/// adds transaction management and query execution. Use `LpgStore` directly
/// when you need raw performance for algorithm implementations.
///
/// Entity storage comes in two mutually exclusive flavours selected by the
/// `tiered-storage` feature: plain version chains (`nodes` / `edges`), or
/// version indexes pointing into an arena and an epoch store
/// (`node_versions` / `edge_versions`).
///
/// # Example
///
/// ```
/// use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_core::graph::Direction;
///
/// let store = LpgStore::new();
///
/// // Create a small social network
/// let alice = store.create_node(&["Person"]);
/// let bob = store.create_node(&["Person"]);
/// store.create_edge(alice, bob, "KNOWS");
///
/// // Traverse outgoing edges
/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
///     println!("Alice knows node {:?}", neighbor);
/// }
/// ```
///
/// # Lock Ordering
///
/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
/// consistent order to prevent deadlocks. Always acquire locks in this order:
///
/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
/// 1. `nodes` / `node_versions`
/// 2. `edges` / `edge_versions`
///
/// ## Level 2 - Catalogs (acquire as pairs when writing)
/// 3. `label_to_id` + `id_to_label`
/// 4. `edge_type_to_id` + `id_to_edge_type`
///
/// ## Level 3 - Indexes
/// 5. `label_index`
/// 6. `node_labels`
/// 7. `property_indexes`
///
/// ## Level 4 - Statistics
/// 8. `statistics`
///
/// ## Level 5 - Nested Locks (internal to other structs)
/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
///
/// ## Rules
/// - Catalog pairs must be acquired together when writing.
/// - Never hold entity locks while acquiring catalog locks in a different scope.
/// - Statistics lock is always last.
/// - Read locks are generally safe, but avoid read-to-write upgrades.
pub struct LpgStore {
    /// Configuration the store was built with.
    // NOTE(review): `with_config` reads the settings before storing them; no
    // reader of the stored copy is visible in this part of the file, which is
    // presumably why the lint is silenced - confirm.
    #[allow(dead_code)]
    config: LpgStoreConfig,

    /// Node records indexed by NodeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    /// Lock order: 1
    #[cfg(not(feature = "tiered-storage"))]
    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Edge records indexed by EdgeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    /// Lock order: 2
    #[cfg(not(feature = "tiered-storage"))]
    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    // === Tiered Storage Fields (feature-gated) ===
    //
    // Lock ordering for arena access:
    //   version_lock (read/write) → arena read lock (via arena_allocator.arena())
    //
    // Rules:
    // - Acquire arena read lock *after* version locks, never before.
    // - Multiple threads may call arena.read_at() concurrently (shared refs only).
    // - Never acquire arena write lock (alloc_new_chunk) while holding version locks.
    // - freeze_epoch order: node_versions.read() → arena.read_at(),
    //   then edge_versions.read() → arena.read_at().
    /// Arena allocator for hot data storage.
    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
    #[cfg(feature = "tiered-storage")]
    arena_allocator: Arc<ArenaAllocator>,

    /// Node version indexes - store metadata and arena offsets.
    /// The actual NodeRecord data is stored in the arena.
    /// Lock order: 1
    #[cfg(feature = "tiered-storage")]
    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Edge version indexes - store metadata and arena offsets.
    /// The actual EdgeRecord data is stored in the arena.
    /// Lock order: 2
    #[cfg(feature = "tiered-storage")]
    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Cold storage for frozen epochs.
    /// Contains compressed epoch blocks for historical data.
    #[cfg(feature = "tiered-storage")]
    epoch_store: Arc<EpochStore>,

    /// Property storage for nodes.
    /// Keyed by node ID only; the accessors in this file read and write it
    /// without an epoch argument.
    node_properties: PropertyStorage<NodeId>,

    /// Property storage for edges.
    edge_properties: PropertyStorage<EdgeId>,

    /// Label name to ID mapping.
    /// Lock order: 3 (acquire with id_to_label)
    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Label ID to name mapping (the label ID is the position in the `Vec`).
    /// Lock order: 3 (acquire with label_to_id)
    id_to_label: RwLock<Vec<ArcStr>>,

    /// Edge type name to ID mapping.
    /// Lock order: 4 (acquire with id_to_edge_type)
    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Edge type ID to name mapping.
    /// Lock order: 4 (acquire with edge_type_to_id)
    id_to_edge_type: RwLock<Vec<ArcStr>>,

    /// Forward adjacency lists (outgoing edges).
    forward_adj: ChunkedAdjacency,

    /// Backward adjacency lists (incoming edges).
    /// Only populated if config.backward_edges is true; `None` otherwise.
    backward_adj: Option<ChunkedAdjacency>,

    /// Label index: label_id -> set of node IDs.
    /// The `Vec` is indexed by label ID; the keys of each map are the node IDs
    /// carrying that label (the `()` values are unused).
    /// Lock order: 5
    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node labels: node_id -> set of label IDs.
    /// Reverse mapping to efficiently get labels for a node.
    /// Lock order: 6
    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Property indexes: property_key -> (value -> set of node IDs).
    ///
    /// When a property is indexed, lookups by value are O(1) instead of O(n).
    /// Use [`Self::create_property_index`] to enable indexing for a property.
    /// Lock order: 7
    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,

    /// Vector indexes: "label:property" -> HNSW index.
    ///
    /// Created via [`GrafeoDB::create_vector_index`](grafeo_engine::GrafeoDB::create_vector_index).
    /// Lock order: 7 (same level as property_indexes, disjoint keys)
    #[cfg(feature = "vector-index")]
    vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,

    /// Next node ID to hand out (starts at 0).
    next_node_id: AtomicU64,

    /// Next edge ID to hand out (starts at 0).
    next_edge_id: AtomicU64,

    /// Current epoch (starts at 0).
    current_epoch: AtomicU64,

    /// Statistics for cost-based optimization.
    /// Lock order: 8 (always last)
    statistics: RwLock<Statistics>,

    /// Whether statistics need recomputation after mutations.
    /// Starts out `true` and is set again by mutating entry points such as
    /// `create_node` and `delete_node`.
    needs_stats_recompute: AtomicBool,
}
285
286impl LpgStore {
287    /// Creates a new LPG store with default configuration.
288    #[must_use]
289    pub fn new() -> Self {
290        Self::with_config(LpgStoreConfig::default())
291    }
292
293    /// Creates a new LPG store with custom configuration.
294    #[must_use]
295    pub fn with_config(config: LpgStoreConfig) -> Self {
296        let backward_adj = if config.backward_edges {
297            Some(ChunkedAdjacency::new())
298        } else {
299            None
300        };
301
302        Self {
303            #[cfg(not(feature = "tiered-storage"))]
304            nodes: RwLock::new(FxHashMap::default()),
305            #[cfg(not(feature = "tiered-storage"))]
306            edges: RwLock::new(FxHashMap::default()),
307            #[cfg(feature = "tiered-storage")]
308            arena_allocator: Arc::new(ArenaAllocator::new()),
309            #[cfg(feature = "tiered-storage")]
310            node_versions: RwLock::new(FxHashMap::default()),
311            #[cfg(feature = "tiered-storage")]
312            edge_versions: RwLock::new(FxHashMap::default()),
313            #[cfg(feature = "tiered-storage")]
314            epoch_store: Arc::new(EpochStore::new()),
315            node_properties: PropertyStorage::new(),
316            edge_properties: PropertyStorage::new(),
317            label_to_id: RwLock::new(FxHashMap::default()),
318            id_to_label: RwLock::new(Vec::new()),
319            edge_type_to_id: RwLock::new(FxHashMap::default()),
320            id_to_edge_type: RwLock::new(Vec::new()),
321            forward_adj: ChunkedAdjacency::new(),
322            backward_adj,
323            label_index: RwLock::new(Vec::new()),
324            node_labels: RwLock::new(FxHashMap::default()),
325            property_indexes: RwLock::new(FxHashMap::default()),
326            #[cfg(feature = "vector-index")]
327            vector_indexes: RwLock::new(FxHashMap::default()),
328            next_node_id: AtomicU64::new(0),
329            next_edge_id: AtomicU64::new(0),
330            current_epoch: AtomicU64::new(0),
331            statistics: RwLock::new(Statistics::new()),
332            needs_stats_recompute: AtomicBool::new(true),
333            config,
334        }
335    }
336
337    /// Returns the current epoch.
338    #[must_use]
339    pub fn current_epoch(&self) -> EpochId {
340        EpochId::new(self.current_epoch.load(Ordering::Acquire))
341    }
342
343    /// Creates a new epoch.
344    pub fn new_epoch(&self) -> EpochId {
345        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
346        EpochId::new(id)
347    }
348
349    // === Node Operations ===
350
351    /// Creates a new node with the given labels.
352    ///
353    /// Uses the system transaction for non-transactional operations.
354    pub fn create_node(&self, labels: &[&str]) -> NodeId {
355        self.needs_stats_recompute.store(true, Ordering::Relaxed);
356        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
357    }
358
359    /// Creates a new node with the given labels within a transaction context.
360    #[cfg(not(feature = "tiered-storage"))]
361    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
362        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
363
364        let mut record = NodeRecord::new(id, epoch);
365        record.set_label_count(labels.len() as u16);
366
367        // Store labels in node_labels map and label_index
368        let mut node_label_set = FxHashSet::default();
369        for label in labels {
370            let label_id = self.get_or_create_label_id(*label);
371            node_label_set.insert(label_id);
372
373            // Update label index
374            let mut index = self.label_index.write();
375            while index.len() <= label_id as usize {
376                index.push(FxHashMap::default());
377            }
378            index[label_id as usize].insert(id, ());
379        }
380
381        // Store node's labels
382        self.node_labels.write().insert(id, node_label_set);
383
384        // Create version chain with initial version
385        let chain = VersionChain::with_initial(record, epoch, tx_id);
386        self.nodes.write().insert(id, chain);
387        id
388    }
389
390    /// Creates a new node with the given labels within a transaction context.
391    /// (Tiered storage version: stores data in arena, metadata in VersionIndex)
392    #[cfg(feature = "tiered-storage")]
393    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
394        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
395
396        let mut record = NodeRecord::new(id, epoch);
397        record.set_label_count(labels.len() as u16);
398
399        // Store labels in node_labels map and label_index
400        let mut node_label_set = FxHashSet::default();
401        for label in labels {
402            let label_id = self.get_or_create_label_id(*label);
403            node_label_set.insert(label_id);
404
405            // Update label index
406            let mut index = self.label_index.write();
407            while index.len() <= label_id as usize {
408                index.push(FxHashMap::default());
409            }
410            index[label_id as usize].insert(id, ());
411        }
412
413        // Store node's labels
414        self.node_labels.write().insert(id, node_label_set);
415
416        // Allocate record in arena and get offset (create epoch if needed)
417        let arena = self.arena_allocator.arena_or_create(epoch);
418        let (offset, _stored) = arena.alloc_value_with_offset(record);
419
420        // Create HotVersionRef pointing to arena data
421        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
422
423        // Create or update version index
424        let mut versions = self.node_versions.write();
425        if let Some(index) = versions.get_mut(&id) {
426            index.add_hot(hot_ref);
427        } else {
428            versions.insert(id, VersionIndex::with_initial(hot_ref));
429        }
430
431        id
432    }
433
434    /// Creates a new node with labels and properties.
435    pub fn create_node_with_props(
436        &self,
437        labels: &[&str],
438        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
439    ) -> NodeId {
440        self.create_node_with_props_versioned(
441            labels,
442            properties,
443            self.current_epoch(),
444            TxId::SYSTEM,
445        )
446    }
447
448    /// Creates a new node with labels and properties within a transaction context.
449    #[cfg(not(feature = "tiered-storage"))]
450    pub fn create_node_with_props_versioned(
451        &self,
452        labels: &[&str],
453        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
454        epoch: EpochId,
455        tx_id: TxId,
456    ) -> NodeId {
457        let id = self.create_node_versioned(labels, epoch, tx_id);
458
459        for (key, value) in properties {
460            let prop_key: PropertyKey = key.into();
461            let prop_value: Value = value.into();
462            // Update property index before setting the property
463            self.update_property_index_on_set(id, &prop_key, &prop_value);
464            self.node_properties.set(id, prop_key, prop_value);
465        }
466
467        // Update props_count in record
468        let count = self.node_properties.get_all(id).len() as u16;
469        if let Some(chain) = self.nodes.write().get_mut(&id) {
470            if let Some(record) = chain.latest_mut() {
471                record.props_count = count;
472            }
473        }
474
475        id
476    }
477
478    /// Creates a new node with labels and properties within a transaction context.
479    /// (Tiered storage version)
480    #[cfg(feature = "tiered-storage")]
481    pub fn create_node_with_props_versioned(
482        &self,
483        labels: &[&str],
484        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
485        epoch: EpochId,
486        tx_id: TxId,
487    ) -> NodeId {
488        let id = self.create_node_versioned(labels, epoch, tx_id);
489
490        for (key, value) in properties {
491            let prop_key: PropertyKey = key.into();
492            let prop_value: Value = value.into();
493            // Update property index before setting the property
494            self.update_property_index_on_set(id, &prop_key, &prop_value);
495            self.node_properties.set(id, prop_key, prop_value);
496        }
497
498        // Note: props_count in record is not updated for tiered storage.
499        // The record is immutable once allocated in the arena.
500
501        id
502    }
503
504    /// Gets a node by ID (latest visible version).
505    #[must_use]
506    pub fn get_node(&self, id: NodeId) -> Option<Node> {
507        self.get_node_at_epoch(id, self.current_epoch())
508    }
509
510    /// Gets a node by ID at a specific epoch.
511    #[must_use]
512    #[cfg(not(feature = "tiered-storage"))]
513    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
514        let nodes = self.nodes.read();
515        let chain = nodes.get(&id)?;
516        let record = chain.visible_at(epoch)?;
517
518        if record.is_deleted() {
519            return None;
520        }
521
522        let mut node = Node::new(id);
523
524        // Get labels from node_labels map
525        let id_to_label = self.id_to_label.read();
526        let node_labels = self.node_labels.read();
527        if let Some(label_ids) = node_labels.get(&id) {
528            for &label_id in label_ids {
529                if let Some(label) = id_to_label.get(label_id as usize) {
530                    node.labels.push(label.clone());
531                }
532            }
533        }
534
535        // Get properties
536        node.properties = self.node_properties.get_all(id).into_iter().collect();
537
538        Some(node)
539    }
540
541    /// Gets a node by ID at a specific epoch.
542    /// (Tiered storage version: reads from arena via VersionIndex)
543    #[must_use]
544    #[cfg(feature = "tiered-storage")]
545    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
546        let versions = self.node_versions.read();
547        let index = versions.get(&id)?;
548        let version_ref = index.visible_at(epoch)?;
549
550        // Read the record from arena
551        let record = self.read_node_record(&version_ref)?;
552
553        if record.is_deleted() {
554            return None;
555        }
556
557        let mut node = Node::new(id);
558
559        // Get labels from node_labels map
560        let id_to_label = self.id_to_label.read();
561        let node_labels = self.node_labels.read();
562        if let Some(label_ids) = node_labels.get(&id) {
563            for &label_id in label_ids {
564                if let Some(label) = id_to_label.get(label_id as usize) {
565                    node.labels.push(label.clone());
566                }
567            }
568        }
569
570        // Get properties
571        node.properties = self.node_properties.get_all(id).into_iter().collect();
572
573        Some(node)
574    }
575
576    /// Gets a node visible to a specific transaction.
577    #[must_use]
578    #[cfg(not(feature = "tiered-storage"))]
579    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
580        let nodes = self.nodes.read();
581        let chain = nodes.get(&id)?;
582        let record = chain.visible_to(epoch, tx_id)?;
583
584        if record.is_deleted() {
585            return None;
586        }
587
588        let mut node = Node::new(id);
589
590        // Get labels from node_labels map
591        let id_to_label = self.id_to_label.read();
592        let node_labels = self.node_labels.read();
593        if let Some(label_ids) = node_labels.get(&id) {
594            for &label_id in label_ids {
595                if let Some(label) = id_to_label.get(label_id as usize) {
596                    node.labels.push(label.clone());
597                }
598            }
599        }
600
601        // Get properties
602        node.properties = self.node_properties.get_all(id).into_iter().collect();
603
604        Some(node)
605    }
606
607    /// Gets a node visible to a specific transaction.
608    /// (Tiered storage version: reads from arena via VersionIndex)
609    #[must_use]
610    #[cfg(feature = "tiered-storage")]
611    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
612        let versions = self.node_versions.read();
613        let index = versions.get(&id)?;
614        let version_ref = index.visible_to(epoch, tx_id)?;
615
616        // Read the record from arena
617        let record = self.read_node_record(&version_ref)?;
618
619        if record.is_deleted() {
620            return None;
621        }
622
623        let mut node = Node::new(id);
624
625        // Get labels from node_labels map
626        let id_to_label = self.id_to_label.read();
627        let node_labels = self.node_labels.read();
628        if let Some(label_ids) = node_labels.get(&id) {
629            for &label_id in label_ids {
630                if let Some(label) = id_to_label.get(label_id as usize) {
631                    node.labels.push(label.clone());
632                }
633            }
634        }
635
636        // Get properties
637        node.properties = self.node_properties.get_all(id).into_iter().collect();
638
639        Some(node)
640    }
641
642    /// Reads a NodeRecord from arena (hot) or epoch store (cold) using a VersionRef.
643    #[cfg(feature = "tiered-storage")]
644    #[allow(unsafe_code)]
645    fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
646        match version_ref {
647            VersionRef::Hot(hot_ref) => {
648                let arena = self.arena_allocator.arena(hot_ref.epoch);
649                // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
650                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
651                Some(*record)
652            }
653            VersionRef::Cold(cold_ref) => {
654                // Read from compressed epoch store
655                self.epoch_store
656                    .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
657            }
658        }
659    }
660
661    /// Deletes a node and all its edges (using latest epoch).
662    pub fn delete_node(&self, id: NodeId) -> bool {
663        self.needs_stats_recompute.store(true, Ordering::Relaxed);
664        self.delete_node_at_epoch(id, self.current_epoch())
665    }
666
667    /// Deletes a node at a specific epoch.
668    #[cfg(not(feature = "tiered-storage"))]
669    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
670        let mut nodes = self.nodes.write();
671        if let Some(chain) = nodes.get_mut(&id) {
672            // Check if visible at this epoch (not already deleted)
673            if let Some(record) = chain.visible_at(epoch) {
674                if record.is_deleted() {
675                    return false;
676                }
677            } else {
678                // Not visible at this epoch (already deleted or doesn't exist)
679                return false;
680            }
681
682            // Mark the version chain as deleted at this epoch
683            chain.mark_deleted(epoch);
684
685            // Remove from label index using node_labels map
686            let mut index = self.label_index.write();
687            let mut node_labels = self.node_labels.write();
688            if let Some(label_ids) = node_labels.remove(&id) {
689                for label_id in label_ids {
690                    if let Some(set) = index.get_mut(label_id as usize) {
691                        set.remove(&id);
692                    }
693                }
694            }
695
696            // Remove properties
697            drop(nodes); // Release lock before removing properties
698            drop(index);
699            drop(node_labels);
700            self.node_properties.remove_all(id);
701
702            // Note: Caller should use delete_node_edges() first if detach is needed
703
704            true
705        } else {
706            false
707        }
708    }
709
710    /// Deletes a node at a specific epoch.
711    /// (Tiered storage version)
712    #[cfg(feature = "tiered-storage")]
713    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
714        let mut versions = self.node_versions.write();
715        if let Some(index) = versions.get_mut(&id) {
716            // Check if visible at this epoch
717            if let Some(version_ref) = index.visible_at(epoch) {
718                if let Some(record) = self.read_node_record(&version_ref) {
719                    if record.is_deleted() {
720                        return false;
721                    }
722                } else {
723                    return false;
724                }
725            } else {
726                return false;
727            }
728
729            // Mark as deleted in version index
730            index.mark_deleted(epoch);
731
732            // Remove from label index using node_labels map
733            let mut label_index = self.label_index.write();
734            let mut node_labels = self.node_labels.write();
735            if let Some(label_ids) = node_labels.remove(&id) {
736                for label_id in label_ids {
737                    if let Some(set) = label_index.get_mut(label_id as usize) {
738                        set.remove(&id);
739                    }
740                }
741            }
742
743            // Remove properties
744            drop(versions);
745            drop(label_index);
746            drop(node_labels);
747            self.node_properties.remove_all(id);
748
749            true
750        } else {
751            false
752        }
753    }
754
755    /// Deletes all edges connected to a node (implements DETACH DELETE).
756    ///
757    /// Call this before `delete_node()` if you want to remove a node that
758    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
759    #[cfg(not(feature = "tiered-storage"))]
760    pub fn delete_node_edges(&self, node_id: NodeId) {
761        // Get outgoing edges
762        let outgoing: Vec<EdgeId> = self
763            .forward_adj
764            .edges_from(node_id)
765            .into_iter()
766            .map(|(_, edge_id)| edge_id)
767            .collect();
768
769        // Get incoming edges
770        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
771            backward
772                .edges_from(node_id)
773                .into_iter()
774                .map(|(_, edge_id)| edge_id)
775                .collect()
776        } else {
777            // No backward adjacency - scan all edges
778            let epoch = self.current_epoch();
779            self.edges
780                .read()
781                .iter()
782                .filter_map(|(id, chain)| {
783                    chain.visible_at(epoch).and_then(|r| {
784                        if !r.is_deleted() && r.dst == node_id {
785                            Some(*id)
786                        } else {
787                            None
788                        }
789                    })
790                })
791                .collect()
792        };
793
794        // Delete all edges
795        for edge_id in outgoing.into_iter().chain(incoming) {
796            self.delete_edge(edge_id);
797        }
798    }
799
800    /// Deletes all edges connected to a node (implements DETACH DELETE).
801    /// (Tiered storage version)
802    #[cfg(feature = "tiered-storage")]
803    pub fn delete_node_edges(&self, node_id: NodeId) {
804        // Get outgoing edges
805        let outgoing: Vec<EdgeId> = self
806            .forward_adj
807            .edges_from(node_id)
808            .into_iter()
809            .map(|(_, edge_id)| edge_id)
810            .collect();
811
812        // Get incoming edges
813        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
814            backward
815                .edges_from(node_id)
816                .into_iter()
817                .map(|(_, edge_id)| edge_id)
818                .collect()
819        } else {
820            // No backward adjacency - scan all edges
821            let epoch = self.current_epoch();
822            let versions = self.edge_versions.read();
823            versions
824                .iter()
825                .filter_map(|(id, index)| {
826                    index.visible_at(epoch).and_then(|vref| {
827                        self.read_edge_record(&vref).and_then(|r| {
828                            if !r.is_deleted() && r.dst == node_id {
829                                Some(*id)
830                            } else {
831                                None
832                            }
833                        })
834                    })
835                })
836                .collect()
837        };
838
839        // Delete all edges
840        for edge_id in outgoing.into_iter().chain(incoming) {
841            self.delete_edge(edge_id);
842        }
843    }
844
845    /// Sets a property on a node.
846    #[cfg(not(feature = "tiered-storage"))]
847    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
848        let prop_key: PropertyKey = key.into();
849
850        // Update property index before setting the property (needs to read old value)
851        self.update_property_index_on_set(id, &prop_key, &value);
852
853        self.node_properties.set(id, prop_key, value);
854
855        // Update props_count in record
856        let count = self.node_properties.get_all(id).len() as u16;
857        if let Some(chain) = self.nodes.write().get_mut(&id) {
858            if let Some(record) = chain.latest_mut() {
859                record.props_count = count;
860            }
861        }
862    }
863
864    /// Sets a property on a node.
865    /// (Tiered storage version: properties stored separately, record is immutable)
866    #[cfg(feature = "tiered-storage")]
867    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
868        let prop_key: PropertyKey = key.into();
869
870        // Update property index before setting the property (needs to read old value)
871        self.update_property_index_on_set(id, &prop_key, &value);
872
873        self.node_properties.set(id, prop_key, value);
874        // Note: props_count in record is not updated for tiered storage.
875        // The record is immutable once allocated in the arena.
876        // Property count can be derived from PropertyStorage if needed.
877    }
878
879    /// Sets a property on an edge.
880    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
881        self.edge_properties.set(id, key.into(), value);
882    }
883
884    /// Removes a property from a node.
885    ///
886    /// Returns the previous value if it existed, or None if the property didn't exist.
887    #[cfg(not(feature = "tiered-storage"))]
888    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
889        let prop_key: PropertyKey = key.into();
890
891        // Update property index before removing (needs to read old value)
892        self.update_property_index_on_remove(id, &prop_key);
893
894        let result = self.node_properties.remove(id, &prop_key);
895
896        // Update props_count in record
897        let count = self.node_properties.get_all(id).len() as u16;
898        if let Some(chain) = self.nodes.write().get_mut(&id) {
899            if let Some(record) = chain.latest_mut() {
900                record.props_count = count;
901            }
902        }
903
904        result
905    }
906
907    /// Removes a property from a node.
908    /// (Tiered storage version)
909    #[cfg(feature = "tiered-storage")]
910    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
911        let prop_key: PropertyKey = key.into();
912
913        // Update property index before removing (needs to read old value)
914        self.update_property_index_on_remove(id, &prop_key);
915
916        self.node_properties.remove(id, &prop_key)
917        // Note: props_count in record is not updated for tiered storage.
918    }
919
920    /// Removes a property from an edge.
921    ///
922    /// Returns the previous value if it existed, or None if the property didn't exist.
923    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
924        self.edge_properties.remove(id, &key.into())
925    }
926
927    /// Gets a single property from a node without loading all properties.
928    ///
929    /// This is O(1) vs O(properties) for `get_node().get_property()`.
930    /// Use this for filter predicates where you only need one property value.
931    ///
932    /// # Example
933    ///
934    /// ```ignore
935    /// // Fast: Direct single-property lookup
936    /// let age = store.get_node_property(node_id, "age");
937    ///
938    /// // Slow: Loads all properties, then extracts one
939    /// let age = store.get_node(node_id).and_then(|n| n.get_property("age").cloned());
940    /// ```
941    #[must_use]
942    pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
943        self.node_properties.get(id, key)
944    }
945
946    /// Gets a single property from an edge without loading all properties.
947    ///
948    /// This is O(1) vs O(properties) for `get_edge().get_property()`.
949    #[must_use]
950    pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
951        self.edge_properties.get(id, key)
952    }
953
954    // === Batch Property Operations ===
955
956    /// Gets a property for multiple nodes in a single batch operation.
957    ///
958    /// More efficient than calling [`Self::get_node_property`] in a loop because it
959    /// reduces lock overhead and enables better cache utilization.
960    ///
961    /// # Example
962    ///
963    /// ```
964    /// use grafeo_core::graph::lpg::LpgStore;
965    /// use grafeo_common::types::{NodeId, PropertyKey, Value};
966    ///
967    /// let store = LpgStore::new();
968    /// let n1 = store.create_node(&["Person"]);
969    /// let n2 = store.create_node(&["Person"]);
970    /// store.set_node_property(n1, "age", Value::from(25i64));
971    /// store.set_node_property(n2, "age", Value::from(30i64));
972    ///
973    /// let ages = store.get_node_property_batch(&[n1, n2], &PropertyKey::new("age"));
974    /// assert_eq!(ages, vec![Some(Value::from(25i64)), Some(Value::from(30i64))]);
975    /// ```
976    #[must_use]
977    pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
978        self.node_properties.get_batch(ids, key)
979    }
980
981    /// Gets all properties for multiple nodes in a single batch operation.
982    ///
983    /// Returns a vector of property maps, one per node ID (empty map if no properties).
984    /// More efficient than calling [`Self::get_node`] in a loop.
985    #[must_use]
986    pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
987        self.node_properties.get_all_batch(ids)
988    }
989
990    /// Gets selected properties for multiple nodes (projection pushdown).
991    ///
992    /// This is more efficient than [`Self::get_nodes_properties_batch`] when you only
993    /// need a subset of properties. It only iterates the requested columns instead of
994    /// all columns.
995    ///
996    /// **Use this for**: Queries with explicit projections like `RETURN n.name, n.age`
997    /// instead of `RETURN n` (which requires all properties).
998    ///
999    /// # Example
1000    ///
1001    /// ```
1002    /// use grafeo_core::graph::lpg::LpgStore;
1003    /// use grafeo_common::types::{PropertyKey, Value};
1004    ///
1005    /// let store = LpgStore::new();
1006    /// let n1 = store.create_node(&["Person"]);
1007    /// store.set_node_property(n1, "name", Value::from("Alice"));
1008    /// store.set_node_property(n1, "age", Value::from(30i64));
1009    /// store.set_node_property(n1, "email", Value::from("alice@example.com"));
1010    ///
1011    /// // Only fetch name and age (faster than get_nodes_properties_batch)
1012    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
1013    /// let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
1014    ///
1015    /// assert_eq!(props[0].len(), 2); // Only name and age, not email
1016    /// ```
1017    #[must_use]
1018    pub fn get_nodes_properties_selective_batch(
1019        &self,
1020        ids: &[NodeId],
1021        keys: &[PropertyKey],
1022    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1023        self.node_properties.get_selective_batch(ids, keys)
1024    }
1025
1026    /// Gets selected properties for multiple edges (projection pushdown).
1027    ///
1028    /// Edge-property version of [`Self::get_nodes_properties_selective_batch`].
1029    #[must_use]
1030    pub fn get_edges_properties_selective_batch(
1031        &self,
1032        ids: &[EdgeId],
1033        keys: &[PropertyKey],
1034    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1035        self.edge_properties.get_selective_batch(ids, keys)
1036    }
1037
1038    /// Finds nodes where a property value is in a range.
1039    ///
1040    /// This is useful for queries like `n.age > 30` or `n.price BETWEEN 10 AND 100`.
1041    /// Uses zone maps to skip scanning when the range definitely doesn't match.
1042    ///
1043    /// # Arguments
1044    ///
1045    /// * `property` - The property to check
1046    /// * `min` - Optional lower bound (None for unbounded)
1047    /// * `max` - Optional upper bound (None for unbounded)
1048    /// * `min_inclusive` - Whether lower bound is inclusive (>= vs >)
1049    /// * `max_inclusive` - Whether upper bound is inclusive (<= vs <)
1050    ///
1051    /// # Example
1052    ///
1053    /// ```
1054    /// use grafeo_core::graph::lpg::LpgStore;
1055    /// use grafeo_common::types::Value;
1056    ///
1057    /// let store = LpgStore::new();
1058    /// let n1 = store.create_node(&["Person"]);
1059    /// let n2 = store.create_node(&["Person"]);
1060    /// store.set_node_property(n1, "age", Value::from(25i64));
1061    /// store.set_node_property(n2, "age", Value::from(35i64));
1062    ///
1063    /// // Find nodes where age > 30
1064    /// let result = store.find_nodes_in_range(
1065    ///     "age",
1066    ///     Some(&Value::from(30i64)),
1067    ///     None,
1068    ///     false, // exclusive lower bound
1069    ///     true,  // inclusive upper bound (doesn't matter since None)
1070    /// );
1071    /// assert_eq!(result.len(), 1); // Only n2 matches
1072    /// ```
1073    #[must_use]
1074    pub fn find_nodes_in_range(
1075        &self,
1076        property: &str,
1077        min: Option<&Value>,
1078        max: Option<&Value>,
1079        min_inclusive: bool,
1080        max_inclusive: bool,
1081    ) -> Vec<NodeId> {
1082        let key = PropertyKey::new(property);
1083
1084        // Check zone map first - if no values could match, return empty
1085        if !self
1086            .node_properties
1087            .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1088        {
1089            return Vec::new();
1090        }
1091
1092        // Scan all nodes and filter by range
1093        self.node_ids()
1094            .into_iter()
1095            .filter(|&node_id| {
1096                self.node_properties
1097                    .get(node_id, &key)
1098                    .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1099            })
1100            .collect()
1101    }
1102
1103    /// Finds nodes matching multiple property equality conditions.
1104    ///
1105    /// This is more efficient than intersecting multiple single-property lookups
1106    /// because it can use indexes when available and short-circuits on the first
1107    /// miss.
1108    ///
1109    /// # Example
1110    ///
1111    /// ```
1112    /// use grafeo_core::graph::lpg::LpgStore;
1113    /// use grafeo_common::types::Value;
1114    ///
1115    /// let store = LpgStore::new();
1116    /// let alice = store.create_node(&["Person"]);
1117    /// store.set_node_property(alice, "name", Value::from("Alice"));
1118    /// store.set_node_property(alice, "city", Value::from("NYC"));
1119    ///
1120    /// // Find nodes where name = "Alice" AND city = "NYC"
1121    /// let matches = store.find_nodes_by_properties(&[
1122    ///     ("name", Value::from("Alice")),
1123    ///     ("city", Value::from("NYC")),
1124    /// ]);
1125    /// assert!(matches.contains(&alice));
1126    /// ```
1127    #[must_use]
1128    pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1129        if conditions.is_empty() {
1130            return self.node_ids();
1131        }
1132
1133        // Find the most selective condition (smallest result set) to start
1134        // If any condition has an index, use that first
1135        let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1136        let indexes = self.property_indexes.read();
1137
1138        for (i, (prop, value)) in conditions.iter().enumerate() {
1139            let key = PropertyKey::new(*prop);
1140            let hv = HashableValue::new(value.clone());
1141
1142            if let Some(index) = indexes.get(&key) {
1143                let matches: Vec<NodeId> = index
1144                    .get(&hv)
1145                    .map(|nodes| nodes.iter().copied().collect())
1146                    .unwrap_or_default();
1147
1148                // Short-circuit if any indexed condition has no matches
1149                if matches.is_empty() {
1150                    return Vec::new();
1151                }
1152
1153                // Use smallest indexed result as starting point
1154                if best_start
1155                    .as_ref()
1156                    .is_none_or(|(_, best)| matches.len() < best.len())
1157                {
1158                    best_start = Some((i, matches));
1159                }
1160            }
1161        }
1162        drop(indexes);
1163
1164        // Start from best indexed result or fall back to full node scan
1165        let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1166            // No indexes available, start with first condition via full scan
1167            let (prop, value) = &conditions[0];
1168            (0, self.find_nodes_by_property(prop, value))
1169        });
1170
1171        // Filter candidates through remaining conditions
1172        for (i, (prop, value)) in conditions.iter().enumerate() {
1173            if i == start_idx {
1174                continue;
1175            }
1176
1177            let key = PropertyKey::new(*prop);
1178            candidates.retain(|&node_id| {
1179                self.node_properties
1180                    .get(node_id, &key)
1181                    .is_some_and(|v| v == *value)
1182            });
1183
1184            // Short-circuit if no candidates remain
1185            if candidates.is_empty() {
1186                return Vec::new();
1187            }
1188        }
1189
1190        candidates
1191    }
1192
1193    // === Property Index Operations ===
1194
1195    /// Creates an index on a node property for O(1) lookups by value.
1196    ///
1197    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1198    /// O(1) instead of O(n) for this property. The index is automatically
1199    /// maintained when properties are set or removed.
1200    ///
1201    /// # Example
1202    ///
1203    /// ```
1204    /// use grafeo_core::graph::lpg::LpgStore;
1205    /// use grafeo_common::types::Value;
1206    ///
1207    /// let store = LpgStore::new();
1208    ///
1209    /// // Create nodes with an 'id' property
1210    /// let alice = store.create_node(&["Person"]);
1211    /// store.set_node_property(alice, "id", Value::from("alice_123"));
1212    ///
1213    /// // Create an index on the 'id' property
1214    /// store.create_property_index("id");
1215    ///
1216    /// // Now lookups by 'id' are O(1)
1217    /// let found = store.find_nodes_by_property("id", &Value::from("alice_123"));
1218    /// assert!(found.contains(&alice));
1219    /// ```
1220    pub fn create_property_index(&self, property: &str) {
1221        let key = PropertyKey::new(property);
1222
1223        let mut indexes = self.property_indexes.write();
1224        if indexes.contains_key(&key) {
1225            return; // Already indexed
1226        }
1227
1228        // Create the index and populate it with existing data
1229        let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1230
1231        // Scan all nodes to build the index
1232        for node_id in self.node_ids() {
1233            if let Some(value) = self.node_properties.get(node_id, &key) {
1234                let hv = HashableValue::new(value);
1235                index.entry(hv).or_default().insert(node_id);
1236            }
1237        }
1238
1239        indexes.insert(key, index);
1240    }
1241
1242    /// Drops an index on a node property.
1243    ///
1244    /// Returns `true` if the index existed and was removed.
1245    pub fn drop_property_index(&self, property: &str) -> bool {
1246        let key = PropertyKey::new(property);
1247        self.property_indexes.write().remove(&key).is_some()
1248    }
1249
1250    /// Returns `true` if the property has an index.
1251    #[must_use]
1252    pub fn has_property_index(&self, property: &str) -> bool {
1253        let key = PropertyKey::new(property);
1254        self.property_indexes.read().contains_key(&key)
1255    }
1256
1257    /// Stores a vector index for a label+property pair.
1258    #[cfg(feature = "vector-index")]
1259    pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1260        let key = format!("{label}:{property}");
1261        self.vector_indexes.write().insert(key, index);
1262    }
1263
1264    /// Retrieves the vector index for a label+property pair.
1265    #[cfg(feature = "vector-index")]
1266    #[must_use]
1267    pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1268        let key = format!("{label}:{property}");
1269        self.vector_indexes.read().get(&key).cloned()
1270    }
1271
1272    /// Finds all nodes that have a specific property value.
1273    ///
1274    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1275    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1276    ///
1277    /// # Example
1278    ///
1279    /// ```
1280    /// use grafeo_core::graph::lpg::LpgStore;
1281    /// use grafeo_common::types::Value;
1282    ///
1283    /// let store = LpgStore::new();
1284    /// store.create_property_index("city"); // Optional but makes lookups fast
1285    ///
1286    /// let alice = store.create_node(&["Person"]);
1287    /// let bob = store.create_node(&["Person"]);
1288    /// store.set_node_property(alice, "city", Value::from("NYC"));
1289    /// store.set_node_property(bob, "city", Value::from("NYC"));
1290    ///
1291    /// let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
1292    /// assert_eq!(nyc_people.len(), 2);
1293    /// ```
1294    #[must_use]
1295    pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1296        let key = PropertyKey::new(property);
1297        let hv = HashableValue::new(value.clone());
1298
1299        // Try indexed lookup first
1300        let indexes = self.property_indexes.read();
1301        if let Some(index) = indexes.get(&key) {
1302            if let Some(nodes) = index.get(&hv) {
1303                return nodes.iter().copied().collect();
1304            }
1305            return Vec::new();
1306        }
1307        drop(indexes);
1308
1309        // Fall back to full scan
1310        self.node_ids()
1311            .into_iter()
1312            .filter(|&node_id| {
1313                self.node_properties
1314                    .get(node_id, &key)
1315                    .is_some_and(|v| v == *value)
1316            })
1317            .collect()
1318    }
1319
1320    /// Updates property indexes when a property is set.
1321    fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1322        let indexes = self.property_indexes.read();
1323        if let Some(index) = indexes.get(key) {
1324            // Get old value to remove from index
1325            if let Some(old_value) = self.node_properties.get(node_id, key) {
1326                let old_hv = HashableValue::new(old_value);
1327                if let Some(mut nodes) = index.get_mut(&old_hv) {
1328                    nodes.remove(&node_id);
1329                    if nodes.is_empty() {
1330                        drop(nodes);
1331                        index.remove(&old_hv);
1332                    }
1333                }
1334            }
1335
1336            // Add new value to index
1337            let new_hv = HashableValue::new(new_value.clone());
1338            index
1339                .entry(new_hv)
1340                .or_insert_with(FxHashSet::default)
1341                .insert(node_id);
1342        }
1343    }
1344
1345    /// Updates property indexes when a property is removed.
1346    fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1347        let indexes = self.property_indexes.read();
1348        if let Some(index) = indexes.get(key) {
1349            // Get old value to remove from index
1350            if let Some(old_value) = self.node_properties.get(node_id, key) {
1351                let old_hv = HashableValue::new(old_value);
1352                if let Some(mut nodes) = index.get_mut(&old_hv) {
1353                    nodes.remove(&node_id);
1354                    if nodes.is_empty() {
1355                        drop(nodes);
1356                        index.remove(&old_hv);
1357                    }
1358                }
1359            }
1360        }
1361    }
1362
1363    /// Adds a label to a node.
1364    ///
1365    /// Returns true if the label was added, false if the node doesn't exist
1366    /// or already has the label.
1367    #[cfg(not(feature = "tiered-storage"))]
1368    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1369        let epoch = self.current_epoch();
1370
1371        // Check if node exists
1372        let nodes = self.nodes.read();
1373        if let Some(chain) = nodes.get(&node_id) {
1374            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1375                return false;
1376            }
1377        } else {
1378            return false;
1379        }
1380        drop(nodes);
1381
1382        // Get or create label ID
1383        let label_id = self.get_or_create_label_id(label);
1384
1385        // Add to node_labels map
1386        let mut node_labels = self.node_labels.write();
1387        let label_set = node_labels
1388            .entry(node_id)
1389            .or_insert_with(FxHashSet::default);
1390
1391        if label_set.contains(&label_id) {
1392            return false; // Already has this label
1393        }
1394
1395        label_set.insert(label_id);
1396        drop(node_labels);
1397
1398        // Add to label_index
1399        let mut index = self.label_index.write();
1400        if (label_id as usize) >= index.len() {
1401            index.resize(label_id as usize + 1, FxHashMap::default());
1402        }
1403        index[label_id as usize].insert(node_id, ());
1404
1405        // Update label count in node record
1406        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1407            if let Some(record) = chain.latest_mut() {
1408                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1409                record.set_label_count(count as u16);
1410            }
1411        }
1412
1413        true
1414    }
1415
1416    /// Adds a label to a node.
1417    /// (Tiered storage version)
1418    #[cfg(feature = "tiered-storage")]
1419    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1420        let epoch = self.current_epoch();
1421
1422        // Check if node exists
1423        let versions = self.node_versions.read();
1424        if let Some(index) = versions.get(&node_id) {
1425            if let Some(vref) = index.visible_at(epoch) {
1426                if let Some(record) = self.read_node_record(&vref) {
1427                    if record.is_deleted() {
1428                        return false;
1429                    }
1430                } else {
1431                    return false;
1432                }
1433            } else {
1434                return false;
1435            }
1436        } else {
1437            return false;
1438        }
1439        drop(versions);
1440
1441        // Get or create label ID
1442        let label_id = self.get_or_create_label_id(label);
1443
1444        // Add to node_labels map
1445        let mut node_labels = self.node_labels.write();
1446        let label_set = node_labels.entry(node_id).or_default();
1447
1448        if label_set.contains(&label_id) {
1449            return false; // Already has this label
1450        }
1451
1452        label_set.insert(label_id);
1453        drop(node_labels);
1454
1455        // Add to label_index
1456        let mut index = self.label_index.write();
1457        if (label_id as usize) >= index.len() {
1458            index.resize(label_id as usize + 1, FxHashMap::default());
1459        }
1460        index[label_id as usize].insert(node_id, ());
1461
1462        // Note: label_count in record is not updated for tiered storage.
1463        // The record is immutable once allocated in the arena.
1464
1465        true
1466    }
1467
1468    /// Removes a label from a node.
1469    ///
1470    /// Returns true if the label was removed, false if the node doesn't exist
1471    /// or doesn't have the label.
1472    #[cfg(not(feature = "tiered-storage"))]
1473    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1474        let epoch = self.current_epoch();
1475
1476        // Check if node exists
1477        let nodes = self.nodes.read();
1478        if let Some(chain) = nodes.get(&node_id) {
1479            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1480                return false;
1481            }
1482        } else {
1483            return false;
1484        }
1485        drop(nodes);
1486
1487        // Get label ID
1488        let label_id = {
1489            let label_ids = self.label_to_id.read();
1490            match label_ids.get(label) {
1491                Some(&id) => id,
1492                None => return false, // Label doesn't exist
1493            }
1494        };
1495
1496        // Remove from node_labels map
1497        let mut node_labels = self.node_labels.write();
1498        if let Some(label_set) = node_labels.get_mut(&node_id) {
1499            if !label_set.remove(&label_id) {
1500                return false; // Node doesn't have this label
1501            }
1502        } else {
1503            return false;
1504        }
1505        drop(node_labels);
1506
1507        // Remove from label_index
1508        let mut index = self.label_index.write();
1509        if (label_id as usize) < index.len() {
1510            index[label_id as usize].remove(&node_id);
1511        }
1512
1513        // Update label count in node record
1514        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1515            if let Some(record) = chain.latest_mut() {
1516                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1517                record.set_label_count(count as u16);
1518            }
1519        }
1520
1521        true
1522    }
1523
1524    /// Removes a label from a node.
1525    /// (Tiered storage version)
1526    #[cfg(feature = "tiered-storage")]
1527    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1528        let epoch = self.current_epoch();
1529
1530        // Check if node exists
1531        let versions = self.node_versions.read();
1532        if let Some(index) = versions.get(&node_id) {
1533            if let Some(vref) = index.visible_at(epoch) {
1534                if let Some(record) = self.read_node_record(&vref) {
1535                    if record.is_deleted() {
1536                        return false;
1537                    }
1538                } else {
1539                    return false;
1540                }
1541            } else {
1542                return false;
1543            }
1544        } else {
1545            return false;
1546        }
1547        drop(versions);
1548
1549        // Get label ID
1550        let label_id = {
1551            let label_ids = self.label_to_id.read();
1552            match label_ids.get(label) {
1553                Some(&id) => id,
1554                None => return false, // Label doesn't exist
1555            }
1556        };
1557
1558        // Remove from node_labels map
1559        let mut node_labels = self.node_labels.write();
1560        if let Some(label_set) = node_labels.get_mut(&node_id) {
1561            if !label_set.remove(&label_id) {
1562                return false; // Node doesn't have this label
1563            }
1564        } else {
1565            return false;
1566        }
1567        drop(node_labels);
1568
1569        // Remove from label_index
1570        let mut index = self.label_index.write();
1571        if (label_id as usize) < index.len() {
1572            index[label_id as usize].remove(&node_id);
1573        }
1574
1575        // Note: label_count in record is not updated for tiered storage.
1576
1577        true
1578    }
1579
1580    /// Returns the number of nodes (non-deleted at current epoch).
1581    #[must_use]
1582    #[cfg(not(feature = "tiered-storage"))]
1583    pub fn node_count(&self) -> usize {
1584        let epoch = self.current_epoch();
1585        self.nodes
1586            .read()
1587            .values()
1588            .filter_map(|chain| chain.visible_at(epoch))
1589            .filter(|r| !r.is_deleted())
1590            .count()
1591    }
1592
1593    /// Returns the number of nodes (non-deleted at current epoch).
1594    /// (Tiered storage version)
1595    #[must_use]
1596    #[cfg(feature = "tiered-storage")]
1597    pub fn node_count(&self) -> usize {
1598        let epoch = self.current_epoch();
1599        let versions = self.node_versions.read();
1600        versions
1601            .iter()
1602            .filter(|(_, index)| {
1603                index.visible_at(epoch).map_or(false, |vref| {
1604                    self.read_node_record(&vref)
1605                        .map_or(false, |r| !r.is_deleted())
1606                })
1607            })
1608            .count()
1609    }
1610
1611    /// Returns all node IDs in the store.
1612    ///
1613    /// This returns a snapshot of current node IDs. The returned vector
1614    /// excludes deleted nodes. Results are sorted by NodeId for deterministic
1615    /// iteration order.
1616    #[must_use]
1617    #[cfg(not(feature = "tiered-storage"))]
1618    pub fn node_ids(&self) -> Vec<NodeId> {
1619        let epoch = self.current_epoch();
1620        let mut ids: Vec<NodeId> = self
1621            .nodes
1622            .read()
1623            .iter()
1624            .filter_map(|(id, chain)| {
1625                chain
1626                    .visible_at(epoch)
1627                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1628            })
1629            .collect();
1630        ids.sort_unstable();
1631        ids
1632    }
1633
1634    /// Returns all node IDs in the store.
1635    /// (Tiered storage version)
1636    #[must_use]
1637    #[cfg(feature = "tiered-storage")]
1638    pub fn node_ids(&self) -> Vec<NodeId> {
1639        let epoch = self.current_epoch();
1640        let versions = self.node_versions.read();
1641        let mut ids: Vec<NodeId> = versions
1642            .iter()
1643            .filter_map(|(id, index)| {
1644                index.visible_at(epoch).and_then(|vref| {
1645                    self.read_node_record(&vref)
1646                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1647                })
1648            })
1649            .collect();
1650        ids.sort_unstable();
1651        ids
1652    }
1653
1654    // === Edge Operations ===
1655
1656    /// Creates a new edge.
1657    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1658        self.needs_stats_recompute.store(true, Ordering::Relaxed);
1659        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1660    }
1661
1662    /// Creates a new edge within a transaction context.
1663    #[cfg(not(feature = "tiered-storage"))]
1664    pub fn create_edge_versioned(
1665        &self,
1666        src: NodeId,
1667        dst: NodeId,
1668        edge_type: &str,
1669        epoch: EpochId,
1670        tx_id: TxId,
1671    ) -> EdgeId {
1672        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1673        let type_id = self.get_or_create_edge_type_id(edge_type);
1674
1675        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1676        let chain = VersionChain::with_initial(record, epoch, tx_id);
1677        self.edges.write().insert(id, chain);
1678
1679        // Update adjacency
1680        self.forward_adj.add_edge(src, dst, id);
1681        if let Some(ref backward) = self.backward_adj {
1682            backward.add_edge(dst, src, id);
1683        }
1684
1685        id
1686    }
1687
1688    /// Creates a new edge within a transaction context.
1689    /// (Tiered storage version)
1690    #[cfg(feature = "tiered-storage")]
1691    pub fn create_edge_versioned(
1692        &self,
1693        src: NodeId,
1694        dst: NodeId,
1695        edge_type: &str,
1696        epoch: EpochId,
1697        tx_id: TxId,
1698    ) -> EdgeId {
1699        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1700        let type_id = self.get_or_create_edge_type_id(edge_type);
1701
1702        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1703
1704        // Allocate record in arena and get offset (create epoch if needed)
1705        let arena = self.arena_allocator.arena_or_create(epoch);
1706        let (offset, _stored) = arena.alloc_value_with_offset(record);
1707
1708        // Create HotVersionRef pointing to arena data
1709        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1710
1711        // Create or update version index
1712        let mut versions = self.edge_versions.write();
1713        if let Some(index) = versions.get_mut(&id) {
1714            index.add_hot(hot_ref);
1715        } else {
1716            versions.insert(id, VersionIndex::with_initial(hot_ref));
1717        }
1718
1719        // Update adjacency
1720        self.forward_adj.add_edge(src, dst, id);
1721        if let Some(ref backward) = self.backward_adj {
1722            backward.add_edge(dst, src, id);
1723        }
1724
1725        id
1726    }
1727
1728    /// Creates a new edge with properties.
1729    pub fn create_edge_with_props(
1730        &self,
1731        src: NodeId,
1732        dst: NodeId,
1733        edge_type: &str,
1734        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1735    ) -> EdgeId {
1736        let id = self.create_edge(src, dst, edge_type);
1737
1738        for (key, value) in properties {
1739            self.edge_properties.set(id, key.into(), value.into());
1740        }
1741
1742        id
1743    }
1744
1745    /// Gets an edge by ID (latest visible version).
1746    #[must_use]
1747    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1748        self.get_edge_at_epoch(id, self.current_epoch())
1749    }
1750
1751    /// Gets an edge by ID at a specific epoch.
1752    #[must_use]
1753    #[cfg(not(feature = "tiered-storage"))]
1754    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1755        let edges = self.edges.read();
1756        let chain = edges.get(&id)?;
1757        let record = chain.visible_at(epoch)?;
1758
1759        if record.is_deleted() {
1760            return None;
1761        }
1762
1763        let edge_type = {
1764            let id_to_type = self.id_to_edge_type.read();
1765            id_to_type.get(record.type_id as usize)?.clone()
1766        };
1767
1768        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1769
1770        // Get properties
1771        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1772
1773        Some(edge)
1774    }
1775
1776    /// Gets an edge by ID at a specific epoch.
1777    /// (Tiered storage version)
1778    #[must_use]
1779    #[cfg(feature = "tiered-storage")]
1780    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1781        let versions = self.edge_versions.read();
1782        let index = versions.get(&id)?;
1783        let version_ref = index.visible_at(epoch)?;
1784
1785        let record = self.read_edge_record(&version_ref)?;
1786
1787        if record.is_deleted() {
1788            return None;
1789        }
1790
1791        let edge_type = {
1792            let id_to_type = self.id_to_edge_type.read();
1793            id_to_type.get(record.type_id as usize)?.clone()
1794        };
1795
1796        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1797
1798        // Get properties
1799        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1800
1801        Some(edge)
1802    }
1803
1804    /// Gets an edge visible to a specific transaction.
1805    #[must_use]
1806    #[cfg(not(feature = "tiered-storage"))]
1807    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1808        let edges = self.edges.read();
1809        let chain = edges.get(&id)?;
1810        let record = chain.visible_to(epoch, tx_id)?;
1811
1812        if record.is_deleted() {
1813            return None;
1814        }
1815
1816        let edge_type = {
1817            let id_to_type = self.id_to_edge_type.read();
1818            id_to_type.get(record.type_id as usize)?.clone()
1819        };
1820
1821        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1822
1823        // Get properties
1824        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1825
1826        Some(edge)
1827    }
1828
1829    /// Gets an edge visible to a specific transaction.
1830    /// (Tiered storage version)
1831    #[must_use]
1832    #[cfg(feature = "tiered-storage")]
1833    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1834        let versions = self.edge_versions.read();
1835        let index = versions.get(&id)?;
1836        let version_ref = index.visible_to(epoch, tx_id)?;
1837
1838        let record = self.read_edge_record(&version_ref)?;
1839
1840        if record.is_deleted() {
1841            return None;
1842        }
1843
1844        let edge_type = {
1845            let id_to_type = self.id_to_edge_type.read();
1846            id_to_type.get(record.type_id as usize)?.clone()
1847        };
1848
1849        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1850
1851        // Get properties
1852        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1853
1854        Some(edge)
1855    }
1856
    /// Reads an EdgeRecord from arena using a VersionRef.
    ///
    /// Hot references are resolved by looking up the arena for the
    /// reference's epoch and copying out the record stored at the
    /// reference's arena offset; this path always yields `Some`. Cold
    /// references are fetched from the epoch store using the reference's
    /// epoch, block offset, and length, and yield whatever the epoch store
    /// returns.
    #[cfg(feature = "tiered-storage")]
    #[allow(unsafe_code)]
    fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
        match version_ref {
            VersionRef::Hot(hot_ref) => {
                let arena = self.arena_allocator.arena(hot_ref.epoch);
                // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
                // NOTE(review): this relies on callers only passing refs taken from the
                // edge version index, and on the arena for that epoch still being
                // alive — confirm both invariants hold.
                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                // Copy the record out so the result does not borrow from the arena.
                Some(*record)
            }
            VersionRef::Cold(cold_ref) => {
                // Read from compressed epoch store
                self.epoch_store
                    .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
            }
        }
    }
1875
1876    /// Deletes an edge (using latest epoch).
1877    pub fn delete_edge(&self, id: EdgeId) -> bool {
1878        self.needs_stats_recompute.store(true, Ordering::Relaxed);
1879        self.delete_edge_at_epoch(id, self.current_epoch())
1880    }
1881
1882    /// Deletes an edge at a specific epoch.
1883    #[cfg(not(feature = "tiered-storage"))]
1884    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1885        let mut edges = self.edges.write();
1886        if let Some(chain) = edges.get_mut(&id) {
1887            // Get the visible record to check if deleted and get src/dst
1888            let (src, dst) = {
1889                match chain.visible_at(epoch) {
1890                    Some(record) => {
1891                        if record.is_deleted() {
1892                            return false;
1893                        }
1894                        (record.src, record.dst)
1895                    }
1896                    None => return false, // Not visible at this epoch (already deleted)
1897                }
1898            };
1899
1900            // Mark the version chain as deleted
1901            chain.mark_deleted(epoch);
1902
1903            drop(edges); // Release lock
1904
1905            // Mark as deleted in adjacency (soft delete)
1906            self.forward_adj.mark_deleted(src, id);
1907            if let Some(ref backward) = self.backward_adj {
1908                backward.mark_deleted(dst, id);
1909            }
1910
1911            // Remove properties
1912            self.edge_properties.remove_all(id);
1913
1914            true
1915        } else {
1916            false
1917        }
1918    }
1919
1920    /// Deletes an edge at a specific epoch.
1921    /// (Tiered storage version)
1922    #[cfg(feature = "tiered-storage")]
1923    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1924        let mut versions = self.edge_versions.write();
1925        if let Some(index) = versions.get_mut(&id) {
1926            // Get the visible record to check if deleted and get src/dst
1927            let (src, dst) = {
1928                match index.visible_at(epoch) {
1929                    Some(version_ref) => {
1930                        if let Some(record) = self.read_edge_record(&version_ref) {
1931                            if record.is_deleted() {
1932                                return false;
1933                            }
1934                            (record.src, record.dst)
1935                        } else {
1936                            return false;
1937                        }
1938                    }
1939                    None => return false,
1940                }
1941            };
1942
1943            // Mark as deleted in version index
1944            index.mark_deleted(epoch);
1945
1946            drop(versions); // Release lock
1947
1948            // Mark as deleted in adjacency (soft delete)
1949            self.forward_adj.mark_deleted(src, id);
1950            if let Some(ref backward) = self.backward_adj {
1951                backward.mark_deleted(dst, id);
1952            }
1953
1954            // Remove properties
1955            self.edge_properties.remove_all(id);
1956
1957            true
1958        } else {
1959            false
1960        }
1961    }
1962
1963    /// Returns the number of edges (non-deleted at current epoch).
1964    #[must_use]
1965    #[cfg(not(feature = "tiered-storage"))]
1966    pub fn edge_count(&self) -> usize {
1967        let epoch = self.current_epoch();
1968        self.edges
1969            .read()
1970            .values()
1971            .filter_map(|chain| chain.visible_at(epoch))
1972            .filter(|r| !r.is_deleted())
1973            .count()
1974    }
1975
1976    /// Returns the number of edges (non-deleted at current epoch).
1977    /// (Tiered storage version)
1978    #[must_use]
1979    #[cfg(feature = "tiered-storage")]
1980    pub fn edge_count(&self) -> usize {
1981        let epoch = self.current_epoch();
1982        let versions = self.edge_versions.read();
1983        versions
1984            .iter()
1985            .filter(|(_, index)| {
1986                index.visible_at(epoch).map_or(false, |vref| {
1987                    self.read_edge_record(&vref)
1988                        .map_or(false, |r| !r.is_deleted())
1989                })
1990            })
1991            .count()
1992    }
1993
1994    /// Discards all uncommitted versions created by a transaction.
1995    ///
1996    /// This is called during transaction rollback to clean up uncommitted changes.
1997    /// The method removes version chain entries created by the specified transaction.
1998    #[cfg(not(feature = "tiered-storage"))]
1999    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2000        // Remove uncommitted node versions
2001        {
2002            let mut nodes = self.nodes.write();
2003            for chain in nodes.values_mut() {
2004                chain.remove_versions_by(tx_id);
2005            }
2006            // Remove completely empty chains (no versions left)
2007            nodes.retain(|_, chain| !chain.is_empty());
2008        }
2009
2010        // Remove uncommitted edge versions
2011        {
2012            let mut edges = self.edges.write();
2013            for chain in edges.values_mut() {
2014                chain.remove_versions_by(tx_id);
2015            }
2016            // Remove completely empty chains (no versions left)
2017            edges.retain(|_, chain| !chain.is_empty());
2018        }
2019    }
2020
2021    /// Discards all uncommitted versions created by a transaction.
2022    /// (Tiered storage version)
2023    #[cfg(feature = "tiered-storage")]
2024    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2025        // Remove uncommitted node versions
2026        {
2027            let mut versions = self.node_versions.write();
2028            for index in versions.values_mut() {
2029                index.remove_versions_by(tx_id);
2030            }
2031            // Remove completely empty indexes (no versions left)
2032            versions.retain(|_, index| !index.is_empty());
2033        }
2034
2035        // Remove uncommitted edge versions
2036        {
2037            let mut versions = self.edge_versions.write();
2038            for index in versions.values_mut() {
2039                index.remove_versions_by(tx_id);
2040            }
2041            // Remove completely empty indexes (no versions left)
2042            versions.retain(|_, index| !index.is_empty());
2043        }
2044    }
2045
    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
    ///
    /// This is called by the transaction manager when an epoch becomes eligible
    /// for freezing (no active transactions can see it). The freeze process:
    ///
    /// 1. Collects all hot version refs for the epoch
    /// 2. Reads the corresponding records from arena
    /// 3. Compresses them into a `CompressedEpochBlock`
    /// 4. Updates `VersionIndex` entries to point to cold storage
    /// 5. The arena can be deallocated after all epochs in it are frozen
    ///
    /// The version maps are read-locked while records are collected and
    /// write-locked again afterwards to install the cold refs; no lock is
    /// held across the call into the epoch store.
    ///
    /// # Arguments
    ///
    /// * `epoch` - The epoch to freeze
    ///
    /// # Returns
    ///
    /// The number of records frozen (nodes + edges). Returns 0 without
    /// touching the epoch store when the epoch has no hot refs.
    #[cfg(feature = "tiered-storage")]
    #[allow(unsafe_code)]
    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
        // Collect node records to freeze
        // `node_records` is handed to the epoch store; `node_hot_refs` is kept so the
        // version indexes can be rewritten afterwards.
        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();

        {
            let versions = self.node_versions.read();
            for (node_id, index) in versions.iter() {
                for hot_ref in index.hot_refs_for_epoch(epoch) {
                    let arena = self.arena_allocator.arena(hot_ref.epoch);
                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                    node_records.push((node_id.as_u64(), *record));
                    node_hot_refs.push((*node_id, *hot_ref));
                }
            }
        }

        // Collect edge records to freeze
        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();

        {
            let versions = self.edge_versions.read();
            for (edge_id, index) in versions.iter() {
                for hot_ref in index.hot_refs_for_epoch(epoch) {
                    let arena = self.arena_allocator.arena(hot_ref.epoch);
                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                    edge_records.push((edge_id.as_u64(), *record));
                    edge_hot_refs.push((*edge_id, *hot_ref));
                }
            }
        }

        let total_frozen = node_records.len() + edge_records.len();

        // Nothing hot in this epoch: skip the epoch store entirely.
        if total_frozen == 0 {
            return 0;
        }

        // Freeze to compressed storage
        let (node_entries, edge_entries) =
            self.epoch_store
                .freeze_epoch(epoch, node_records, edge_records);

        // Build lookup maps for index entries
        // NOTE(review): the maps are keyed by entity ID alone, so if one entity has
        // several hot refs in this epoch they all resolve to the same
        // (offset, length) below — confirm that at most one is expected.
        let node_entry_map: FxHashMap<u64, _> = node_entries
            .iter()
            .map(|e| (e.entity_id, (e.offset, e.length)))
            .collect();
        let edge_entry_map: FxHashMap<u64, _> = edge_entries
            .iter()
            .map(|e| (e.entity_id, (e.offset, e.length)))
            .collect();

        // Update version indexes to use cold refs
        // Entities whose index has disappeared since collection, or that have no
        // entry in the frozen block, are silently skipped.
        {
            let mut versions = self.node_versions.write();
            for (node_id, hot_ref) in &node_hot_refs {
                if let Some(index) = versions.get_mut(node_id)
                    && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
                {
                    // Carry the creator and deletion epoch over from the hot ref.
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }

        {
            let mut versions = self.edge_versions.write();
            for (edge_id, hot_ref) in &edge_hot_refs {
                if let Some(index) = versions.get_mut(edge_id)
                    && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
                {
                    // Carry the creator and deletion epoch over from the hot ref.
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }

        // The count reflects records collected, not index entries actually rewritten.
        total_frozen
    }
2161
2162    /// Returns the epoch store for cold storage statistics.
2163    #[cfg(feature = "tiered-storage")]
2164    #[must_use]
2165    pub fn epoch_store(&self) -> &EpochStore {
2166        &self.epoch_store
2167    }
2168
2169    /// Returns the number of distinct labels in the store.
2170    #[must_use]
2171    pub fn label_count(&self) -> usize {
2172        self.id_to_label.read().len()
2173    }
2174
2175    /// Returns the number of distinct property keys in the store.
2176    ///
2177    /// This counts unique property keys across both nodes and edges.
2178    #[must_use]
2179    pub fn property_key_count(&self) -> usize {
2180        let node_keys = self.node_properties.column_count();
2181        let edge_keys = self.edge_properties.column_count();
2182        // Note: This may count some keys twice if the same key is used
2183        // for both nodes and edges. A more precise count would require
2184        // tracking unique keys across both storages.
2185        node_keys + edge_keys
2186    }
2187
2188    /// Returns the number of distinct edge types in the store.
2189    #[must_use]
2190    pub fn edge_type_count(&self) -> usize {
2191        self.id_to_edge_type.read().len()
2192    }
2193
2194    // === Traversal ===
2195
2196    /// Iterates over neighbors of a node in the specified direction.
2197    ///
2198    /// This is the fast path for graph traversal - goes straight to the
2199    /// adjacency index without loading full node data.
2200    pub fn neighbors(
2201        &self,
2202        node: NodeId,
2203        direction: Direction,
2204    ) -> impl Iterator<Item = NodeId> + '_ {
2205        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2206            Direction::Outgoing | Direction::Both => {
2207                Box::new(self.forward_adj.neighbors(node).into_iter())
2208            }
2209            Direction::Incoming => Box::new(std::iter::empty()),
2210        };
2211
2212        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2213            Direction::Incoming | Direction::Both => {
2214                if let Some(ref adj) = self.backward_adj {
2215                    Box::new(adj.neighbors(node).into_iter())
2216                } else {
2217                    Box::new(std::iter::empty())
2218                }
2219            }
2220            Direction::Outgoing => Box::new(std::iter::empty()),
2221        };
2222
2223        forward.chain(backward)
2224    }
2225
2226    /// Returns edges from a node with their targets.
2227    ///
2228    /// Returns an iterator of (target_node, edge_id) pairs.
2229    pub fn edges_from(
2230        &self,
2231        node: NodeId,
2232        direction: Direction,
2233    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2234        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2235            Direction::Outgoing | Direction::Both => {
2236                Box::new(self.forward_adj.edges_from(node).into_iter())
2237            }
2238            Direction::Incoming => Box::new(std::iter::empty()),
2239        };
2240
2241        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2242            Direction::Incoming | Direction::Both => {
2243                if let Some(ref adj) = self.backward_adj {
2244                    Box::new(adj.edges_from(node).into_iter())
2245                } else {
2246                    Box::new(std::iter::empty())
2247                }
2248            }
2249            Direction::Outgoing => Box::new(std::iter::empty()),
2250        };
2251
2252        forward.chain(backward)
2253    }
2254
2255    /// Returns edges to a node (where the node is the destination).
2256    ///
2257    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
2258    /// Uses the backward adjacency index for O(degree) lookup.
2259    ///
2260    /// # Example
2261    ///
2262    /// ```ignore
2263    /// // For edges: A->B, C->B
2264    /// let incoming = store.edges_to(B);
2265    /// // Returns: [(A, edge1), (C, edge2)]
2266    /// ```
2267    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2268        if let Some(ref backward) = self.backward_adj {
2269            backward.edges_from(node)
2270        } else {
2271            // Fallback: scan all edges (slow but correct)
2272            self.all_edges()
2273                .filter_map(|edge| {
2274                    if edge.dst == node {
2275                        Some((edge.src, edge.id))
2276                    } else {
2277                        None
2278                    }
2279                })
2280                .collect()
2281        }
2282    }
2283
2284    /// Returns the out-degree of a node (number of outgoing edges).
2285    ///
2286    /// Uses the forward adjacency index for O(1) lookup.
2287    #[must_use]
2288    pub fn out_degree(&self, node: NodeId) -> usize {
2289        self.forward_adj.out_degree(node)
2290    }
2291
2292    /// Returns the in-degree of a node (number of incoming edges).
2293    ///
2294    /// Uses the backward adjacency index for O(1) lookup if available,
2295    /// otherwise falls back to scanning edges.
2296    #[must_use]
2297    pub fn in_degree(&self, node: NodeId) -> usize {
2298        if let Some(ref backward) = self.backward_adj {
2299            backward.in_degree(node)
2300        } else {
2301            // Fallback: count edges (slow)
2302            self.all_edges().filter(|edge| edge.dst == node).count()
2303        }
2304    }
2305
2306    /// Gets the type of an edge by ID.
2307    #[must_use]
2308    #[cfg(not(feature = "tiered-storage"))]
2309    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2310        let edges = self.edges.read();
2311        let chain = edges.get(&id)?;
2312        let epoch = self.current_epoch();
2313        let record = chain.visible_at(epoch)?;
2314        let id_to_type = self.id_to_edge_type.read();
2315        id_to_type.get(record.type_id as usize).cloned()
2316    }
2317
2318    /// Gets the type of an edge by ID.
2319    /// (Tiered storage version)
2320    #[must_use]
2321    #[cfg(feature = "tiered-storage")]
2322    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2323        let versions = self.edge_versions.read();
2324        let index = versions.get(&id)?;
2325        let epoch = self.current_epoch();
2326        let vref = index.visible_at(epoch)?;
2327        let record = self.read_edge_record(&vref)?;
2328        let id_to_type = self.id_to_edge_type.read();
2329        id_to_type.get(record.type_id as usize).cloned()
2330    }
2331
2332    /// Returns all nodes with a specific label.
2333    ///
2334    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
2335    /// concurrent modifications won't affect the returned vector. Results are
2336    /// sorted by NodeId for deterministic iteration order.
2337    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2338        let label_to_id = self.label_to_id.read();
2339        if let Some(&label_id) = label_to_id.get(label) {
2340            let index = self.label_index.read();
2341            if let Some(set) = index.get(label_id as usize) {
2342                let mut ids: Vec<NodeId> = set.keys().copied().collect();
2343                ids.sort_unstable();
2344                return ids;
2345            }
2346        }
2347        Vec::new()
2348    }
2349
2350    // === Admin API: Iteration ===
2351
2352    /// Returns an iterator over all nodes in the database.
2353    ///
2354    /// This creates a snapshot of all visible nodes at the current epoch.
2355    /// Useful for dump/export operations.
2356    #[cfg(not(feature = "tiered-storage"))]
2357    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2358        let epoch = self.current_epoch();
2359        let node_ids: Vec<NodeId> = self
2360            .nodes
2361            .read()
2362            .iter()
2363            .filter_map(|(id, chain)| {
2364                chain
2365                    .visible_at(epoch)
2366                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2367            })
2368            .collect();
2369
2370        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2371    }
2372
2373    /// Returns an iterator over all nodes in the database.
2374    /// (Tiered storage version)
2375    #[cfg(feature = "tiered-storage")]
2376    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2377        let node_ids = self.node_ids();
2378        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2379    }
2380
2381    /// Returns an iterator over all edges in the database.
2382    ///
2383    /// This creates a snapshot of all visible edges at the current epoch.
2384    /// Useful for dump/export operations.
2385    #[cfg(not(feature = "tiered-storage"))]
2386    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2387        let epoch = self.current_epoch();
2388        let edge_ids: Vec<EdgeId> = self
2389            .edges
2390            .read()
2391            .iter()
2392            .filter_map(|(id, chain)| {
2393                chain
2394                    .visible_at(epoch)
2395                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2396            })
2397            .collect();
2398
2399        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2400    }
2401
2402    /// Returns an iterator over all edges in the database.
2403    /// (Tiered storage version)
2404    #[cfg(feature = "tiered-storage")]
2405    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2406        let epoch = self.current_epoch();
2407        let versions = self.edge_versions.read();
2408        let edge_ids: Vec<EdgeId> = versions
2409            .iter()
2410            .filter_map(|(id, index)| {
2411                index.visible_at(epoch).and_then(|vref| {
2412                    self.read_edge_record(&vref)
2413                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2414                })
2415            })
2416            .collect();
2417
2418        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2419    }
2420
2421    /// Returns all label names in the database.
2422    pub fn all_labels(&self) -> Vec<String> {
2423        self.id_to_label
2424            .read()
2425            .iter()
2426            .map(|s| s.to_string())
2427            .collect()
2428    }
2429
2430    /// Returns all edge type names in the database.
2431    pub fn all_edge_types(&self) -> Vec<String> {
2432        self.id_to_edge_type
2433            .read()
2434            .iter()
2435            .map(|s| s.to_string())
2436            .collect()
2437    }
2438
2439    /// Returns all property keys used in the database.
2440    pub fn all_property_keys(&self) -> Vec<String> {
2441        let mut keys = std::collections::HashSet::new();
2442        for key in self.node_properties.keys() {
2443            keys.insert(key.to_string());
2444        }
2445        for key in self.edge_properties.keys() {
2446            keys.insert(key.to_string());
2447        }
2448        keys.into_iter().collect()
2449    }
2450
2451    /// Returns an iterator over nodes with a specific label.
2452    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2453        let node_ids = self.nodes_by_label(label);
2454        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2455    }
2456
2457    /// Returns an iterator over edges with a specific type.
2458    #[cfg(not(feature = "tiered-storage"))]
2459    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2460        let epoch = self.current_epoch();
2461        let type_to_id = self.edge_type_to_id.read();
2462
2463        if let Some(&type_id) = type_to_id.get(edge_type) {
2464            let edge_ids: Vec<EdgeId> = self
2465                .edges
2466                .read()
2467                .iter()
2468                .filter_map(|(id, chain)| {
2469                    chain.visible_at(epoch).and_then(|r| {
2470                        if !r.is_deleted() && r.type_id == type_id {
2471                            Some(*id)
2472                        } else {
2473                            None
2474                        }
2475                    })
2476                })
2477                .collect();
2478
2479            // Return a boxed iterator for the found edges
2480            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2481                as Box<dyn Iterator<Item = Edge> + 'a>
2482        } else {
2483            // Return empty iterator
2484            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2485        }
2486    }
2487
2488    /// Returns an iterator over edges with a specific type.
2489    /// (Tiered storage version)
2490    #[cfg(feature = "tiered-storage")]
2491    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2492        let epoch = self.current_epoch();
2493        let type_to_id = self.edge_type_to_id.read();
2494
2495        if let Some(&type_id) = type_to_id.get(edge_type) {
2496            let versions = self.edge_versions.read();
2497            let edge_ids: Vec<EdgeId> = versions
2498                .iter()
2499                .filter_map(|(id, index)| {
2500                    index.visible_at(epoch).and_then(|vref| {
2501                        self.read_edge_record(&vref).and_then(|r| {
2502                            if !r.is_deleted() && r.type_id == type_id {
2503                                Some(*id)
2504                            } else {
2505                                None
2506                            }
2507                        })
2508                    })
2509                })
2510                .collect();
2511
2512            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2513                as Box<dyn Iterator<Item = Edge> + 'a>
2514        } else {
2515            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2516        }
2517    }
2518
2519    // === Zone Map Support ===
2520
2521    /// Checks if a node property predicate might match any nodes.
2522    ///
2523    /// Uses zone maps for early filtering. Returns `true` if there might be
2524    /// matching nodes, `false` if there definitely aren't.
2525    #[must_use]
2526    pub fn node_property_might_match(
2527        &self,
2528        property: &PropertyKey,
2529        op: CompareOp,
2530        value: &Value,
2531    ) -> bool {
2532        self.node_properties.might_match(property, op, value)
2533    }
2534
2535    /// Checks if an edge property predicate might match any edges.
2536    #[must_use]
2537    pub fn edge_property_might_match(
2538        &self,
2539        property: &PropertyKey,
2540        op: CompareOp,
2541        value: &Value,
2542    ) -> bool {
2543        self.edge_properties.might_match(property, op, value)
2544    }
2545
2546    /// Gets the zone map for a node property.
2547    #[must_use]
2548    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2549        self.node_properties.zone_map(property)
2550    }
2551
2552    /// Gets the zone map for an edge property.
2553    #[must_use]
2554    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2555        self.edge_properties.zone_map(property)
2556    }
2557
2558    /// Rebuilds zone maps for all properties.
2559    pub fn rebuild_zone_maps(&self) {
2560        self.node_properties.rebuild_zone_maps();
2561        self.edge_properties.rebuild_zone_maps();
2562    }
2563
2564    // === Statistics ===
2565
2566    /// Returns the current statistics.
2567    #[must_use]
2568    pub fn statistics(&self) -> Statistics {
2569        self.statistics.read().clone()
2570    }
2571
2572    /// Recomputes statistics if they are stale (i.e., after mutations).
2573    ///
2574    /// Call this before reading statistics for query optimization.
2575    /// Avoids redundant recomputation if no mutations occurred.
2576    pub fn ensure_statistics_fresh(&self) {
2577        if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2578            self.compute_statistics();
2579        }
2580    }
2581
2582    /// Recomputes statistics from current data.
2583    ///
2584    /// Scans all labels and edge types to build cardinality estimates for the
2585    /// query optimizer. Call this periodically or after bulk data loads.
2586    #[cfg(not(feature = "tiered-storage"))]
2587    pub fn compute_statistics(&self) {
2588        let mut stats = Statistics::new();
2589
2590        // Compute total counts
2591        stats.total_nodes = self.node_count() as u64;
2592        stats.total_edges = self.edge_count() as u64;
2593
2594        // Compute per-label statistics
2595        let id_to_label = self.id_to_label.read();
2596        let label_index = self.label_index.read();
2597
2598        for (label_id, label_name) in id_to_label.iter().enumerate() {
2599            let node_count = label_index
2600                .get(label_id)
2601                .map(|set| set.len() as u64)
2602                .unwrap_or(0);
2603
2604            if node_count > 0 {
2605                // Estimate average degree
2606                let avg_out_degree = if stats.total_nodes > 0 {
2607                    stats.total_edges as f64 / stats.total_nodes as f64
2608                } else {
2609                    0.0
2610                };
2611
2612                let label_stats =
2613                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2614
2615                stats.update_label(label_name.as_ref(), label_stats);
2616            }
2617        }
2618
2619        // Compute per-edge-type statistics
2620        let id_to_edge_type = self.id_to_edge_type.read();
2621        let edges = self.edges.read();
2622        let epoch = self.current_epoch();
2623
2624        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2625        for chain in edges.values() {
2626            if let Some(record) = chain.visible_at(epoch) {
2627                if !record.is_deleted() {
2628                    *edge_type_counts.entry(record.type_id).or_default() += 1;
2629                }
2630            }
2631        }
2632
2633        for (type_id, count) in edge_type_counts {
2634            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2635                let avg_degree = if stats.total_nodes > 0 {
2636                    count as f64 / stats.total_nodes as f64
2637                } else {
2638                    0.0
2639                };
2640
2641                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2642                stats.update_edge_type(type_name.as_ref(), edge_stats);
2643            }
2644        }
2645
2646        *self.statistics.write() = stats;
2647    }
2648
2649    /// Recomputes statistics from current data.
2650    /// (Tiered storage version)
2651    #[cfg(feature = "tiered-storage")]
2652    pub fn compute_statistics(&self) {
2653        let mut stats = Statistics::new();
2654
2655        // Compute total counts
2656        stats.total_nodes = self.node_count() as u64;
2657        stats.total_edges = self.edge_count() as u64;
2658
2659        // Compute per-label statistics
2660        let id_to_label = self.id_to_label.read();
2661        let label_index = self.label_index.read();
2662
2663        for (label_id, label_name) in id_to_label.iter().enumerate() {
2664            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2665
2666            if node_count > 0 {
2667                let avg_out_degree = if stats.total_nodes > 0 {
2668                    stats.total_edges as f64 / stats.total_nodes as f64
2669                } else {
2670                    0.0
2671                };
2672
2673                let label_stats =
2674                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2675
2676                stats.update_label(label_name.as_ref(), label_stats);
2677            }
2678        }
2679
2680        // Compute per-edge-type statistics
2681        let id_to_edge_type = self.id_to_edge_type.read();
2682        let versions = self.edge_versions.read();
2683        let epoch = self.current_epoch();
2684
2685        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2686        for index in versions.values() {
2687            if let Some(vref) = index.visible_at(epoch)
2688                && let Some(record) = self.read_edge_record(&vref)
2689                && !record.is_deleted()
2690            {
2691                *edge_type_counts.entry(record.type_id).or_default() += 1;
2692            }
2693        }
2694
2695        for (type_id, count) in edge_type_counts {
2696            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2697                let avg_degree = if stats.total_nodes > 0 {
2698                    count as f64 / stats.total_nodes as f64
2699                } else {
2700                    0.0
2701                };
2702
2703                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2704                stats.update_edge_type(type_name.as_ref(), edge_stats);
2705            }
2706        }
2707
2708        *self.statistics.write() = stats;
2709    }
2710
2711    /// Estimates cardinality for a label scan.
2712    #[must_use]
2713    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2714        self.statistics.read().estimate_label_cardinality(label)
2715    }
2716
2717    /// Estimates average degree for an edge type.
2718    #[must_use]
2719    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2720        self.statistics
2721            .read()
2722            .estimate_avg_degree(edge_type, outgoing)
2723    }
2724
2725    // === Internal Helpers ===
2726
2727    fn get_or_create_label_id(&self, label: &str) -> u32 {
2728        {
2729            let label_to_id = self.label_to_id.read();
2730            if let Some(&id) = label_to_id.get(label) {
2731                return id;
2732            }
2733        }
2734
2735        let mut label_to_id = self.label_to_id.write();
2736        let mut id_to_label = self.id_to_label.write();
2737
2738        // Double-check after acquiring write lock
2739        if let Some(&id) = label_to_id.get(label) {
2740            return id;
2741        }
2742
2743        let id = id_to_label.len() as u32;
2744
2745        let label: ArcStr = label.into();
2746        label_to_id.insert(label.clone(), id);
2747        id_to_label.push(label);
2748
2749        id
2750    }
2751
2752    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2753        {
2754            let type_to_id = self.edge_type_to_id.read();
2755            if let Some(&id) = type_to_id.get(edge_type) {
2756                return id;
2757            }
2758        }
2759
2760        let mut type_to_id = self.edge_type_to_id.write();
2761        let mut id_to_type = self.id_to_edge_type.write();
2762
2763        // Double-check
2764        if let Some(&id) = type_to_id.get(edge_type) {
2765            return id;
2766        }
2767
2768        let id = id_to_type.len() as u32;
2769        let edge_type: ArcStr = edge_type.into();
2770        type_to_id.insert(edge_type.clone(), id);
2771        id_to_type.push(edge_type);
2772
2773        id
2774    }
2775
2776    // === Recovery Support ===
2777
2778    /// Creates a node with a specific ID during recovery.
2779    ///
2780    /// This is used for WAL recovery to restore nodes with their original IDs.
2781    /// The caller must ensure IDs don't conflict with existing nodes.
2782    #[cfg(not(feature = "tiered-storage"))]
2783    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2784        let epoch = self.current_epoch();
2785        let mut record = NodeRecord::new(id, epoch);
2786        record.set_label_count(labels.len() as u16);
2787
2788        // Store labels in node_labels map and label_index
2789        let mut node_label_set = FxHashSet::default();
2790        for label in labels {
2791            let label_id = self.get_or_create_label_id(*label);
2792            node_label_set.insert(label_id);
2793
2794            // Update label index
2795            let mut index = self.label_index.write();
2796            while index.len() <= label_id as usize {
2797                index.push(FxHashMap::default());
2798            }
2799            index[label_id as usize].insert(id, ());
2800        }
2801
2802        // Store node's labels
2803        self.node_labels.write().insert(id, node_label_set);
2804
2805        // Create version chain with initial version (using SYSTEM tx for recovery)
2806        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2807        self.nodes.write().insert(id, chain);
2808
2809        // Update next_node_id if necessary to avoid future collisions
2810        let id_val = id.as_u64();
2811        let _ = self
2812            .next_node_id
2813            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2814                if id_val >= current {
2815                    Some(id_val + 1)
2816                } else {
2817                    None
2818                }
2819            });
2820    }
2821
2822    /// Creates a node with a specific ID during recovery.
2823    /// (Tiered storage version)
2824    #[cfg(feature = "tiered-storage")]
2825    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2826        let epoch = self.current_epoch();
2827        let mut record = NodeRecord::new(id, epoch);
2828        record.set_label_count(labels.len() as u16);
2829
2830        // Store labels in node_labels map and label_index
2831        let mut node_label_set = FxHashSet::default();
2832        for label in labels {
2833            let label_id = self.get_or_create_label_id(*label);
2834            node_label_set.insert(label_id);
2835
2836            // Update label index
2837            let mut index = self.label_index.write();
2838            while index.len() <= label_id as usize {
2839                index.push(FxHashMap::default());
2840            }
2841            index[label_id as usize].insert(id, ());
2842        }
2843
2844        // Store node's labels
2845        self.node_labels.write().insert(id, node_label_set);
2846
2847        // Allocate record in arena and get offset (create epoch if needed)
2848        let arena = self.arena_allocator.arena_or_create(epoch);
2849        let (offset, _stored) = arena.alloc_value_with_offset(record);
2850
2851        // Create HotVersionRef (using SYSTEM tx for recovery)
2852        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2853        let mut versions = self.node_versions.write();
2854        versions.insert(id, VersionIndex::with_initial(hot_ref));
2855
2856        // Update next_node_id if necessary to avoid future collisions
2857        let id_val = id.as_u64();
2858        let _ = self
2859            .next_node_id
2860            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2861                if id_val >= current {
2862                    Some(id_val + 1)
2863                } else {
2864                    None
2865                }
2866            });
2867    }
2868
2869    /// Creates an edge with a specific ID during recovery.
2870    ///
2871    /// This is used for WAL recovery to restore edges with their original IDs.
2872    #[cfg(not(feature = "tiered-storage"))]
2873    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2874        let epoch = self.current_epoch();
2875        let type_id = self.get_or_create_edge_type_id(edge_type);
2876
2877        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2878        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2879        self.edges.write().insert(id, chain);
2880
2881        // Update adjacency
2882        self.forward_adj.add_edge(src, dst, id);
2883        if let Some(ref backward) = self.backward_adj {
2884            backward.add_edge(dst, src, id);
2885        }
2886
2887        // Update next_edge_id if necessary
2888        let id_val = id.as_u64();
2889        let _ = self
2890            .next_edge_id
2891            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2892                if id_val >= current {
2893                    Some(id_val + 1)
2894                } else {
2895                    None
2896                }
2897            });
2898    }
2899
2900    /// Creates an edge with a specific ID during recovery.
2901    /// (Tiered storage version)
2902    #[cfg(feature = "tiered-storage")]
2903    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2904        let epoch = self.current_epoch();
2905        let type_id = self.get_or_create_edge_type_id(edge_type);
2906
2907        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2908
2909        // Allocate record in arena and get offset (create epoch if needed)
2910        let arena = self.arena_allocator.arena_or_create(epoch);
2911        let (offset, _stored) = arena.alloc_value_with_offset(record);
2912
2913        // Create HotVersionRef (using SYSTEM tx for recovery)
2914        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2915        let mut versions = self.edge_versions.write();
2916        versions.insert(id, VersionIndex::with_initial(hot_ref));
2917
2918        // Update adjacency
2919        self.forward_adj.add_edge(src, dst, id);
2920        if let Some(ref backward) = self.backward_adj {
2921            backward.add_edge(dst, src, id);
2922        }
2923
2924        // Update next_edge_id if necessary
2925        let id_val = id.as_u64();
2926        let _ = self
2927            .next_edge_id
2928            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2929                if id_val >= current {
2930                    Some(id_val + 1)
2931                } else {
2932                    None
2933                }
2934            });
2935    }
2936
2937    /// Sets the current epoch during recovery.
2938    pub fn set_epoch(&self, epoch: EpochId) {
2939        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2940    }
2941}
2942
2943impl Default for LpgStore {
2944    fn default() -> Self {
2945        Self::new()
2946    }
2947}
2948
2949#[cfg(test)]
2950mod tests {
2951    use super::*;
2952
2953    #[test]
2954    fn test_create_node() {
2955        let store = LpgStore::new();
2956
2957        let id = store.create_node(&["Person"]);
2958        assert!(id.is_valid());
2959
2960        let node = store.get_node(id).unwrap();
2961        assert!(node.has_label("Person"));
2962        assert!(!node.has_label("Animal"));
2963    }
2964
2965    #[test]
2966    fn test_create_node_with_props() {
2967        let store = LpgStore::new();
2968
2969        let id = store.create_node_with_props(
2970            &["Person"],
2971            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
2972        );
2973
2974        let node = store.get_node(id).unwrap();
2975        assert_eq!(
2976            node.get_property("name").and_then(|v| v.as_str()),
2977            Some("Alice")
2978        );
2979        assert_eq!(
2980            node.get_property("age").and_then(|v| v.as_int64()),
2981            Some(30)
2982        );
2983    }
2984
2985    #[test]
2986    fn test_delete_node() {
2987        let store = LpgStore::new();
2988
2989        let id = store.create_node(&["Person"]);
2990        assert_eq!(store.node_count(), 1);
2991
2992        assert!(store.delete_node(id));
2993        assert_eq!(store.node_count(), 0);
2994        assert!(store.get_node(id).is_none());
2995
2996        // Double delete should return false
2997        assert!(!store.delete_node(id));
2998    }
2999
3000    #[test]
3001    fn test_create_edge() {
3002        let store = LpgStore::new();
3003
3004        let alice = store.create_node(&["Person"]);
3005        let bob = store.create_node(&["Person"]);
3006
3007        let edge_id = store.create_edge(alice, bob, "KNOWS");
3008        assert!(edge_id.is_valid());
3009
3010        let edge = store.get_edge(edge_id).unwrap();
3011        assert_eq!(edge.src, alice);
3012        assert_eq!(edge.dst, bob);
3013        assert_eq!(edge.edge_type.as_str(), "KNOWS");
3014    }
3015
3016    #[test]
3017    fn test_neighbors() {
3018        let store = LpgStore::new();
3019
3020        let a = store.create_node(&["Person"]);
3021        let b = store.create_node(&["Person"]);
3022        let c = store.create_node(&["Person"]);
3023
3024        store.create_edge(a, b, "KNOWS");
3025        store.create_edge(a, c, "KNOWS");
3026
3027        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3028        assert_eq!(outgoing.len(), 2);
3029        assert!(outgoing.contains(&b));
3030        assert!(outgoing.contains(&c));
3031
3032        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3033        assert_eq!(incoming.len(), 1);
3034        assert!(incoming.contains(&a));
3035    }
3036
3037    #[test]
3038    fn test_nodes_by_label() {
3039        let store = LpgStore::new();
3040
3041        let p1 = store.create_node(&["Person"]);
3042        let p2 = store.create_node(&["Person"]);
3043        let _a = store.create_node(&["Animal"]);
3044
3045        let persons = store.nodes_by_label("Person");
3046        assert_eq!(persons.len(), 2);
3047        assert!(persons.contains(&p1));
3048        assert!(persons.contains(&p2));
3049
3050        let animals = store.nodes_by_label("Animal");
3051        assert_eq!(animals.len(), 1);
3052    }
3053
3054    #[test]
3055    fn test_delete_edge() {
3056        let store = LpgStore::new();
3057
3058        let a = store.create_node(&["Person"]);
3059        let b = store.create_node(&["Person"]);
3060        let edge_id = store.create_edge(a, b, "KNOWS");
3061
3062        assert_eq!(store.edge_count(), 1);
3063
3064        assert!(store.delete_edge(edge_id));
3065        assert_eq!(store.edge_count(), 0);
3066        assert!(store.get_edge(edge_id).is_none());
3067    }
3068
3069    // === New tests for improved coverage ===
3070
3071    #[test]
3072    fn test_lpg_store_config() {
3073        // Test with_config
3074        let config = LpgStoreConfig {
3075            backward_edges: false,
3076            initial_node_capacity: 100,
3077            initial_edge_capacity: 200,
3078        };
3079        let store = LpgStore::with_config(config);
3080
3081        // Store should work but without backward adjacency
3082        let a = store.create_node(&["Person"]);
3083        let b = store.create_node(&["Person"]);
3084        store.create_edge(a, b, "KNOWS");
3085
3086        // Outgoing should work
3087        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3088        assert_eq!(outgoing.len(), 1);
3089
3090        // Incoming should be empty (no backward adjacency)
3091        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3092        assert_eq!(incoming.len(), 0);
3093    }
3094
3095    #[test]
3096    fn test_epoch_management() {
3097        let store = LpgStore::new();
3098
3099        let epoch0 = store.current_epoch();
3100        assert_eq!(epoch0.as_u64(), 0);
3101
3102        let epoch1 = store.new_epoch();
3103        assert_eq!(epoch1.as_u64(), 1);
3104
3105        let current = store.current_epoch();
3106        assert_eq!(current.as_u64(), 1);
3107    }
3108
3109    #[test]
3110    fn test_node_properties() {
3111        let store = LpgStore::new();
3112        let id = store.create_node(&["Person"]);
3113
3114        // Set and get property
3115        store.set_node_property(id, "name", Value::from("Alice"));
3116        let name = store.get_node_property(id, &"name".into());
3117        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3118
3119        // Update property
3120        store.set_node_property(id, "name", Value::from("Bob"));
3121        let name = store.get_node_property(id, &"name".into());
3122        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3123
3124        // Remove property
3125        let old = store.remove_node_property(id, "name");
3126        assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3127
3128        // Property should be gone
3129        let name = store.get_node_property(id, &"name".into());
3130        assert!(name.is_none());
3131
3132        // Remove non-existent property
3133        let none = store.remove_node_property(id, "nonexistent");
3134        assert!(none.is_none());
3135    }
3136
3137    #[test]
3138    fn test_edge_properties() {
3139        let store = LpgStore::new();
3140        let a = store.create_node(&["Person"]);
3141        let b = store.create_node(&["Person"]);
3142        let edge_id = store.create_edge(a, b, "KNOWS");
3143
3144        // Set and get property
3145        store.set_edge_property(edge_id, "since", Value::from(2020i64));
3146        let since = store.get_edge_property(edge_id, &"since".into());
3147        assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3148
3149        // Remove property
3150        let old = store.remove_edge_property(edge_id, "since");
3151        assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3152
3153        let since = store.get_edge_property(edge_id, &"since".into());
3154        assert!(since.is_none());
3155    }
3156
3157    #[test]
3158    fn test_add_remove_label() {
3159        let store = LpgStore::new();
3160        let id = store.create_node(&["Person"]);
3161
3162        // Add new label
3163        assert!(store.add_label(id, "Employee"));
3164
3165        let node = store.get_node(id).unwrap();
3166        assert!(node.has_label("Person"));
3167        assert!(node.has_label("Employee"));
3168
3169        // Adding same label again should fail
3170        assert!(!store.add_label(id, "Employee"));
3171
3172        // Remove label
3173        assert!(store.remove_label(id, "Employee"));
3174
3175        let node = store.get_node(id).unwrap();
3176        assert!(node.has_label("Person"));
3177        assert!(!node.has_label("Employee"));
3178
3179        // Removing non-existent label should fail
3180        assert!(!store.remove_label(id, "Employee"));
3181        assert!(!store.remove_label(id, "NonExistent"));
3182    }
3183
3184    #[test]
3185    fn test_add_label_to_nonexistent_node() {
3186        let store = LpgStore::new();
3187        let fake_id = NodeId::new(999);
3188        assert!(!store.add_label(fake_id, "Label"));
3189    }
3190
3191    #[test]
3192    fn test_remove_label_from_nonexistent_node() {
3193        let store = LpgStore::new();
3194        let fake_id = NodeId::new(999);
3195        assert!(!store.remove_label(fake_id, "Label"));
3196    }
3197
3198    #[test]
3199    fn test_node_ids() {
3200        let store = LpgStore::new();
3201
3202        let n1 = store.create_node(&["Person"]);
3203        let n2 = store.create_node(&["Person"]);
3204        let n3 = store.create_node(&["Person"]);
3205
3206        let ids = store.node_ids();
3207        assert_eq!(ids.len(), 3);
3208        assert!(ids.contains(&n1));
3209        assert!(ids.contains(&n2));
3210        assert!(ids.contains(&n3));
3211
3212        // Delete one
3213        store.delete_node(n2);
3214        let ids = store.node_ids();
3215        assert_eq!(ids.len(), 2);
3216        assert!(!ids.contains(&n2));
3217    }
3218
3219    #[test]
3220    fn test_delete_node_nonexistent() {
3221        let store = LpgStore::new();
3222        let fake_id = NodeId::new(999);
3223        assert!(!store.delete_node(fake_id));
3224    }
3225
3226    #[test]
3227    fn test_delete_edge_nonexistent() {
3228        let store = LpgStore::new();
3229        let fake_id = EdgeId::new(999);
3230        assert!(!store.delete_edge(fake_id));
3231    }
3232
3233    #[test]
3234    fn test_delete_edge_double() {
3235        let store = LpgStore::new();
3236        let a = store.create_node(&["Person"]);
3237        let b = store.create_node(&["Person"]);
3238        let edge_id = store.create_edge(a, b, "KNOWS");
3239
3240        assert!(store.delete_edge(edge_id));
3241        assert!(!store.delete_edge(edge_id)); // Double delete
3242    }
3243
3244    #[test]
3245    fn test_create_edge_with_props() {
3246        let store = LpgStore::new();
3247        let a = store.create_node(&["Person"]);
3248        let b = store.create_node(&["Person"]);
3249
3250        let edge_id = store.create_edge_with_props(
3251            a,
3252            b,
3253            "KNOWS",
3254            [
3255                ("since", Value::from(2020i64)),
3256                ("weight", Value::from(1.0)),
3257            ],
3258        );
3259
3260        let edge = store.get_edge(edge_id).unwrap();
3261        assert_eq!(
3262            edge.get_property("since").and_then(|v| v.as_int64()),
3263            Some(2020)
3264        );
3265        assert_eq!(
3266            edge.get_property("weight").and_then(|v| v.as_float64()),
3267            Some(1.0)
3268        );
3269    }
3270
3271    #[test]
3272    fn test_delete_node_edges() {
3273        let store = LpgStore::new();
3274
3275        let a = store.create_node(&["Person"]);
3276        let b = store.create_node(&["Person"]);
3277        let c = store.create_node(&["Person"]);
3278
3279        store.create_edge(a, b, "KNOWS"); // a -> b
3280        store.create_edge(c, a, "KNOWS"); // c -> a
3281
3282        assert_eq!(store.edge_count(), 2);
3283
3284        // Delete all edges connected to a
3285        store.delete_node_edges(a);
3286
3287        assert_eq!(store.edge_count(), 0);
3288    }
3289
3290    #[test]
3291    fn test_neighbors_both_directions() {
3292        let store = LpgStore::new();
3293
3294        let a = store.create_node(&["Person"]);
3295        let b = store.create_node(&["Person"]);
3296        let c = store.create_node(&["Person"]);
3297
3298        store.create_edge(a, b, "KNOWS"); // a -> b
3299        store.create_edge(c, a, "KNOWS"); // c -> a
3300
3301        // Direction::Both for node a
3302        let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3303        assert_eq!(neighbors.len(), 2);
3304        assert!(neighbors.contains(&b)); // outgoing
3305        assert!(neighbors.contains(&c)); // incoming
3306    }
3307
3308    #[test]
3309    fn test_edges_from() {
3310        let store = LpgStore::new();
3311
3312        let a = store.create_node(&["Person"]);
3313        let b = store.create_node(&["Person"]);
3314        let c = store.create_node(&["Person"]);
3315
3316        let e1 = store.create_edge(a, b, "KNOWS");
3317        let e2 = store.create_edge(a, c, "KNOWS");
3318
3319        let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3320        assert_eq!(edges.len(), 2);
3321        assert!(edges.iter().any(|(_, e)| *e == e1));
3322        assert!(edges.iter().any(|(_, e)| *e == e2));
3323
3324        // Incoming edges to b
3325        let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3326        assert_eq!(incoming.len(), 1);
3327        assert_eq!(incoming[0].1, e1);
3328    }
3329
3330    #[test]
3331    fn test_edges_to() {
3332        let store = LpgStore::new();
3333
3334        let a = store.create_node(&["Person"]);
3335        let b = store.create_node(&["Person"]);
3336        let c = store.create_node(&["Person"]);
3337
3338        let e1 = store.create_edge(a, b, "KNOWS");
3339        let e2 = store.create_edge(c, b, "KNOWS");
3340
3341        // Edges pointing TO b
3342        let to_b = store.edges_to(b);
3343        assert_eq!(to_b.len(), 2);
3344        assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3345        assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3346    }
3347
3348    #[test]
3349    fn test_out_degree_in_degree() {
3350        let store = LpgStore::new();
3351
3352        let a = store.create_node(&["Person"]);
3353        let b = store.create_node(&["Person"]);
3354        let c = store.create_node(&["Person"]);
3355
3356        store.create_edge(a, b, "KNOWS");
3357        store.create_edge(a, c, "KNOWS");
3358        store.create_edge(c, b, "KNOWS");
3359
3360        assert_eq!(store.out_degree(a), 2);
3361        assert_eq!(store.out_degree(b), 0);
3362        assert_eq!(store.out_degree(c), 1);
3363
3364        assert_eq!(store.in_degree(a), 0);
3365        assert_eq!(store.in_degree(b), 2);
3366        assert_eq!(store.in_degree(c), 1);
3367    }
3368
3369    #[test]
3370    fn test_edge_type() {
3371        let store = LpgStore::new();
3372
3373        let a = store.create_node(&["Person"]);
3374        let b = store.create_node(&["Person"]);
3375        let edge_id = store.create_edge(a, b, "KNOWS");
3376
3377        let edge_type = store.edge_type(edge_id);
3378        assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3379
3380        // Non-existent edge
3381        let fake_id = EdgeId::new(999);
3382        assert!(store.edge_type(fake_id).is_none());
3383    }
3384
3385    #[test]
3386    fn test_count_methods() {
3387        let store = LpgStore::new();
3388
3389        assert_eq!(store.label_count(), 0);
3390        assert_eq!(store.edge_type_count(), 0);
3391        assert_eq!(store.property_key_count(), 0);
3392
3393        let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3394        let b = store.create_node(&["Company"]);
3395        store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3396
3397        assert_eq!(store.label_count(), 2); // Person, Company
3398        assert_eq!(store.edge_type_count(), 1); // WORKS_AT
3399        assert_eq!(store.property_key_count(), 2); // age, since
3400    }
3401
3402    #[test]
3403    fn test_all_nodes_and_edges() {
3404        let store = LpgStore::new();
3405
3406        let a = store.create_node(&["Person"]);
3407        let b = store.create_node(&["Person"]);
3408        store.create_edge(a, b, "KNOWS");
3409
3410        let nodes: Vec<_> = store.all_nodes().collect();
3411        assert_eq!(nodes.len(), 2);
3412
3413        let edges: Vec<_> = store.all_edges().collect();
3414        assert_eq!(edges.len(), 1);
3415    }
3416
3417    #[test]
3418    fn test_all_labels_and_edge_types() {
3419        let store = LpgStore::new();
3420
3421        store.create_node(&["Person"]);
3422        store.create_node(&["Company"]);
3423        let a = store.create_node(&["Animal"]);
3424        let b = store.create_node(&["Animal"]);
3425        store.create_edge(a, b, "EATS");
3426
3427        let labels = store.all_labels();
3428        assert_eq!(labels.len(), 3);
3429        assert!(labels.contains(&"Person".to_string()));
3430        assert!(labels.contains(&"Company".to_string()));
3431        assert!(labels.contains(&"Animal".to_string()));
3432
3433        let edge_types = store.all_edge_types();
3434        assert_eq!(edge_types.len(), 1);
3435        assert!(edge_types.contains(&"EATS".to_string()));
3436    }
3437
3438    #[test]
3439    fn test_all_property_keys() {
3440        let store = LpgStore::new();
3441
3442        let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3443        let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3444        store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3445
3446        let keys = store.all_property_keys();
3447        assert!(keys.contains(&"name".to_string()));
3448        assert!(keys.contains(&"age".to_string()));
3449        assert!(keys.contains(&"since".to_string()));
3450    }
3451
3452    #[test]
3453    fn test_nodes_with_label() {
3454        let store = LpgStore::new();
3455
3456        store.create_node(&["Person"]);
3457        store.create_node(&["Person"]);
3458        store.create_node(&["Company"]);
3459
3460        let persons: Vec<_> = store.nodes_with_label("Person").collect();
3461        assert_eq!(persons.len(), 2);
3462
3463        let companies: Vec<_> = store.nodes_with_label("Company").collect();
3464        assert_eq!(companies.len(), 1);
3465
3466        let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3467        assert_eq!(none.len(), 0);
3468    }
3469
3470    #[test]
3471    fn test_edges_with_type() {
3472        let store = LpgStore::new();
3473
3474        let a = store.create_node(&["Person"]);
3475        let b = store.create_node(&["Person"]);
3476        let c = store.create_node(&["Company"]);
3477
3478        store.create_edge(a, b, "KNOWS");
3479        store.create_edge(a, c, "WORKS_AT");
3480
3481        let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3482        assert_eq!(knows.len(), 1);
3483
3484        let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3485        assert_eq!(works_at.len(), 1);
3486
3487        let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3488        assert_eq!(none.len(), 0);
3489    }
3490
3491    #[test]
3492    fn test_nodes_by_label_nonexistent() {
3493        let store = LpgStore::new();
3494        store.create_node(&["Person"]);
3495
3496        let empty = store.nodes_by_label("NonExistent");
3497        assert!(empty.is_empty());
3498    }
3499
3500    #[test]
3501    fn test_statistics() {
3502        let store = LpgStore::new();
3503
3504        let a = store.create_node(&["Person"]);
3505        let b = store.create_node(&["Person"]);
3506        let c = store.create_node(&["Company"]);
3507
3508        store.create_edge(a, b, "KNOWS");
3509        store.create_edge(a, c, "WORKS_AT");
3510
3511        store.compute_statistics();
3512        let stats = store.statistics();
3513
3514        assert_eq!(stats.total_nodes, 3);
3515        assert_eq!(stats.total_edges, 2);
3516
3517        // Estimates
3518        let person_card = store.estimate_label_cardinality("Person");
3519        assert!(person_card > 0.0);
3520
3521        let avg_degree = store.estimate_avg_degree("KNOWS", true);
3522        assert!(avg_degree >= 0.0);
3523    }
3524
3525    #[test]
3526    fn test_zone_maps() {
3527        let store = LpgStore::new();
3528
3529        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3530        store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3531
3532        // Zone map should indicate possible matches (30 is within [25, 35] range)
3533        let might_match =
3534            store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3535        // Zone maps return true conservatively when value is within min/max range
3536        assert!(might_match);
3537
3538        let zone = store.node_property_zone_map(&"age".into());
3539        assert!(zone.is_some());
3540
3541        // Non-existent property
3542        let no_zone = store.node_property_zone_map(&"nonexistent".into());
3543        assert!(no_zone.is_none());
3544
3545        // Edge zone maps
3546        let a = store.create_node(&["A"]);
3547        let b = store.create_node(&["B"]);
3548        store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3549
3550        let edge_zone = store.edge_property_zone_map(&"weight".into());
3551        assert!(edge_zone.is_some());
3552    }
3553
3554    #[test]
3555    fn test_rebuild_zone_maps() {
3556        let store = LpgStore::new();
3557        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3558
3559        // Should not panic
3560        store.rebuild_zone_maps();
3561    }
3562
3563    #[test]
3564    fn test_create_node_with_id() {
3565        let store = LpgStore::new();
3566
3567        let specific_id = NodeId::new(100);
3568        store.create_node_with_id(specific_id, &["Person", "Employee"]);
3569
3570        let node = store.get_node(specific_id).unwrap();
3571        assert!(node.has_label("Person"));
3572        assert!(node.has_label("Employee"));
3573
3574        // Next auto-generated ID should be > 100
3575        let next = store.create_node(&["Other"]);
3576        assert!(next.as_u64() > 100);
3577    }
3578
3579    #[test]
3580    fn test_create_edge_with_id() {
3581        let store = LpgStore::new();
3582
3583        let a = store.create_node(&["A"]);
3584        let b = store.create_node(&["B"]);
3585
3586        let specific_id = EdgeId::new(500);
3587        store.create_edge_with_id(specific_id, a, b, "REL");
3588
3589        let edge = store.get_edge(specific_id).unwrap();
3590        assert_eq!(edge.src, a);
3591        assert_eq!(edge.dst, b);
3592        assert_eq!(edge.edge_type.as_str(), "REL");
3593
3594        // Next auto-generated ID should be > 500
3595        let next = store.create_edge(a, b, "OTHER");
3596        assert!(next.as_u64() > 500);
3597    }
3598
3599    #[test]
3600    fn test_set_epoch() {
3601        let store = LpgStore::new();
3602
3603        assert_eq!(store.current_epoch().as_u64(), 0);
3604
3605        store.set_epoch(EpochId::new(42));
3606        assert_eq!(store.current_epoch().as_u64(), 42);
3607    }
3608
3609    #[test]
3610    fn test_get_node_nonexistent() {
3611        let store = LpgStore::new();
3612        let fake_id = NodeId::new(999);
3613        assert!(store.get_node(fake_id).is_none());
3614    }
3615
3616    #[test]
3617    fn test_get_edge_nonexistent() {
3618        let store = LpgStore::new();
3619        let fake_id = EdgeId::new(999);
3620        assert!(store.get_edge(fake_id).is_none());
3621    }
3622
3623    #[test]
3624    fn test_multiple_labels() {
3625        let store = LpgStore::new();
3626
3627        let id = store.create_node(&["Person", "Employee", "Manager"]);
3628        let node = store.get_node(id).unwrap();
3629
3630        assert!(node.has_label("Person"));
3631        assert!(node.has_label("Employee"));
3632        assert!(node.has_label("Manager"));
3633        assert!(!node.has_label("Other"));
3634    }
3635
3636    #[test]
3637    fn test_default_impl() {
3638        let store: LpgStore = Default::default();
3639        assert_eq!(store.node_count(), 0);
3640        assert_eq!(store.edge_count(), 0);
3641    }
3642
3643    #[test]
3644    fn test_edges_from_both_directions() {
3645        let store = LpgStore::new();
3646
3647        let a = store.create_node(&["A"]);
3648        let b = store.create_node(&["B"]);
3649        let c = store.create_node(&["C"]);
3650
3651        let e1 = store.create_edge(a, b, "R1"); // a -> b
3652        let e2 = store.create_edge(c, a, "R2"); // c -> a
3653
3654        // Both directions from a
3655        let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3656        assert_eq!(edges.len(), 2);
3657        assert!(edges.iter().any(|(_, e)| *e == e1)); // outgoing
3658        assert!(edges.iter().any(|(_, e)| *e == e2)); // incoming
3659    }
3660
3661    #[test]
3662    fn test_no_backward_adj_in_degree() {
3663        let config = LpgStoreConfig {
3664            backward_edges: false,
3665            initial_node_capacity: 10,
3666            initial_edge_capacity: 10,
3667        };
3668        let store = LpgStore::with_config(config);
3669
3670        let a = store.create_node(&["A"]);
3671        let b = store.create_node(&["B"]);
3672        store.create_edge(a, b, "R");
3673
3674        // in_degree should still work (falls back to scanning)
3675        let degree = store.in_degree(b);
3676        assert_eq!(degree, 1);
3677    }
3678
3679    #[test]
3680    fn test_no_backward_adj_edges_to() {
3681        let config = LpgStoreConfig {
3682            backward_edges: false,
3683            initial_node_capacity: 10,
3684            initial_edge_capacity: 10,
3685        };
3686        let store = LpgStore::with_config(config);
3687
3688        let a = store.create_node(&["A"]);
3689        let b = store.create_node(&["B"]);
3690        let e = store.create_edge(a, b, "R");
3691
3692        // edges_to should still work (falls back to scanning)
3693        let edges = store.edges_to(b);
3694        assert_eq!(edges.len(), 1);
3695        assert_eq!(edges[0].1, e);
3696    }
3697
3698    #[test]
3699    fn test_node_versioned_creation() {
3700        let store = LpgStore::new();
3701
3702        let epoch = store.new_epoch();
3703        let tx_id = TxId::new(1);
3704
3705        let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3706        assert!(store.get_node(id).is_some());
3707    }
3708
3709    #[test]
3710    fn test_edge_versioned_creation() {
3711        let store = LpgStore::new();
3712
3713        let a = store.create_node(&["A"]);
3714        let b = store.create_node(&["B"]);
3715
3716        let epoch = store.new_epoch();
3717        let tx_id = TxId::new(1);
3718
3719        let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3720        assert!(store.get_edge(edge_id).is_some());
3721    }
3722
3723    #[test]
3724    fn test_node_with_props_versioned() {
3725        let store = LpgStore::new();
3726
3727        let epoch = store.new_epoch();
3728        let tx_id = TxId::new(1);
3729
3730        let id = store.create_node_with_props_versioned(
3731            &["Person"],
3732            [("name", Value::from("Alice"))],
3733            epoch,
3734            tx_id,
3735        );
3736
3737        let node = store.get_node(id).unwrap();
3738        assert_eq!(
3739            node.get_property("name").and_then(|v| v.as_str()),
3740            Some("Alice")
3741        );
3742    }
3743
3744    #[test]
3745    fn test_discard_uncommitted_versions() {
3746        let store = LpgStore::new();
3747
3748        let epoch = store.new_epoch();
3749        let tx_id = TxId::new(42);
3750
3751        // Create node with specific tx
3752        let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3753        assert!(store.get_node(node_id).is_some());
3754
3755        // Discard uncommitted versions for this tx
3756        store.discard_uncommitted_versions(tx_id);
3757
3758        // Node should be gone (version chain was removed)
3759        assert!(store.get_node(node_id).is_none());
3760    }
3761
3762    // === Property Index Tests ===
3763
3764    #[test]
3765    fn test_property_index_create_and_lookup() {
3766        let store = LpgStore::new();
3767
3768        // Create nodes with properties
3769        let alice = store.create_node(&["Person"]);
3770        let bob = store.create_node(&["Person"]);
3771        let charlie = store.create_node(&["Person"]);
3772
3773        store.set_node_property(alice, "city", Value::from("NYC"));
3774        store.set_node_property(bob, "city", Value::from("NYC"));
3775        store.set_node_property(charlie, "city", Value::from("LA"));
3776
3777        // Before indexing, lookup still works (via scan)
3778        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3779        assert_eq!(nyc_people.len(), 2);
3780
3781        // Create index
3782        store.create_property_index("city");
3783        assert!(store.has_property_index("city"));
3784
3785        // Indexed lookup should return same results
3786        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3787        assert_eq!(nyc_people.len(), 2);
3788        assert!(nyc_people.contains(&alice));
3789        assert!(nyc_people.contains(&bob));
3790
3791        let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3792        assert_eq!(la_people.len(), 1);
3793        assert!(la_people.contains(&charlie));
3794    }
3795
3796    #[test]
3797    fn test_property_index_maintained_on_update() {
3798        let store = LpgStore::new();
3799
3800        // Create index first
3801        store.create_property_index("status");
3802
3803        let node = store.create_node(&["Task"]);
3804        store.set_node_property(node, "status", Value::from("pending"));
3805
3806        // Should find by initial value
3807        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3808        assert_eq!(pending.len(), 1);
3809        assert!(pending.contains(&node));
3810
3811        // Update the property
3812        store.set_node_property(node, "status", Value::from("done"));
3813
3814        // Old value should not find it
3815        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3816        assert!(pending.is_empty());
3817
3818        // New value should find it
3819        let done = store.find_nodes_by_property("status", &Value::from("done"));
3820        assert_eq!(done.len(), 1);
3821        assert!(done.contains(&node));
3822    }
3823
3824    #[test]
3825    fn test_property_index_maintained_on_remove() {
3826        let store = LpgStore::new();
3827
3828        store.create_property_index("tag");
3829
3830        let node = store.create_node(&["Item"]);
3831        store.set_node_property(node, "tag", Value::from("important"));
3832
3833        // Should find it
3834        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3835        assert_eq!(found.len(), 1);
3836
3837        // Remove the property
3838        store.remove_node_property(node, "tag");
3839
3840        // Should no longer find it
3841        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3842        assert!(found.is_empty());
3843    }
3844
3845    #[test]
3846    fn test_property_index_drop() {
3847        let store = LpgStore::new();
3848
3849        store.create_property_index("key");
3850        assert!(store.has_property_index("key"));
3851
3852        assert!(store.drop_property_index("key"));
3853        assert!(!store.has_property_index("key"));
3854
3855        // Dropping non-existent index returns false
3856        assert!(!store.drop_property_index("key"));
3857    }
3858
3859    #[test]
3860    fn test_property_index_multiple_values() {
3861        let store = LpgStore::new();
3862
3863        store.create_property_index("age");
3864
3865        // Create multiple nodes with same and different ages
3866        let n1 = store.create_node(&["Person"]);
3867        let n2 = store.create_node(&["Person"]);
3868        let n3 = store.create_node(&["Person"]);
3869        let n4 = store.create_node(&["Person"]);
3870
3871        store.set_node_property(n1, "age", Value::from(25i64));
3872        store.set_node_property(n2, "age", Value::from(25i64));
3873        store.set_node_property(n3, "age", Value::from(30i64));
3874        store.set_node_property(n4, "age", Value::from(25i64));
3875
3876        let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3877        assert_eq!(age_25.len(), 3);
3878
3879        let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3880        assert_eq!(age_30.len(), 1);
3881
3882        let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3883        assert!(age_40.is_empty());
3884    }
3885
3886    #[test]
3887    fn test_property_index_builds_from_existing_data() {
3888        let store = LpgStore::new();
3889
3890        // Create nodes first
3891        let n1 = store.create_node(&["Person"]);
3892        let n2 = store.create_node(&["Person"]);
3893        store.set_node_property(n1, "email", Value::from("alice@example.com"));
3894        store.set_node_property(n2, "email", Value::from("bob@example.com"));
3895
3896        // Create index after data exists
3897        store.create_property_index("email");
3898
3899        // Index should include existing data
3900        let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3901        assert_eq!(alice.len(), 1);
3902        assert!(alice.contains(&n1));
3903
3904        let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3905        assert_eq!(bob.len(), 1);
3906        assert!(bob.contains(&n2));
3907    }
3908
3909    #[test]
3910    fn test_get_node_property_batch() {
3911        let store = LpgStore::new();
3912
3913        let n1 = store.create_node(&["Person"]);
3914        let n2 = store.create_node(&["Person"]);
3915        let n3 = store.create_node(&["Person"]);
3916
3917        store.set_node_property(n1, "age", Value::from(25i64));
3918        store.set_node_property(n2, "age", Value::from(30i64));
3919        // n3 has no age property
3920
3921        let age_key = PropertyKey::new("age");
3922        let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
3923
3924        assert_eq!(values.len(), 3);
3925        assert_eq!(values[0], Some(Value::from(25i64)));
3926        assert_eq!(values[1], Some(Value::from(30i64)));
3927        assert_eq!(values[2], None);
3928    }
3929
3930    #[test]
3931    fn test_get_node_property_batch_empty() {
3932        let store = LpgStore::new();
3933        let key = PropertyKey::new("any");
3934
3935        let values = store.get_node_property_batch(&[], &key);
3936        assert!(values.is_empty());
3937    }
3938
3939    #[test]
3940    fn test_get_nodes_properties_batch() {
3941        let store = LpgStore::new();
3942
3943        let n1 = store.create_node(&["Person"]);
3944        let n2 = store.create_node(&["Person"]);
3945        let n3 = store.create_node(&["Person"]);
3946
3947        store.set_node_property(n1, "name", Value::from("Alice"));
3948        store.set_node_property(n1, "age", Value::from(25i64));
3949        store.set_node_property(n2, "name", Value::from("Bob"));
3950        // n3 has no properties
3951
3952        let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
3953
3954        assert_eq!(all_props.len(), 3);
3955        assert_eq!(all_props[0].len(), 2); // name and age
3956        assert_eq!(all_props[1].len(), 1); // name only
3957        assert_eq!(all_props[2].len(), 0); // no properties
3958
3959        assert_eq!(
3960            all_props[0].get(&PropertyKey::new("name")),
3961            Some(&Value::from("Alice"))
3962        );
3963        assert_eq!(
3964            all_props[1].get(&PropertyKey::new("name")),
3965            Some(&Value::from("Bob"))
3966        );
3967    }
3968
3969    #[test]
3970    fn test_get_nodes_properties_batch_empty() {
3971        let store = LpgStore::new();
3972
3973        let all_props = store.get_nodes_properties_batch(&[]);
3974        assert!(all_props.is_empty());
3975    }
3976
3977    #[test]
3978    fn test_get_nodes_properties_selective_batch() {
3979        let store = LpgStore::new();
3980
3981        let n1 = store.create_node(&["Person"]);
3982        let n2 = store.create_node(&["Person"]);
3983
3984        // Set multiple properties
3985        store.set_node_property(n1, "name", Value::from("Alice"));
3986        store.set_node_property(n1, "age", Value::from(25i64));
3987        store.set_node_property(n1, "email", Value::from("alice@example.com"));
3988        store.set_node_property(n2, "name", Value::from("Bob"));
3989        store.set_node_property(n2, "age", Value::from(30i64));
3990        store.set_node_property(n2, "city", Value::from("NYC"));
3991
3992        // Request only name and age (not email or city)
3993        let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
3994        let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
3995
3996        assert_eq!(props.len(), 2);
3997
3998        // n1: should have name and age, but NOT email
3999        assert_eq!(props[0].len(), 2);
4000        assert_eq!(
4001            props[0].get(&PropertyKey::new("name")),
4002            Some(&Value::from("Alice"))
4003        );
4004        assert_eq!(
4005            props[0].get(&PropertyKey::new("age")),
4006            Some(&Value::from(25i64))
4007        );
4008        assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4009
4010        // n2: should have name and age, but NOT city
4011        assert_eq!(props[1].len(), 2);
4012        assert_eq!(
4013            props[1].get(&PropertyKey::new("name")),
4014            Some(&Value::from("Bob"))
4015        );
4016        assert_eq!(
4017            props[1].get(&PropertyKey::new("age")),
4018            Some(&Value::from(30i64))
4019        );
4020        assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4021    }
4022
4023    #[test]
4024    fn test_get_nodes_properties_selective_batch_empty_keys() {
4025        let store = LpgStore::new();
4026
4027        let n1 = store.create_node(&["Person"]);
4028        store.set_node_property(n1, "name", Value::from("Alice"));
4029
4030        // Request no properties
4031        let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4032
4033        assert_eq!(props.len(), 1);
4034        assert!(props[0].is_empty()); // Empty map when no keys requested
4035    }
4036
4037    #[test]
4038    fn test_get_nodes_properties_selective_batch_missing_keys() {
4039        let store = LpgStore::new();
4040
4041        let n1 = store.create_node(&["Person"]);
4042        store.set_node_property(n1, "name", Value::from("Alice"));
4043
4044        // Request a property that doesn't exist
4045        let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4046        let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4047
4048        assert_eq!(props.len(), 1);
4049        assert_eq!(props[0].len(), 1); // Only name exists
4050        assert_eq!(
4051            props[0].get(&PropertyKey::new("name")),
4052            Some(&Value::from("Alice"))
4053        );
4054    }
4055
4056    // === Range Query Tests ===
4057
4058    #[test]
4059    fn test_find_nodes_in_range_inclusive() {
4060        let store = LpgStore::new();
4061
4062        let n1 = store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4063        let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4064        let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4065        let _n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4066
4067        // age >= 20 AND age <= 40 (inclusive both sides)
4068        let result = store.find_nodes_in_range(
4069            "age",
4070            Some(&Value::from(20i64)),
4071            Some(&Value::from(40i64)),
4072            true,
4073            true,
4074        );
4075        assert_eq!(result.len(), 3);
4076        assert!(result.contains(&n1));
4077        assert!(result.contains(&n2));
4078        assert!(result.contains(&n3));
4079    }
4080
4081    #[test]
4082    fn test_find_nodes_in_range_exclusive() {
4083        let store = LpgStore::new();
4084
4085        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4086        let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4087        store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4088
4089        // age > 20 AND age < 40 (exclusive both sides)
4090        let result = store.find_nodes_in_range(
4091            "age",
4092            Some(&Value::from(20i64)),
4093            Some(&Value::from(40i64)),
4094            false,
4095            false,
4096        );
4097        assert_eq!(result.len(), 1);
4098        assert!(result.contains(&n2));
4099    }
4100
4101    #[test]
4102    fn test_find_nodes_in_range_open_ended() {
4103        let store = LpgStore::new();
4104
4105        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4106        store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4107        let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4108        let n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4109
4110        // age >= 35 (no upper bound)
4111        let result = store.find_nodes_in_range("age", Some(&Value::from(35i64)), None, true, true);
4112        assert_eq!(result.len(), 2);
4113        assert!(result.contains(&n3));
4114        assert!(result.contains(&n4));
4115
4116        // age <= 25 (no lower bound)
4117        let result = store.find_nodes_in_range("age", None, Some(&Value::from(25i64)), true, true);
4118        assert_eq!(result.len(), 1);
4119    }
4120
4121    #[test]
4122    fn test_find_nodes_in_range_empty_result() {
4123        let store = LpgStore::new();
4124
4125        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4126
4127        // Range that doesn't match anything
4128        let result = store.find_nodes_in_range(
4129            "age",
4130            Some(&Value::from(100i64)),
4131            Some(&Value::from(200i64)),
4132            true,
4133            true,
4134        );
4135        assert!(result.is_empty());
4136    }
4137
4138    #[test]
4139    fn test_find_nodes_in_range_nonexistent_property() {
4140        let store = LpgStore::new();
4141
4142        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4143
4144        let result = store.find_nodes_in_range(
4145            "weight",
4146            Some(&Value::from(50i64)),
4147            Some(&Value::from(100i64)),
4148            true,
4149            true,
4150        );
4151        assert!(result.is_empty());
4152    }
4153
4154    // === Multi-Property Query Tests ===
4155
4156    #[test]
4157    fn test_find_nodes_by_properties_multiple_conditions() {
4158        let store = LpgStore::new();
4159
4160        let alice = store.create_node_with_props(
4161            &["Person"],
4162            [("name", Value::from("Alice")), ("city", Value::from("NYC"))],
4163        );
4164        store.create_node_with_props(
4165            &["Person"],
4166            [("name", Value::from("Bob")), ("city", Value::from("NYC"))],
4167        );
4168        store.create_node_with_props(
4169            &["Person"],
4170            [("name", Value::from("Alice")), ("city", Value::from("LA"))],
4171        );
4172
4173        // Match name="Alice" AND city="NYC"
4174        let result = store.find_nodes_by_properties(&[
4175            ("name", Value::from("Alice")),
4176            ("city", Value::from("NYC")),
4177        ]);
4178        assert_eq!(result.len(), 1);
4179        assert!(result.contains(&alice));
4180    }
4181
4182    #[test]
4183    fn test_find_nodes_by_properties_empty_conditions() {
4184        let store = LpgStore::new();
4185
4186        store.create_node(&["Person"]);
4187        store.create_node(&["Person"]);
4188
4189        // Empty conditions should return all nodes
4190        let result = store.find_nodes_by_properties(&[]);
4191        assert_eq!(result.len(), 2);
4192    }
4193
4194    #[test]
4195    fn test_find_nodes_by_properties_no_match() {
4196        let store = LpgStore::new();
4197
4198        store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
4199
4200        let result = store.find_nodes_by_properties(&[("name", Value::from("Nobody"))]);
4201        assert!(result.is_empty());
4202    }
4203
4204    #[test]
4205    fn test_find_nodes_by_properties_with_index() {
4206        let store = LpgStore::new();
4207
4208        // Create index on name
4209        store.create_property_index("name");
4210
4211        let alice = store.create_node_with_props(
4212            &["Person"],
4213            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
4214        );
4215        store.create_node_with_props(
4216            &["Person"],
4217            [("name", Value::from("Bob")), ("age", Value::from(30i64))],
4218        );
4219
4220        // Index should accelerate the lookup
4221        let result = store.find_nodes_by_properties(&[
4222            ("name", Value::from("Alice")),
4223            ("age", Value::from(30i64)),
4224        ]);
4225        assert_eq!(result.len(), 1);
4226        assert!(result.contains(&alice));
4227    }
4228
4229    // === Cardinality Estimation Tests ===
4230
4231    #[test]
4232    fn test_estimate_label_cardinality() {
4233        let store = LpgStore::new();
4234
4235        store.create_node(&["Person"]);
4236        store.create_node(&["Person"]);
4237        store.create_node(&["Animal"]);
4238
4239        store.ensure_statistics_fresh();
4240
4241        let person_est = store.estimate_label_cardinality("Person");
4242        let animal_est = store.estimate_label_cardinality("Animal");
4243        let unknown_est = store.estimate_label_cardinality("Unknown");
4244
4245        assert!(
4246            person_est >= 1.0,
4247            "Person should have cardinality >= 1, got {person_est}"
4248        );
4249        assert!(
4250            animal_est >= 1.0,
4251            "Animal should have cardinality >= 1, got {animal_est}"
4252        );
4253        // Unknown label should return some default (not panic)
4254        assert!(unknown_est >= 0.0);
4255    }
4256
4257    #[test]
4258    fn test_estimate_avg_degree() {
4259        let store = LpgStore::new();
4260
4261        let a = store.create_node(&["Person"]);
4262        let b = store.create_node(&["Person"]);
4263        let c = store.create_node(&["Person"]);
4264
4265        store.create_edge(a, b, "KNOWS");
4266        store.create_edge(a, c, "KNOWS");
4267        store.create_edge(b, c, "KNOWS");
4268
4269        store.ensure_statistics_fresh();
4270
4271        let outgoing = store.estimate_avg_degree("KNOWS", true);
4272        let incoming = store.estimate_avg_degree("KNOWS", false);
4273
4274        assert!(
4275            outgoing > 0.0,
4276            "Outgoing degree should be > 0, got {outgoing}"
4277        );
4278        assert!(
4279            incoming > 0.0,
4280            "Incoming degree should be > 0, got {incoming}"
4281        );
4282    }
4283
4284    // === Delete operations ===
4285
4286    #[test]
4287    fn test_delete_node_does_not_cascade() {
4288        let store = LpgStore::new();
4289
4290        let a = store.create_node(&["A"]);
4291        let b = store.create_node(&["B"]);
4292        let e = store.create_edge(a, b, "KNOWS");
4293
4294        assert!(store.delete_node(a));
4295        assert!(store.get_node(a).is_none());
4296
4297        // Edges are NOT automatically deleted (non-detach delete)
4298        assert!(
4299            store.get_edge(e).is_some(),
4300            "Edge should survive non-detach node delete"
4301        );
4302    }
4303
4304    #[test]
4305    fn test_delete_already_deleted_node() {
4306        let store = LpgStore::new();
4307        let a = store.create_node(&["A"]);
4308
4309        assert!(store.delete_node(a));
4310        // Second delete should return false (already deleted)
4311        assert!(!store.delete_node(a));
4312    }
4313
4314    #[test]
4315    fn test_delete_nonexistent_node() {
4316        let store = LpgStore::new();
4317        assert!(!store.delete_node(NodeId::new(999)));
4318    }
4319}