Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(feature = "tiered-storage", feature = "vector-index"))]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
29
30#[cfg(feature = "vector-index")]
31use crate::index::vector::HnswIndex;
32
33/// Compares two values for ordering (used for range checks).
34fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
35    match (a, b) {
36        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
37        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
38        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
39        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
40        _ => None,
41    }
42}
43
44/// Checks if a value is within a range.
45fn value_in_range(
46    value: &Value,
47    min: Option<&Value>,
48    max: Option<&Value>,
49    min_inclusive: bool,
50    max_inclusive: bool,
51) -> bool {
52    // Check lower bound
53    if let Some(min_val) = min {
54        match compare_values_for_range(value, min_val) {
55            Some(CmpOrdering::Less) => return false,
56            Some(CmpOrdering::Equal) if !min_inclusive => return false,
57            None => return false, // Can't compare
58            _ => {}
59        }
60    }
61
62    // Check upper bound
63    if let Some(max_val) = max {
64        match compare_values_for_range(value, max_val) {
65            Some(CmpOrdering::Greater) => return false,
66            Some(CmpOrdering::Equal) if !max_inclusive => return false,
67            None => return false,
68            _ => {}
69        }
70    }
71
72    true
73}
74
75// Tiered storage imports
76#[cfg(feature = "tiered-storage")]
77use crate::storage::EpochStore;
78#[cfg(feature = "tiered-storage")]
79use grafeo_common::memory::arena::ArenaAllocator;
80#[cfg(feature = "tiered-storage")]
81use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
82
83/// Configuration for the LPG store.
84///
85/// The defaults work well for most cases. Tune `backward_edges` if you only
86/// traverse in one direction (saves memory), or adjust capacities if you know
87/// your graph size upfront (avoids reallocations).
88#[derive(Debug, Clone)]
89pub struct LpgStoreConfig {
90    /// Maintain backward adjacency for incoming edge queries. Turn off if
91    /// you only traverse outgoing edges - saves ~50% adjacency memory.
92    pub backward_edges: bool,
93    /// Initial capacity for nodes (avoids early reallocations).
94    pub initial_node_capacity: usize,
95    /// Initial capacity for edges (avoids early reallocations).
96    pub initial_edge_capacity: usize,
97}
98
99impl Default for LpgStoreConfig {
100    fn default() -> Self {
101        Self {
102            backward_edges: true,
103            initial_node_capacity: 1024,
104            initial_edge_capacity: 4096,
105        }
106    }
107}
108
109/// The core in-memory graph storage.
110///
111/// Everything lives here: nodes, edges, properties, adjacency indexes, and
112/// version chains for MVCC. Concurrent reads never block each other.
113///
114/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
115/// adds transaction management and query execution. Use `LpgStore` directly
116/// when you need raw performance for algorithm implementations.
117///
118/// # Example
119///
120/// ```
121/// use grafeo_core::graph::lpg::LpgStore;
122/// use grafeo_core::graph::Direction;
123///
124/// let store = LpgStore::new();
125///
126/// // Create a small social network
127/// let alice = store.create_node(&["Person"]);
128/// let bob = store.create_node(&["Person"]);
129/// store.create_edge(alice, bob, "KNOWS");
130///
131/// // Traverse outgoing edges
132/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
133///     println!("Alice knows node {:?}", neighbor);
134/// }
135/// ```
136///
137/// # Lock Ordering
138///
139/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
140/// consistent order to prevent deadlocks. Always acquire locks in this order:
141///
142/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
143/// 1. `nodes` / `node_versions`
144/// 2. `edges` / `edge_versions`
145///
146/// ## Level 2 - Catalogs (acquire as pairs when writing)
147/// 3. `label_to_id` + `id_to_label`
148/// 4. `edge_type_to_id` + `id_to_edge_type`
149///
150/// ## Level 3 - Indexes
151/// 5. `label_index`
152/// 6. `node_labels`
153/// 7. `property_indexes`
154///
155/// ## Level 4 - Statistics
156/// 8. `statistics`
157///
158/// ## Level 5 - Nested Locks (internal to other structs)
159/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
160/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
161///
162/// ## Rules
163/// - Catalog pairs must be acquired together when writing.
164/// - Never hold entity locks while acquiring catalog locks in a different scope.
165/// - Statistics lock is always last.
166/// - Read locks are generally safe, but avoid read-to-write upgrades.
167pub struct LpgStore {
168    /// Configuration.
169    #[allow(dead_code)]
170    config: LpgStoreConfig,
171
172    /// Node records indexed by NodeId, with version chains for MVCC.
173    /// Used when `tiered-storage` feature is disabled.
174    /// Lock order: 1
175    #[cfg(not(feature = "tiered-storage"))]
176    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
177
178    /// Edge records indexed by EdgeId, with version chains for MVCC.
179    /// Used when `tiered-storage` feature is disabled.
180    /// Lock order: 2
181    #[cfg(not(feature = "tiered-storage"))]
182    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
183
184    // === Tiered Storage Fields (feature-gated) ===
185    //
186    // Lock ordering for arena access:
187    //   version_lock (read/write) → arena read lock (via arena_allocator.arena())
188    //
189    // Rules:
190    // - Acquire arena read lock *after* version locks, never before.
191    // - Multiple threads may call arena.read_at() concurrently (shared refs only).
192    // - Never acquire arena write lock (alloc_new_chunk) while holding version locks.
193    // - freeze_epoch order: node_versions.read() → arena.read_at(),
194    //   then edge_versions.read() → arena.read_at().
195    /// Arena allocator for hot data storage.
196    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
197    #[cfg(feature = "tiered-storage")]
198    arena_allocator: Arc<ArenaAllocator>,
199
200    /// Node version indexes - store metadata and arena offsets.
201    /// The actual NodeRecord data is stored in the arena.
202    /// Lock order: 1
203    #[cfg(feature = "tiered-storage")]
204    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
205
206    /// Edge version indexes - store metadata and arena offsets.
207    /// The actual EdgeRecord data is stored in the arena.
208    /// Lock order: 2
209    #[cfg(feature = "tiered-storage")]
210    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
211
212    /// Cold storage for frozen epochs.
213    /// Contains compressed epoch blocks for historical data.
214    #[cfg(feature = "tiered-storage")]
215    epoch_store: Arc<EpochStore>,
216
217    /// Property storage for nodes.
218    node_properties: PropertyStorage<NodeId>,
219
220    /// Property storage for edges.
221    edge_properties: PropertyStorage<EdgeId>,
222
223    /// Label name to ID mapping.
224    /// Lock order: 3 (acquire with id_to_label)
225    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
226
227    /// Label ID to name mapping.
228    /// Lock order: 3 (acquire with label_to_id)
229    id_to_label: RwLock<Vec<ArcStr>>,
230
231    /// Edge type name to ID mapping.
232    /// Lock order: 4 (acquire with id_to_edge_type)
233    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
234
235    /// Edge type ID to name mapping.
236    /// Lock order: 4 (acquire with edge_type_to_id)
237    id_to_edge_type: RwLock<Vec<ArcStr>>,
238
239    /// Forward adjacency lists (outgoing edges).
240    forward_adj: ChunkedAdjacency,
241
242    /// Backward adjacency lists (incoming edges).
243    /// Only populated if config.backward_edges is true.
244    backward_adj: Option<ChunkedAdjacency>,
245
246    /// Label index: label_id -> set of node IDs.
247    /// Lock order: 5
248    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
249
250    /// Node labels: node_id -> set of label IDs.
251    /// Reverse mapping to efficiently get labels for a node.
252    /// Lock order: 6
253    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
254
255    /// Property indexes: property_key -> (value -> set of node IDs).
256    ///
257    /// When a property is indexed, lookups by value are O(1) instead of O(n).
258    /// Use [`create_property_index`] to enable indexing for a property.
259    /// Lock order: 7
260    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
261
262    /// Vector indexes: "label:property" -> HNSW index.
263    ///
264    /// Created via [`GrafeoDB::create_vector_index`](grafeo_engine::GrafeoDB::create_vector_index).
265    /// Lock order: 7 (same level as property_indexes, disjoint keys)
266    #[cfg(feature = "vector-index")]
267    vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
268
269    /// Next node ID.
270    next_node_id: AtomicU64,
271
272    /// Next edge ID.
273    next_edge_id: AtomicU64,
274
275    /// Current epoch.
276    current_epoch: AtomicU64,
277
278    /// Statistics for cost-based optimization.
279    /// Lock order: 8 (always last)
280    statistics: RwLock<Statistics>,
281
282    /// Whether statistics need recomputation after mutations.
283    needs_stats_recompute: AtomicBool,
284}
285
286impl LpgStore {
287    /// Creates a new LPG store with default configuration.
288    #[must_use]
289    pub fn new() -> Self {
290        Self::with_config(LpgStoreConfig::default())
291    }
292
293    /// Creates a new LPG store with custom configuration.
294    #[must_use]
295    pub fn with_config(config: LpgStoreConfig) -> Self {
296        let backward_adj = if config.backward_edges {
297            Some(ChunkedAdjacency::new())
298        } else {
299            None
300        };
301
302        Self {
303            #[cfg(not(feature = "tiered-storage"))]
304            nodes: RwLock::new(FxHashMap::default()),
305            #[cfg(not(feature = "tiered-storage"))]
306            edges: RwLock::new(FxHashMap::default()),
307            #[cfg(feature = "tiered-storage")]
308            arena_allocator: Arc::new(ArenaAllocator::new()),
309            #[cfg(feature = "tiered-storage")]
310            node_versions: RwLock::new(FxHashMap::default()),
311            #[cfg(feature = "tiered-storage")]
312            edge_versions: RwLock::new(FxHashMap::default()),
313            #[cfg(feature = "tiered-storage")]
314            epoch_store: Arc::new(EpochStore::new()),
315            node_properties: PropertyStorage::new(),
316            edge_properties: PropertyStorage::new(),
317            label_to_id: RwLock::new(FxHashMap::default()),
318            id_to_label: RwLock::new(Vec::new()),
319            edge_type_to_id: RwLock::new(FxHashMap::default()),
320            id_to_edge_type: RwLock::new(Vec::new()),
321            forward_adj: ChunkedAdjacency::new(),
322            backward_adj,
323            label_index: RwLock::new(Vec::new()),
324            node_labels: RwLock::new(FxHashMap::default()),
325            property_indexes: RwLock::new(FxHashMap::default()),
326            #[cfg(feature = "vector-index")]
327            vector_indexes: RwLock::new(FxHashMap::default()),
328            next_node_id: AtomicU64::new(0),
329            next_edge_id: AtomicU64::new(0),
330            current_epoch: AtomicU64::new(0),
331            statistics: RwLock::new(Statistics::new()),
332            needs_stats_recompute: AtomicBool::new(true),
333            config,
334        }
335    }
336
337    /// Returns the current epoch.
338    #[must_use]
339    pub fn current_epoch(&self) -> EpochId {
340        EpochId::new(self.current_epoch.load(Ordering::Acquire))
341    }
342
343    /// Creates a new epoch.
344    pub fn new_epoch(&self) -> EpochId {
345        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
346        EpochId::new(id)
347    }
348
349    // === Node Operations ===
350
351    /// Creates a new node with the given labels.
352    ///
353    /// Uses the system transaction for non-transactional operations.
354    pub fn create_node(&self, labels: &[&str]) -> NodeId {
355        self.needs_stats_recompute.store(true, Ordering::Relaxed);
356        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
357    }
358
359    /// Creates a new node with the given labels within a transaction context.
360    #[cfg(not(feature = "tiered-storage"))]
361    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
362        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
363
364        let mut record = NodeRecord::new(id, epoch);
365        record.set_label_count(labels.len() as u16);
366
367        // Store labels in node_labels map and label_index
368        let mut node_label_set = FxHashSet::default();
369        for label in labels {
370            let label_id = self.get_or_create_label_id(*label);
371            node_label_set.insert(label_id);
372
373            // Update label index
374            let mut index = self.label_index.write();
375            while index.len() <= label_id as usize {
376                index.push(FxHashMap::default());
377            }
378            index[label_id as usize].insert(id, ());
379        }
380
381        // Store node's labels
382        self.node_labels.write().insert(id, node_label_set);
383
384        // Create version chain with initial version
385        let chain = VersionChain::with_initial(record, epoch, tx_id);
386        self.nodes.write().insert(id, chain);
387        id
388    }
389
390    /// Creates a new node with the given labels within a transaction context.
391    /// (Tiered storage version: stores data in arena, metadata in VersionIndex)
392    #[cfg(feature = "tiered-storage")]
393    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
394        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
395
396        let mut record = NodeRecord::new(id, epoch);
397        record.set_label_count(labels.len() as u16);
398
399        // Store labels in node_labels map and label_index
400        let mut node_label_set = FxHashSet::default();
401        for label in labels {
402            let label_id = self.get_or_create_label_id(*label);
403            node_label_set.insert(label_id);
404
405            // Update label index
406            let mut index = self.label_index.write();
407            while index.len() <= label_id as usize {
408                index.push(FxHashMap::default());
409            }
410            index[label_id as usize].insert(id, ());
411        }
412
413        // Store node's labels
414        self.node_labels.write().insert(id, node_label_set);
415
416        // Allocate record in arena and get offset (create epoch if needed)
417        let arena = self.arena_allocator.arena_or_create(epoch);
418        let (offset, _stored) = arena.alloc_value_with_offset(record);
419
420        // Create HotVersionRef pointing to arena data
421        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
422
423        // Create or update version index
424        let mut versions = self.node_versions.write();
425        if let Some(index) = versions.get_mut(&id) {
426            index.add_hot(hot_ref);
427        } else {
428            versions.insert(id, VersionIndex::with_initial(hot_ref));
429        }
430
431        id
432    }
433
434    /// Creates a new node with labels and properties.
435    pub fn create_node_with_props(
436        &self,
437        labels: &[&str],
438        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
439    ) -> NodeId {
440        self.create_node_with_props_versioned(
441            labels,
442            properties,
443            self.current_epoch(),
444            TxId::SYSTEM,
445        )
446    }
447
448    /// Creates a new node with labels and properties within a transaction context.
449    #[cfg(not(feature = "tiered-storage"))]
450    pub fn create_node_with_props_versioned(
451        &self,
452        labels: &[&str],
453        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
454        epoch: EpochId,
455        tx_id: TxId,
456    ) -> NodeId {
457        let id = self.create_node_versioned(labels, epoch, tx_id);
458
459        for (key, value) in properties {
460            let prop_key: PropertyKey = key.into();
461            let prop_value: Value = value.into();
462            // Update property index before setting the property
463            self.update_property_index_on_set(id, &prop_key, &prop_value);
464            self.node_properties.set(id, prop_key, prop_value);
465        }
466
467        // Update props_count in record
468        let count = self.node_properties.get_all(id).len() as u16;
469        if let Some(chain) = self.nodes.write().get_mut(&id)
470            && let Some(record) = chain.latest_mut()
471        {
472            record.props_count = count;
473        }
474
475        id
476    }
477
478    /// Creates a new node with labels and properties within a transaction context.
479    /// (Tiered storage version)
480    #[cfg(feature = "tiered-storage")]
481    pub fn create_node_with_props_versioned(
482        &self,
483        labels: &[&str],
484        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
485        epoch: EpochId,
486        tx_id: TxId,
487    ) -> NodeId {
488        let id = self.create_node_versioned(labels, epoch, tx_id);
489
490        for (key, value) in properties {
491            let prop_key: PropertyKey = key.into();
492            let prop_value: Value = value.into();
493            // Update property index before setting the property
494            self.update_property_index_on_set(id, &prop_key, &prop_value);
495            self.node_properties.set(id, prop_key, prop_value);
496        }
497
498        // Note: props_count in record is not updated for tiered storage.
499        // The record is immutable once allocated in the arena.
500
501        id
502    }
503
504    /// Gets a node by ID (latest visible version).
505    #[must_use]
506    pub fn get_node(&self, id: NodeId) -> Option<Node> {
507        self.get_node_at_epoch(id, self.current_epoch())
508    }
509
510    /// Gets a node by ID at a specific epoch.
511    #[must_use]
512    #[cfg(not(feature = "tiered-storage"))]
513    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
514        let nodes = self.nodes.read();
515        let chain = nodes.get(&id)?;
516        let record = chain.visible_at(epoch)?;
517
518        if record.is_deleted() {
519            return None;
520        }
521
522        let mut node = Node::new(id);
523
524        // Get labels from node_labels map
525        let id_to_label = self.id_to_label.read();
526        let node_labels = self.node_labels.read();
527        if let Some(label_ids) = node_labels.get(&id) {
528            for &label_id in label_ids {
529                if let Some(label) = id_to_label.get(label_id as usize) {
530                    node.labels.push(label.clone());
531                }
532            }
533        }
534
535        // Get properties
536        node.properties = self.node_properties.get_all(id).into_iter().collect();
537
538        Some(node)
539    }
540
541    /// Gets a node by ID at a specific epoch.
542    /// (Tiered storage version: reads from arena via VersionIndex)
543    #[must_use]
544    #[cfg(feature = "tiered-storage")]
545    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
546        let versions = self.node_versions.read();
547        let index = versions.get(&id)?;
548        let version_ref = index.visible_at(epoch)?;
549
550        // Read the record from arena
551        let record = self.read_node_record(&version_ref)?;
552
553        if record.is_deleted() {
554            return None;
555        }
556
557        let mut node = Node::new(id);
558
559        // Get labels from node_labels map
560        let id_to_label = self.id_to_label.read();
561        let node_labels = self.node_labels.read();
562        if let Some(label_ids) = node_labels.get(&id) {
563            for &label_id in label_ids {
564                if let Some(label) = id_to_label.get(label_id as usize) {
565                    node.labels.push(label.clone());
566                }
567            }
568        }
569
570        // Get properties
571        node.properties = self.node_properties.get_all(id).into_iter().collect();
572
573        Some(node)
574    }
575
576    /// Gets a node visible to a specific transaction.
577    #[must_use]
578    #[cfg(not(feature = "tiered-storage"))]
579    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
580        let nodes = self.nodes.read();
581        let chain = nodes.get(&id)?;
582        let record = chain.visible_to(epoch, tx_id)?;
583
584        if record.is_deleted() {
585            return None;
586        }
587
588        let mut node = Node::new(id);
589
590        // Get labels from node_labels map
591        let id_to_label = self.id_to_label.read();
592        let node_labels = self.node_labels.read();
593        if let Some(label_ids) = node_labels.get(&id) {
594            for &label_id in label_ids {
595                if let Some(label) = id_to_label.get(label_id as usize) {
596                    node.labels.push(label.clone());
597                }
598            }
599        }
600
601        // Get properties
602        node.properties = self.node_properties.get_all(id).into_iter().collect();
603
604        Some(node)
605    }
606
607    /// Gets a node visible to a specific transaction.
608    /// (Tiered storage version: reads from arena via VersionIndex)
609    #[must_use]
610    #[cfg(feature = "tiered-storage")]
611    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
612        let versions = self.node_versions.read();
613        let index = versions.get(&id)?;
614        let version_ref = index.visible_to(epoch, tx_id)?;
615
616        // Read the record from arena
617        let record = self.read_node_record(&version_ref)?;
618
619        if record.is_deleted() {
620            return None;
621        }
622
623        let mut node = Node::new(id);
624
625        // Get labels from node_labels map
626        let id_to_label = self.id_to_label.read();
627        let node_labels = self.node_labels.read();
628        if let Some(label_ids) = node_labels.get(&id) {
629            for &label_id in label_ids {
630                if let Some(label) = id_to_label.get(label_id as usize) {
631                    node.labels.push(label.clone());
632                }
633            }
634        }
635
636        // Get properties
637        node.properties = self.node_properties.get_all(id).into_iter().collect();
638
639        Some(node)
640    }
641
642    /// Reads a NodeRecord from arena (hot) or epoch store (cold) using a VersionRef.
643    #[cfg(feature = "tiered-storage")]
644    #[allow(unsafe_code)]
645    fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
646        match version_ref {
647            VersionRef::Hot(hot_ref) => {
648                let arena = self.arena_allocator.arena(hot_ref.epoch);
649                // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
650                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
651                Some(*record)
652            }
653            VersionRef::Cold(cold_ref) => {
654                // Read from compressed epoch store
655                self.epoch_store
656                    .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
657            }
658        }
659    }
660
661    /// Deletes a node and all its edges (using latest epoch).
662    pub fn delete_node(&self, id: NodeId) -> bool {
663        self.needs_stats_recompute.store(true, Ordering::Relaxed);
664        self.delete_node_at_epoch(id, self.current_epoch())
665    }
666
667    /// Deletes a node at a specific epoch.
668    #[cfg(not(feature = "tiered-storage"))]
669    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
670        let mut nodes = self.nodes.write();
671        if let Some(chain) = nodes.get_mut(&id) {
672            // Check if visible at this epoch (not already deleted)
673            if let Some(record) = chain.visible_at(epoch) {
674                if record.is_deleted() {
675                    return false;
676                }
677            } else {
678                // Not visible at this epoch (already deleted or doesn't exist)
679                return false;
680            }
681
682            // Mark the version chain as deleted at this epoch
683            chain.mark_deleted(epoch);
684
685            // Remove from label index using node_labels map
686            let mut index = self.label_index.write();
687            let mut node_labels = self.node_labels.write();
688            if let Some(label_ids) = node_labels.remove(&id) {
689                for label_id in label_ids {
690                    if let Some(set) = index.get_mut(label_id as usize) {
691                        set.remove(&id);
692                    }
693                }
694            }
695
696            // Remove properties
697            drop(nodes); // Release lock before removing properties
698            drop(index);
699            drop(node_labels);
700            self.node_properties.remove_all(id);
701
702            // Note: Caller should use delete_node_edges() first if detach is needed
703
704            true
705        } else {
706            false
707        }
708    }
709
710    /// Deletes a node at a specific epoch.
711    /// (Tiered storage version)
712    #[cfg(feature = "tiered-storage")]
713    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
714        let mut versions = self.node_versions.write();
715        if let Some(index) = versions.get_mut(&id) {
716            // Check if visible at this epoch
717            if let Some(version_ref) = index.visible_at(epoch) {
718                if let Some(record) = self.read_node_record(&version_ref) {
719                    if record.is_deleted() {
720                        return false;
721                    }
722                } else {
723                    return false;
724                }
725            } else {
726                return false;
727            }
728
729            // Mark as deleted in version index
730            index.mark_deleted(epoch);
731
732            // Remove from label index using node_labels map
733            let mut label_index = self.label_index.write();
734            let mut node_labels = self.node_labels.write();
735            if let Some(label_ids) = node_labels.remove(&id) {
736                for label_id in label_ids {
737                    if let Some(set) = label_index.get_mut(label_id as usize) {
738                        set.remove(&id);
739                    }
740                }
741            }
742
743            // Remove properties
744            drop(versions);
745            drop(label_index);
746            drop(node_labels);
747            self.node_properties.remove_all(id);
748
749            true
750        } else {
751            false
752        }
753    }
754
755    /// Deletes all edges connected to a node (implements DETACH DELETE).
756    ///
757    /// Call this before `delete_node()` if you want to remove a node that
758    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
759    #[cfg(not(feature = "tiered-storage"))]
760    pub fn delete_node_edges(&self, node_id: NodeId) {
761        // Get outgoing edges
762        let outgoing: Vec<EdgeId> = self
763            .forward_adj
764            .edges_from(node_id)
765            .into_iter()
766            .map(|(_, edge_id)| edge_id)
767            .collect();
768
769        // Get incoming edges
770        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
771            backward
772                .edges_from(node_id)
773                .into_iter()
774                .map(|(_, edge_id)| edge_id)
775                .collect()
776        } else {
777            // No backward adjacency - scan all edges
778            let epoch = self.current_epoch();
779            self.edges
780                .read()
781                .iter()
782                .filter_map(|(id, chain)| {
783                    chain.visible_at(epoch).and_then(|r| {
784                        if !r.is_deleted() && r.dst == node_id {
785                            Some(*id)
786                        } else {
787                            None
788                        }
789                    })
790                })
791                .collect()
792        };
793
794        // Delete all edges
795        for edge_id in outgoing.into_iter().chain(incoming) {
796            self.delete_edge(edge_id);
797        }
798    }
799
800    /// Deletes all edges connected to a node (implements DETACH DELETE).
801    /// (Tiered storage version)
802    #[cfg(feature = "tiered-storage")]
803    pub fn delete_node_edges(&self, node_id: NodeId) {
804        // Get outgoing edges
805        let outgoing: Vec<EdgeId> = self
806            .forward_adj
807            .edges_from(node_id)
808            .into_iter()
809            .map(|(_, edge_id)| edge_id)
810            .collect();
811
812        // Get incoming edges
813        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
814            backward
815                .edges_from(node_id)
816                .into_iter()
817                .map(|(_, edge_id)| edge_id)
818                .collect()
819        } else {
820            // No backward adjacency - scan all edges
821            let epoch = self.current_epoch();
822            let versions = self.edge_versions.read();
823            versions
824                .iter()
825                .filter_map(|(id, index)| {
826                    index.visible_at(epoch).and_then(|vref| {
827                        self.read_edge_record(&vref).and_then(|r| {
828                            if !r.is_deleted() && r.dst == node_id {
829                                Some(*id)
830                            } else {
831                                None
832                            }
833                        })
834                    })
835                })
836                .collect()
837        };
838
839        // Delete all edges
840        for edge_id in outgoing.into_iter().chain(incoming) {
841            self.delete_edge(edge_id);
842        }
843    }
844
845    /// Sets a property on a node.
846    #[cfg(not(feature = "tiered-storage"))]
847    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
848        let prop_key: PropertyKey = key.into();
849
850        // Update property index before setting the property (needs to read old value)
851        self.update_property_index_on_set(id, &prop_key, &value);
852
853        self.node_properties.set(id, prop_key, value);
854
855        // Update props_count in record
856        let count = self.node_properties.get_all(id).len() as u16;
857        if let Some(chain) = self.nodes.write().get_mut(&id)
858            && let Some(record) = chain.latest_mut()
859        {
860            record.props_count = count;
861        }
862    }
863
864    /// Sets a property on a node.
865    /// (Tiered storage version: properties stored separately, record is immutable)
866    #[cfg(feature = "tiered-storage")]
867    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
868        let prop_key: PropertyKey = key.into();
869
870        // Update property index before setting the property (needs to read old value)
871        self.update_property_index_on_set(id, &prop_key, &value);
872
873        self.node_properties.set(id, prop_key, value);
874        // Note: props_count in record is not updated for tiered storage.
875        // The record is immutable once allocated in the arena.
876        // Property count can be derived from PropertyStorage if needed.
877    }
878
879    /// Sets a property on an edge.
880    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
881        self.edge_properties.set(id, key.into(), value);
882    }
883
884    /// Removes a property from a node.
885    ///
886    /// Returns the previous value if it existed, or None if the property didn't exist.
887    #[cfg(not(feature = "tiered-storage"))]
888    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
889        let prop_key: PropertyKey = key.into();
890
891        // Update property index before removing (needs to read old value)
892        self.update_property_index_on_remove(id, &prop_key);
893
894        let result = self.node_properties.remove(id, &prop_key);
895
896        // Update props_count in record
897        let count = self.node_properties.get_all(id).len() as u16;
898        if let Some(chain) = self.nodes.write().get_mut(&id)
899            && let Some(record) = chain.latest_mut()
900        {
901            record.props_count = count;
902        }
903
904        result
905    }
906
907    /// Removes a property from a node.
908    /// (Tiered storage version)
909    #[cfg(feature = "tiered-storage")]
910    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
911        let prop_key: PropertyKey = key.into();
912
913        // Update property index before removing (needs to read old value)
914        self.update_property_index_on_remove(id, &prop_key);
915
916        self.node_properties.remove(id, &prop_key)
917        // Note: props_count in record is not updated for tiered storage.
918    }
919
920    /// Removes a property from an edge.
921    ///
922    /// Returns the previous value if it existed, or None if the property didn't exist.
923    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
924        self.edge_properties.remove(id, &key.into())
925    }
926
927    /// Gets a single property from a node without loading all properties.
928    ///
929    /// This is O(1) vs O(properties) for `get_node().get_property()`.
930    /// Use this for filter predicates where you only need one property value.
931    ///
932    /// # Example
933    ///
934    /// ```ignore
935    /// // Fast: Direct single-property lookup
936    /// let age = store.get_node_property(node_id, "age");
937    ///
938    /// // Slow: Loads all properties, then extracts one
939    /// let age = store.get_node(node_id).and_then(|n| n.get_property("age").cloned());
940    /// ```
941    #[must_use]
942    pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
943        self.node_properties.get(id, key)
944    }
945
946    /// Gets a single property from an edge without loading all properties.
947    ///
948    /// This is O(1) vs O(properties) for `get_edge().get_property()`.
949    #[must_use]
950    pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
951        self.edge_properties.get(id, key)
952    }
953
954    // === Batch Property Operations ===
955
956    /// Gets a property for multiple nodes in a single batch operation.
957    ///
958    /// More efficient than calling [`Self::get_node_property`] in a loop because it
959    /// reduces lock overhead and enables better cache utilization.
960    ///
961    /// # Example
962    ///
963    /// ```
964    /// use grafeo_core::graph::lpg::LpgStore;
965    /// use grafeo_common::types::{NodeId, PropertyKey, Value};
966    ///
967    /// let store = LpgStore::new();
968    /// let n1 = store.create_node(&["Person"]);
969    /// let n2 = store.create_node(&["Person"]);
970    /// store.set_node_property(n1, "age", Value::from(25i64));
971    /// store.set_node_property(n2, "age", Value::from(30i64));
972    ///
973    /// let ages = store.get_node_property_batch(&[n1, n2], &PropertyKey::new("age"));
974    /// assert_eq!(ages, vec![Some(Value::from(25i64)), Some(Value::from(30i64))]);
975    /// ```
976    #[must_use]
977    pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
978        self.node_properties.get_batch(ids, key)
979    }
980
981    /// Gets all properties for multiple nodes in a single batch operation.
982    ///
983    /// Returns a vector of property maps, one per node ID (empty map if no properties).
984    /// More efficient than calling [`Self::get_node`] in a loop.
985    #[must_use]
986    pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
987        self.node_properties.get_all_batch(ids)
988    }
989
990    /// Gets selected properties for multiple nodes (projection pushdown).
991    ///
992    /// This is more efficient than [`Self::get_nodes_properties_batch`] when you only
993    /// need a subset of properties. It only iterates the requested columns instead of
994    /// all columns.
995    ///
996    /// **Use this for**: Queries with explicit projections like `RETURN n.name, n.age`
997    /// instead of `RETURN n` (which requires all properties).
998    ///
999    /// # Example
1000    ///
1001    /// ```
1002    /// use grafeo_core::graph::lpg::LpgStore;
1003    /// use grafeo_common::types::{PropertyKey, Value};
1004    ///
1005    /// let store = LpgStore::new();
1006    /// let n1 = store.create_node(&["Person"]);
1007    /// store.set_node_property(n1, "name", Value::from("Alice"));
1008    /// store.set_node_property(n1, "age", Value::from(30i64));
1009    /// store.set_node_property(n1, "email", Value::from("alice@example.com"));
1010    ///
1011    /// // Only fetch name and age (faster than get_nodes_properties_batch)
1012    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
1013    /// let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
1014    ///
1015    /// assert_eq!(props[0].len(), 2); // Only name and age, not email
1016    /// ```
1017    #[must_use]
1018    pub fn get_nodes_properties_selective_batch(
1019        &self,
1020        ids: &[NodeId],
1021        keys: &[PropertyKey],
1022    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1023        self.node_properties.get_selective_batch(ids, keys)
1024    }
1025
1026    /// Gets selected properties for multiple edges (projection pushdown).
1027    ///
1028    /// Edge-property version of [`Self::get_nodes_properties_selective_batch`].
1029    #[must_use]
1030    pub fn get_edges_properties_selective_batch(
1031        &self,
1032        ids: &[EdgeId],
1033        keys: &[PropertyKey],
1034    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1035        self.edge_properties.get_selective_batch(ids, keys)
1036    }
1037
1038    /// Finds nodes where a property value is in a range.
1039    ///
1040    /// This is useful for queries like `n.age > 30` or `n.price BETWEEN 10 AND 100`.
1041    /// Uses zone maps to skip scanning when the range definitely doesn't match.
1042    ///
1043    /// # Arguments
1044    ///
1045    /// * `property` - The property to check
1046    /// * `min` - Optional lower bound (None for unbounded)
1047    /// * `max` - Optional upper bound (None for unbounded)
1048    /// * `min_inclusive` - Whether lower bound is inclusive (>= vs >)
1049    /// * `max_inclusive` - Whether upper bound is inclusive (<= vs <)
1050    ///
1051    /// # Example
1052    ///
1053    /// ```
1054    /// use grafeo_core::graph::lpg::LpgStore;
1055    /// use grafeo_common::types::Value;
1056    ///
1057    /// let store = LpgStore::new();
1058    /// let n1 = store.create_node(&["Person"]);
1059    /// let n2 = store.create_node(&["Person"]);
1060    /// store.set_node_property(n1, "age", Value::from(25i64));
1061    /// store.set_node_property(n2, "age", Value::from(35i64));
1062    ///
1063    /// // Find nodes where age > 30
1064    /// let result = store.find_nodes_in_range(
1065    ///     "age",
1066    ///     Some(&Value::from(30i64)),
1067    ///     None,
1068    ///     false, // exclusive lower bound
1069    ///     true,  // inclusive upper bound (doesn't matter since None)
1070    /// );
1071    /// assert_eq!(result.len(), 1); // Only n2 matches
1072    /// ```
1073    #[must_use]
1074    pub fn find_nodes_in_range(
1075        &self,
1076        property: &str,
1077        min: Option<&Value>,
1078        max: Option<&Value>,
1079        min_inclusive: bool,
1080        max_inclusive: bool,
1081    ) -> Vec<NodeId> {
1082        let key = PropertyKey::new(property);
1083
1084        // Check zone map first - if no values could match, return empty
1085        if !self
1086            .node_properties
1087            .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1088        {
1089            return Vec::new();
1090        }
1091
1092        // Scan all nodes and filter by range
1093        self.node_ids()
1094            .into_iter()
1095            .filter(|&node_id| {
1096                self.node_properties
1097                    .get(node_id, &key)
1098                    .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1099            })
1100            .collect()
1101    }
1102
1103    /// Finds nodes matching multiple property equality conditions.
1104    ///
1105    /// This is more efficient than intersecting multiple single-property lookups
1106    /// because it can use indexes when available and short-circuits on the first
1107    /// miss.
1108    ///
1109    /// # Example
1110    ///
1111    /// ```
1112    /// use grafeo_core::graph::lpg::LpgStore;
1113    /// use grafeo_common::types::Value;
1114    ///
1115    /// let store = LpgStore::new();
1116    /// let alice = store.create_node(&["Person"]);
1117    /// store.set_node_property(alice, "name", Value::from("Alice"));
1118    /// store.set_node_property(alice, "city", Value::from("NYC"));
1119    ///
1120    /// // Find nodes where name = "Alice" AND city = "NYC"
1121    /// let matches = store.find_nodes_by_properties(&[
1122    ///     ("name", Value::from("Alice")),
1123    ///     ("city", Value::from("NYC")),
1124    /// ]);
1125    /// assert!(matches.contains(&alice));
1126    /// ```
1127    #[must_use]
1128    pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1129        if conditions.is_empty() {
1130            return self.node_ids();
1131        }
1132
1133        // Find the most selective condition (smallest result set) to start
1134        // If any condition has an index, use that first
1135        let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1136        let indexes = self.property_indexes.read();
1137
1138        for (i, (prop, value)) in conditions.iter().enumerate() {
1139            let key = PropertyKey::new(*prop);
1140            let hv = HashableValue::new(value.clone());
1141
1142            if let Some(index) = indexes.get(&key) {
1143                let matches: Vec<NodeId> = index
1144                    .get(&hv)
1145                    .map(|nodes| nodes.iter().copied().collect())
1146                    .unwrap_or_default();
1147
1148                // Short-circuit if any indexed condition has no matches
1149                if matches.is_empty() {
1150                    return Vec::new();
1151                }
1152
1153                // Use smallest indexed result as starting point
1154                if best_start
1155                    .as_ref()
1156                    .is_none_or(|(_, best)| matches.len() < best.len())
1157                {
1158                    best_start = Some((i, matches));
1159                }
1160            }
1161        }
1162        drop(indexes);
1163
1164        // Start from best indexed result or fall back to full node scan
1165        let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1166            // No indexes available, start with first condition via full scan
1167            let (prop, value) = &conditions[0];
1168            (0, self.find_nodes_by_property(prop, value))
1169        });
1170
1171        // Filter candidates through remaining conditions
1172        for (i, (prop, value)) in conditions.iter().enumerate() {
1173            if i == start_idx {
1174                continue;
1175            }
1176
1177            let key = PropertyKey::new(*prop);
1178            candidates.retain(|&node_id| {
1179                self.node_properties
1180                    .get(node_id, &key)
1181                    .is_some_and(|v| v == *value)
1182            });
1183
1184            // Short-circuit if no candidates remain
1185            if candidates.is_empty() {
1186                return Vec::new();
1187            }
1188        }
1189
1190        candidates
1191    }
1192
1193    // === Property Index Operations ===
1194
1195    /// Creates an index on a node property for O(1) lookups by value.
1196    ///
1197    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1198    /// O(1) instead of O(n) for this property. The index is automatically
1199    /// maintained when properties are set or removed.
1200    ///
1201    /// # Example
1202    ///
1203    /// ```
1204    /// use grafeo_core::graph::lpg::LpgStore;
1205    /// use grafeo_common::types::Value;
1206    ///
1207    /// let store = LpgStore::new();
1208    ///
1209    /// // Create nodes with an 'id' property
1210    /// let alice = store.create_node(&["Person"]);
1211    /// store.set_node_property(alice, "id", Value::from("alice_123"));
1212    ///
1213    /// // Create an index on the 'id' property
1214    /// store.create_property_index("id");
1215    ///
1216    /// // Now lookups by 'id' are O(1)
1217    /// let found = store.find_nodes_by_property("id", &Value::from("alice_123"));
1218    /// assert!(found.contains(&alice));
1219    /// ```
1220    pub fn create_property_index(&self, property: &str) {
1221        let key = PropertyKey::new(property);
1222
1223        let mut indexes = self.property_indexes.write();
1224        if indexes.contains_key(&key) {
1225            return; // Already indexed
1226        }
1227
1228        // Create the index and populate it with existing data
1229        let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1230
1231        // Scan all nodes to build the index
1232        for node_id in self.node_ids() {
1233            if let Some(value) = self.node_properties.get(node_id, &key) {
1234                let hv = HashableValue::new(value);
1235                index.entry(hv).or_default().insert(node_id);
1236            }
1237        }
1238
1239        indexes.insert(key, index);
1240    }
1241
1242    /// Drops an index on a node property.
1243    ///
1244    /// Returns `true` if the index existed and was removed.
1245    pub fn drop_property_index(&self, property: &str) -> bool {
1246        let key = PropertyKey::new(property);
1247        self.property_indexes.write().remove(&key).is_some()
1248    }
1249
1250    /// Returns `true` if the property has an index.
1251    #[must_use]
1252    pub fn has_property_index(&self, property: &str) -> bool {
1253        let key = PropertyKey::new(property);
1254        self.property_indexes.read().contains_key(&key)
1255    }
1256
1257    /// Stores a vector index for a label+property pair.
1258    #[cfg(feature = "vector-index")]
1259    pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1260        let key = format!("{label}:{property}");
1261        self.vector_indexes.write().insert(key, index);
1262    }
1263
1264    /// Retrieves the vector index for a label+property pair.
1265    #[cfg(feature = "vector-index")]
1266    #[must_use]
1267    pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1268        let key = format!("{label}:{property}");
1269        self.vector_indexes.read().get(&key).cloned()
1270    }
1271
1272    /// Finds all nodes that have a specific property value.
1273    ///
1274    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1275    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1276    ///
1277    /// # Example
1278    ///
1279    /// ```
1280    /// use grafeo_core::graph::lpg::LpgStore;
1281    /// use grafeo_common::types::Value;
1282    ///
1283    /// let store = LpgStore::new();
1284    /// store.create_property_index("city"); // Optional but makes lookups fast
1285    ///
1286    /// let alice = store.create_node(&["Person"]);
1287    /// let bob = store.create_node(&["Person"]);
1288    /// store.set_node_property(alice, "city", Value::from("NYC"));
1289    /// store.set_node_property(bob, "city", Value::from("NYC"));
1290    ///
1291    /// let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
1292    /// assert_eq!(nyc_people.len(), 2);
1293    /// ```
1294    #[must_use]
1295    pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1296        let key = PropertyKey::new(property);
1297        let hv = HashableValue::new(value.clone());
1298
1299        // Try indexed lookup first
1300        let indexes = self.property_indexes.read();
1301        if let Some(index) = indexes.get(&key) {
1302            if let Some(nodes) = index.get(&hv) {
1303                return nodes.iter().copied().collect();
1304            }
1305            return Vec::new();
1306        }
1307        drop(indexes);
1308
1309        // Fall back to full scan
1310        self.node_ids()
1311            .into_iter()
1312            .filter(|&node_id| {
1313                self.node_properties
1314                    .get(node_id, &key)
1315                    .is_some_and(|v| v == *value)
1316            })
1317            .collect()
1318    }
1319
1320    /// Updates property indexes when a property is set.
1321    fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1322        let indexes = self.property_indexes.read();
1323        if let Some(index) = indexes.get(key) {
1324            // Get old value to remove from index
1325            if let Some(old_value) = self.node_properties.get(node_id, key) {
1326                let old_hv = HashableValue::new(old_value);
1327                if let Some(mut nodes) = index.get_mut(&old_hv) {
1328                    nodes.remove(&node_id);
1329                    if nodes.is_empty() {
1330                        drop(nodes);
1331                        index.remove(&old_hv);
1332                    }
1333                }
1334            }
1335
1336            // Add new value to index
1337            let new_hv = HashableValue::new(new_value.clone());
1338            index
1339                .entry(new_hv)
1340                .or_insert_with(FxHashSet::default)
1341                .insert(node_id);
1342        }
1343    }
1344
1345    /// Updates property indexes when a property is removed.
1346    fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1347        let indexes = self.property_indexes.read();
1348        if let Some(index) = indexes.get(key) {
1349            // Get old value to remove from index
1350            if let Some(old_value) = self.node_properties.get(node_id, key) {
1351                let old_hv = HashableValue::new(old_value);
1352                if let Some(mut nodes) = index.get_mut(&old_hv) {
1353                    nodes.remove(&node_id);
1354                    if nodes.is_empty() {
1355                        drop(nodes);
1356                        index.remove(&old_hv);
1357                    }
1358                }
1359            }
1360        }
1361    }
1362
1363    /// Adds a label to a node.
1364    ///
1365    /// Returns true if the label was added, false if the node doesn't exist
1366    /// or already has the label.
1367    #[cfg(not(feature = "tiered-storage"))]
1368    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1369        let epoch = self.current_epoch();
1370
1371        // Check if node exists
1372        let nodes = self.nodes.read();
1373        if let Some(chain) = nodes.get(&node_id) {
1374            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1375                return false;
1376            }
1377        } else {
1378            return false;
1379        }
1380        drop(nodes);
1381
1382        // Get or create label ID
1383        let label_id = self.get_or_create_label_id(label);
1384
1385        // Add to node_labels map
1386        let mut node_labels = self.node_labels.write();
1387        let label_set = node_labels.entry(node_id).or_default();
1388
1389        if label_set.contains(&label_id) {
1390            return false; // Already has this label
1391        }
1392
1393        label_set.insert(label_id);
1394        drop(node_labels);
1395
1396        // Add to label_index
1397        let mut index = self.label_index.write();
1398        if (label_id as usize) >= index.len() {
1399            index.resize(label_id as usize + 1, FxHashMap::default());
1400        }
1401        index[label_id as usize].insert(node_id, ());
1402
1403        // Update label count in node record
1404        if let Some(chain) = self.nodes.write().get_mut(&node_id)
1405            && let Some(record) = chain.latest_mut()
1406        {
1407            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1408            record.set_label_count(count as u16);
1409        }
1410
1411        true
1412    }
1413
1414    /// Adds a label to a node.
1415    /// (Tiered storage version)
1416    #[cfg(feature = "tiered-storage")]
1417    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1418        let epoch = self.current_epoch();
1419
1420        // Check if node exists
1421        let versions = self.node_versions.read();
1422        if let Some(index) = versions.get(&node_id) {
1423            if let Some(vref) = index.visible_at(epoch) {
1424                if let Some(record) = self.read_node_record(&vref) {
1425                    if record.is_deleted() {
1426                        return false;
1427                    }
1428                } else {
1429                    return false;
1430                }
1431            } else {
1432                return false;
1433            }
1434        } else {
1435            return false;
1436        }
1437        drop(versions);
1438
1439        // Get or create label ID
1440        let label_id = self.get_or_create_label_id(label);
1441
1442        // Add to node_labels map
1443        let mut node_labels = self.node_labels.write();
1444        let label_set = node_labels.entry(node_id).or_default();
1445
1446        if label_set.contains(&label_id) {
1447            return false; // Already has this label
1448        }
1449
1450        label_set.insert(label_id);
1451        drop(node_labels);
1452
1453        // Add to label_index
1454        let mut index = self.label_index.write();
1455        if (label_id as usize) >= index.len() {
1456            index.resize(label_id as usize + 1, FxHashMap::default());
1457        }
1458        index[label_id as usize].insert(node_id, ());
1459
1460        // Note: label_count in record is not updated for tiered storage.
1461        // The record is immutable once allocated in the arena.
1462
1463        true
1464    }
1465
1466    /// Removes a label from a node.
1467    ///
1468    /// Returns true if the label was removed, false if the node doesn't exist
1469    /// or doesn't have the label.
1470    #[cfg(not(feature = "tiered-storage"))]
1471    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1472        let epoch = self.current_epoch();
1473
1474        // Check if node exists
1475        let nodes = self.nodes.read();
1476        if let Some(chain) = nodes.get(&node_id) {
1477            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1478                return false;
1479            }
1480        } else {
1481            return false;
1482        }
1483        drop(nodes);
1484
1485        // Get label ID
1486        let label_id = {
1487            let label_ids = self.label_to_id.read();
1488            match label_ids.get(label) {
1489                Some(&id) => id,
1490                None => return false, // Label doesn't exist
1491            }
1492        };
1493
1494        // Remove from node_labels map
1495        let mut node_labels = self.node_labels.write();
1496        if let Some(label_set) = node_labels.get_mut(&node_id) {
1497            if !label_set.remove(&label_id) {
1498                return false; // Node doesn't have this label
1499            }
1500        } else {
1501            return false;
1502        }
1503        drop(node_labels);
1504
1505        // Remove from label_index
1506        let mut index = self.label_index.write();
1507        if (label_id as usize) < index.len() {
1508            index[label_id as usize].remove(&node_id);
1509        }
1510
1511        // Update label count in node record
1512        if let Some(chain) = self.nodes.write().get_mut(&node_id)
1513            && let Some(record) = chain.latest_mut()
1514        {
1515            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1516            record.set_label_count(count as u16);
1517        }
1518
1519        true
1520    }
1521
1522    /// Removes a label from a node.
1523    /// (Tiered storage version)
1524    #[cfg(feature = "tiered-storage")]
1525    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1526        let epoch = self.current_epoch();
1527
1528        // Check if node exists
1529        let versions = self.node_versions.read();
1530        if let Some(index) = versions.get(&node_id) {
1531            if let Some(vref) = index.visible_at(epoch) {
1532                if let Some(record) = self.read_node_record(&vref) {
1533                    if record.is_deleted() {
1534                        return false;
1535                    }
1536                } else {
1537                    return false;
1538                }
1539            } else {
1540                return false;
1541            }
1542        } else {
1543            return false;
1544        }
1545        drop(versions);
1546
1547        // Get label ID
1548        let label_id = {
1549            let label_ids = self.label_to_id.read();
1550            match label_ids.get(label) {
1551                Some(&id) => id,
1552                None => return false, // Label doesn't exist
1553            }
1554        };
1555
1556        // Remove from node_labels map
1557        let mut node_labels = self.node_labels.write();
1558        if let Some(label_set) = node_labels.get_mut(&node_id) {
1559            if !label_set.remove(&label_id) {
1560                return false; // Node doesn't have this label
1561            }
1562        } else {
1563            return false;
1564        }
1565        drop(node_labels);
1566
1567        // Remove from label_index
1568        let mut index = self.label_index.write();
1569        if (label_id as usize) < index.len() {
1570            index[label_id as usize].remove(&node_id);
1571        }
1572
1573        // Note: label_count in record is not updated for tiered storage.
1574
1575        true
1576    }
1577
1578    /// Returns the number of nodes (non-deleted at current epoch).
1579    #[must_use]
1580    #[cfg(not(feature = "tiered-storage"))]
1581    pub fn node_count(&self) -> usize {
1582        let epoch = self.current_epoch();
1583        self.nodes
1584            .read()
1585            .values()
1586            .filter_map(|chain| chain.visible_at(epoch))
1587            .filter(|r| !r.is_deleted())
1588            .count()
1589    }
1590
1591    /// Returns the number of nodes (non-deleted at current epoch).
1592    /// (Tiered storage version)
1593    #[must_use]
1594    #[cfg(feature = "tiered-storage")]
1595    pub fn node_count(&self) -> usize {
1596        let epoch = self.current_epoch();
1597        let versions = self.node_versions.read();
1598        versions
1599            .iter()
1600            .filter(|(_, index)| {
1601                index.visible_at(epoch).map_or(false, |vref| {
1602                    self.read_node_record(&vref)
1603                        .map_or(false, |r| !r.is_deleted())
1604                })
1605            })
1606            .count()
1607    }
1608
1609    /// Returns all node IDs in the store.
1610    ///
1611    /// This returns a snapshot of current node IDs. The returned vector
1612    /// excludes deleted nodes. Results are sorted by NodeId for deterministic
1613    /// iteration order.
1614    #[must_use]
1615    #[cfg(not(feature = "tiered-storage"))]
1616    pub fn node_ids(&self) -> Vec<NodeId> {
1617        let epoch = self.current_epoch();
1618        let mut ids: Vec<NodeId> = self
1619            .nodes
1620            .read()
1621            .iter()
1622            .filter_map(|(id, chain)| {
1623                chain
1624                    .visible_at(epoch)
1625                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1626            })
1627            .collect();
1628        ids.sort_unstable();
1629        ids
1630    }
1631
1632    /// Returns all node IDs in the store.
1633    /// (Tiered storage version)
1634    #[must_use]
1635    #[cfg(feature = "tiered-storage")]
1636    pub fn node_ids(&self) -> Vec<NodeId> {
1637        let epoch = self.current_epoch();
1638        let versions = self.node_versions.read();
1639        let mut ids: Vec<NodeId> = versions
1640            .iter()
1641            .filter_map(|(id, index)| {
1642                index.visible_at(epoch).and_then(|vref| {
1643                    self.read_node_record(&vref)
1644                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1645                })
1646            })
1647            .collect();
1648        ids.sort_unstable();
1649        ids
1650    }
1651
1652    // === Edge Operations ===
1653
1654    /// Creates a new edge.
1655    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1656        self.needs_stats_recompute.store(true, Ordering::Relaxed);
1657        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1658    }
1659
1660    /// Creates a new edge within a transaction context.
1661    #[cfg(not(feature = "tiered-storage"))]
1662    pub fn create_edge_versioned(
1663        &self,
1664        src: NodeId,
1665        dst: NodeId,
1666        edge_type: &str,
1667        epoch: EpochId,
1668        tx_id: TxId,
1669    ) -> EdgeId {
1670        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1671        let type_id = self.get_or_create_edge_type_id(edge_type);
1672
1673        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1674        let chain = VersionChain::with_initial(record, epoch, tx_id);
1675        self.edges.write().insert(id, chain);
1676
1677        // Update adjacency
1678        self.forward_adj.add_edge(src, dst, id);
1679        if let Some(ref backward) = self.backward_adj {
1680            backward.add_edge(dst, src, id);
1681        }
1682
1683        id
1684    }
1685
1686    /// Creates a new edge within a transaction context.
1687    /// (Tiered storage version)
1688    #[cfg(feature = "tiered-storage")]
1689    pub fn create_edge_versioned(
1690        &self,
1691        src: NodeId,
1692        dst: NodeId,
1693        edge_type: &str,
1694        epoch: EpochId,
1695        tx_id: TxId,
1696    ) -> EdgeId {
1697        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1698        let type_id = self.get_or_create_edge_type_id(edge_type);
1699
1700        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1701
1702        // Allocate record in arena and get offset (create epoch if needed)
1703        let arena = self.arena_allocator.arena_or_create(epoch);
1704        let (offset, _stored) = arena.alloc_value_with_offset(record);
1705
1706        // Create HotVersionRef pointing to arena data
1707        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1708
1709        // Create or update version index
1710        let mut versions = self.edge_versions.write();
1711        if let Some(index) = versions.get_mut(&id) {
1712            index.add_hot(hot_ref);
1713        } else {
1714            versions.insert(id, VersionIndex::with_initial(hot_ref));
1715        }
1716
1717        // Update adjacency
1718        self.forward_adj.add_edge(src, dst, id);
1719        if let Some(ref backward) = self.backward_adj {
1720            backward.add_edge(dst, src, id);
1721        }
1722
1723        id
1724    }
1725
1726    /// Creates a new edge with properties.
1727    pub fn create_edge_with_props(
1728        &self,
1729        src: NodeId,
1730        dst: NodeId,
1731        edge_type: &str,
1732        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1733    ) -> EdgeId {
1734        let id = self.create_edge(src, dst, edge_type);
1735
1736        for (key, value) in properties {
1737            self.edge_properties.set(id, key.into(), value.into());
1738        }
1739
1740        id
1741    }
1742
1743    /// Gets an edge by ID (latest visible version).
1744    #[must_use]
1745    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1746        self.get_edge_at_epoch(id, self.current_epoch())
1747    }
1748
1749    /// Gets an edge by ID at a specific epoch.
1750    #[must_use]
1751    #[cfg(not(feature = "tiered-storage"))]
1752    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1753        let edges = self.edges.read();
1754        let chain = edges.get(&id)?;
1755        let record = chain.visible_at(epoch)?;
1756
1757        if record.is_deleted() {
1758            return None;
1759        }
1760
1761        let edge_type = {
1762            let id_to_type = self.id_to_edge_type.read();
1763            id_to_type.get(record.type_id as usize)?.clone()
1764        };
1765
1766        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1767
1768        // Get properties
1769        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1770
1771        Some(edge)
1772    }
1773
1774    /// Gets an edge by ID at a specific epoch.
1775    /// (Tiered storage version)
1776    #[must_use]
1777    #[cfg(feature = "tiered-storage")]
1778    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1779        let versions = self.edge_versions.read();
1780        let index = versions.get(&id)?;
1781        let version_ref = index.visible_at(epoch)?;
1782
1783        let record = self.read_edge_record(&version_ref)?;
1784
1785        if record.is_deleted() {
1786            return None;
1787        }
1788
1789        let edge_type = {
1790            let id_to_type = self.id_to_edge_type.read();
1791            id_to_type.get(record.type_id as usize)?.clone()
1792        };
1793
1794        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1795
1796        // Get properties
1797        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1798
1799        Some(edge)
1800    }
1801
1802    /// Gets an edge visible to a specific transaction.
1803    #[must_use]
1804    #[cfg(not(feature = "tiered-storage"))]
1805    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1806        let edges = self.edges.read();
1807        let chain = edges.get(&id)?;
1808        let record = chain.visible_to(epoch, tx_id)?;
1809
1810        if record.is_deleted() {
1811            return None;
1812        }
1813
1814        let edge_type = {
1815            let id_to_type = self.id_to_edge_type.read();
1816            id_to_type.get(record.type_id as usize)?.clone()
1817        };
1818
1819        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1820
1821        // Get properties
1822        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1823
1824        Some(edge)
1825    }
1826
1827    /// Gets an edge visible to a specific transaction.
1828    /// (Tiered storage version)
1829    #[must_use]
1830    #[cfg(feature = "tiered-storage")]
1831    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1832        let versions = self.edge_versions.read();
1833        let index = versions.get(&id)?;
1834        let version_ref = index.visible_to(epoch, tx_id)?;
1835
1836        let record = self.read_edge_record(&version_ref)?;
1837
1838        if record.is_deleted() {
1839            return None;
1840        }
1841
1842        let edge_type = {
1843            let id_to_type = self.id_to_edge_type.read();
1844            id_to_type.get(record.type_id as usize)?.clone()
1845        };
1846
1847        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1848
1849        // Get properties
1850        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1851
1852        Some(edge)
1853    }
1854
1855    /// Reads an EdgeRecord from arena using a VersionRef.
1856    #[cfg(feature = "tiered-storage")]
1857    #[allow(unsafe_code)]
1858    fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1859        match version_ref {
1860            VersionRef::Hot(hot_ref) => {
1861                let arena = self.arena_allocator.arena(hot_ref.epoch);
1862                // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
1863                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1864                Some(*record)
1865            }
1866            VersionRef::Cold(cold_ref) => {
1867                // Read from compressed epoch store
1868                self.epoch_store
1869                    .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1870            }
1871        }
1872    }
1873
1874    /// Deletes an edge (using latest epoch).
1875    pub fn delete_edge(&self, id: EdgeId) -> bool {
1876        self.needs_stats_recompute.store(true, Ordering::Relaxed);
1877        self.delete_edge_at_epoch(id, self.current_epoch())
1878    }
1879
1880    /// Deletes an edge at a specific epoch.
1881    #[cfg(not(feature = "tiered-storage"))]
1882    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1883        let mut edges = self.edges.write();
1884        if let Some(chain) = edges.get_mut(&id) {
1885            // Get the visible record to check if deleted and get src/dst
1886            let (src, dst) = {
1887                match chain.visible_at(epoch) {
1888                    Some(record) => {
1889                        if record.is_deleted() {
1890                            return false;
1891                        }
1892                        (record.src, record.dst)
1893                    }
1894                    None => return false, // Not visible at this epoch (already deleted)
1895                }
1896            };
1897
1898            // Mark the version chain as deleted
1899            chain.mark_deleted(epoch);
1900
1901            drop(edges); // Release lock
1902
1903            // Mark as deleted in adjacency (soft delete)
1904            self.forward_adj.mark_deleted(src, id);
1905            if let Some(ref backward) = self.backward_adj {
1906                backward.mark_deleted(dst, id);
1907            }
1908
1909            // Remove properties
1910            self.edge_properties.remove_all(id);
1911
1912            true
1913        } else {
1914            false
1915        }
1916    }
1917
1918    /// Deletes an edge at a specific epoch.
1919    /// (Tiered storage version)
1920    #[cfg(feature = "tiered-storage")]
1921    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1922        let mut versions = self.edge_versions.write();
1923        if let Some(index) = versions.get_mut(&id) {
1924            // Get the visible record to check if deleted and get src/dst
1925            let (src, dst) = {
1926                match index.visible_at(epoch) {
1927                    Some(version_ref) => {
1928                        if let Some(record) = self.read_edge_record(&version_ref) {
1929                            if record.is_deleted() {
1930                                return false;
1931                            }
1932                            (record.src, record.dst)
1933                        } else {
1934                            return false;
1935                        }
1936                    }
1937                    None => return false,
1938                }
1939            };
1940
1941            // Mark as deleted in version index
1942            index.mark_deleted(epoch);
1943
1944            drop(versions); // Release lock
1945
1946            // Mark as deleted in adjacency (soft delete)
1947            self.forward_adj.mark_deleted(src, id);
1948            if let Some(ref backward) = self.backward_adj {
1949                backward.mark_deleted(dst, id);
1950            }
1951
1952            // Remove properties
1953            self.edge_properties.remove_all(id);
1954
1955            true
1956        } else {
1957            false
1958        }
1959    }
1960
1961    /// Returns the number of edges (non-deleted at current epoch).
1962    #[must_use]
1963    #[cfg(not(feature = "tiered-storage"))]
1964    pub fn edge_count(&self) -> usize {
1965        let epoch = self.current_epoch();
1966        self.edges
1967            .read()
1968            .values()
1969            .filter_map(|chain| chain.visible_at(epoch))
1970            .filter(|r| !r.is_deleted())
1971            .count()
1972    }
1973
1974    /// Returns the number of edges (non-deleted at current epoch).
1975    /// (Tiered storage version)
1976    #[must_use]
1977    #[cfg(feature = "tiered-storage")]
1978    pub fn edge_count(&self) -> usize {
1979        let epoch = self.current_epoch();
1980        let versions = self.edge_versions.read();
1981        versions
1982            .iter()
1983            .filter(|(_, index)| {
1984                index.visible_at(epoch).map_or(false, |vref| {
1985                    self.read_edge_record(&vref)
1986                        .map_or(false, |r| !r.is_deleted())
1987                })
1988            })
1989            .count()
1990    }
1991
1992    /// Discards all uncommitted versions created by a transaction.
1993    ///
1994    /// This is called during transaction rollback to clean up uncommitted changes.
1995    /// The method removes version chain entries created by the specified transaction.
1996    #[cfg(not(feature = "tiered-storage"))]
1997    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1998        // Remove uncommitted node versions
1999        {
2000            let mut nodes = self.nodes.write();
2001            for chain in nodes.values_mut() {
2002                chain.remove_versions_by(tx_id);
2003            }
2004            // Remove completely empty chains (no versions left)
2005            nodes.retain(|_, chain| !chain.is_empty());
2006        }
2007
2008        // Remove uncommitted edge versions
2009        {
2010            let mut edges = self.edges.write();
2011            for chain in edges.values_mut() {
2012                chain.remove_versions_by(tx_id);
2013            }
2014            // Remove completely empty chains (no versions left)
2015            edges.retain(|_, chain| !chain.is_empty());
2016        }
2017    }
2018
2019    /// Discards all uncommitted versions created by a transaction.
2020    /// (Tiered storage version)
2021    #[cfg(feature = "tiered-storage")]
2022    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2023        // Remove uncommitted node versions
2024        {
2025            let mut versions = self.node_versions.write();
2026            for index in versions.values_mut() {
2027                index.remove_versions_by(tx_id);
2028            }
2029            // Remove completely empty indexes (no versions left)
2030            versions.retain(|_, index| !index.is_empty());
2031        }
2032
2033        // Remove uncommitted edge versions
2034        {
2035            let mut versions = self.edge_versions.write();
2036            for index in versions.values_mut() {
2037                index.remove_versions_by(tx_id);
2038            }
2039            // Remove completely empty indexes (no versions left)
2040            versions.retain(|_, index| !index.is_empty());
2041        }
2042    }
2043
2044    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
2045    ///
2046    /// This is called by the transaction manager when an epoch becomes eligible
2047    /// for freezing (no active transactions can see it). The freeze process:
2048    ///
2049    /// 1. Collects all hot version refs for the epoch
2050    /// 2. Reads the corresponding records from arena
2051    /// 3. Compresses them into a `CompressedEpochBlock`
2052    /// 4. Updates `VersionIndex` entries to point to cold storage
2053    /// 5. The arena can be deallocated after all epochs in it are frozen
2054    ///
2055    /// # Arguments
2056    ///
2057    /// * `epoch` - The epoch to freeze
2058    ///
2059    /// # Returns
2060    ///
2061    /// The number of records frozen (nodes + edges).
2062    #[cfg(feature = "tiered-storage")]
2063    #[allow(unsafe_code)]
2064    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
2065        // Collect node records to freeze
2066        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
2067        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
2068
2069        {
2070            let versions = self.node_versions.read();
2071            for (node_id, index) in versions.iter() {
2072                for hot_ref in index.hot_refs_for_epoch(epoch) {
2073                    let arena = self.arena_allocator.arena(hot_ref.epoch);
2074                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
2075                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2076                    node_records.push((node_id.as_u64(), *record));
2077                    node_hot_refs.push((*node_id, *hot_ref));
2078                }
2079            }
2080        }
2081
2082        // Collect edge records to freeze
2083        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
2084        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
2085
2086        {
2087            let versions = self.edge_versions.read();
2088            for (edge_id, index) in versions.iter() {
2089                for hot_ref in index.hot_refs_for_epoch(epoch) {
2090                    let arena = self.arena_allocator.arena(hot_ref.epoch);
2091                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
2092                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2093                    edge_records.push((edge_id.as_u64(), *record));
2094                    edge_hot_refs.push((*edge_id, *hot_ref));
2095                }
2096            }
2097        }
2098
2099        let total_frozen = node_records.len() + edge_records.len();
2100
2101        if total_frozen == 0 {
2102            return 0;
2103        }
2104
2105        // Freeze to compressed storage
2106        let (node_entries, edge_entries) =
2107            self.epoch_store
2108                .freeze_epoch(epoch, node_records, edge_records);
2109
2110        // Build lookup maps for index entries
2111        let node_entry_map: FxHashMap<u64, _> = node_entries
2112            .iter()
2113            .map(|e| (e.entity_id, (e.offset, e.length)))
2114            .collect();
2115        let edge_entry_map: FxHashMap<u64, _> = edge_entries
2116            .iter()
2117            .map(|e| (e.entity_id, (e.offset, e.length)))
2118            .collect();
2119
2120        // Update version indexes to use cold refs
2121        {
2122            let mut versions = self.node_versions.write();
2123            for (node_id, hot_ref) in &node_hot_refs {
2124                if let Some(index) = versions.get_mut(node_id)
2125                    && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
2126                {
2127                    let cold_ref = ColdVersionRef {
2128                        epoch,
2129                        block_offset: offset,
2130                        length,
2131                        created_by: hot_ref.created_by,
2132                        deleted_epoch: hot_ref.deleted_epoch,
2133                    };
2134                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
2135                }
2136            }
2137        }
2138
2139        {
2140            let mut versions = self.edge_versions.write();
2141            for (edge_id, hot_ref) in &edge_hot_refs {
2142                if let Some(index) = versions.get_mut(edge_id)
2143                    && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
2144                {
2145                    let cold_ref = ColdVersionRef {
2146                        epoch,
2147                        block_offset: offset,
2148                        length,
2149                        created_by: hot_ref.created_by,
2150                        deleted_epoch: hot_ref.deleted_epoch,
2151                    };
2152                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
2153                }
2154            }
2155        }
2156
2157        total_frozen
2158    }
2159
2160    /// Returns the epoch store for cold storage statistics.
2161    #[cfg(feature = "tiered-storage")]
2162    #[must_use]
2163    pub fn epoch_store(&self) -> &EpochStore {
2164        &self.epoch_store
2165    }
2166
2167    /// Returns the number of distinct labels in the store.
2168    #[must_use]
2169    pub fn label_count(&self) -> usize {
2170        self.id_to_label.read().len()
2171    }
2172
2173    /// Returns the number of distinct property keys in the store.
2174    ///
2175    /// This counts unique property keys across both nodes and edges.
2176    #[must_use]
2177    pub fn property_key_count(&self) -> usize {
2178        let node_keys = self.node_properties.column_count();
2179        let edge_keys = self.edge_properties.column_count();
2180        // Note: This may count some keys twice if the same key is used
2181        // for both nodes and edges. A more precise count would require
2182        // tracking unique keys across both storages.
2183        node_keys + edge_keys
2184    }
2185
2186    /// Returns the number of distinct edge types in the store.
2187    #[must_use]
2188    pub fn edge_type_count(&self) -> usize {
2189        self.id_to_edge_type.read().len()
2190    }
2191
2192    // === Traversal ===
2193
2194    /// Iterates over neighbors of a node in the specified direction.
2195    ///
2196    /// This is the fast path for graph traversal - goes straight to the
2197    /// adjacency index without loading full node data.
2198    pub fn neighbors(
2199        &self,
2200        node: NodeId,
2201        direction: Direction,
2202    ) -> impl Iterator<Item = NodeId> + '_ {
2203        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2204            Direction::Outgoing | Direction::Both => {
2205                Box::new(self.forward_adj.neighbors(node).into_iter())
2206            }
2207            Direction::Incoming => Box::new(std::iter::empty()),
2208        };
2209
2210        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2211            Direction::Incoming | Direction::Both => {
2212                if let Some(ref adj) = self.backward_adj {
2213                    Box::new(adj.neighbors(node).into_iter())
2214                } else {
2215                    Box::new(std::iter::empty())
2216                }
2217            }
2218            Direction::Outgoing => Box::new(std::iter::empty()),
2219        };
2220
2221        forward.chain(backward)
2222    }
2223
2224    /// Returns edges from a node with their targets.
2225    ///
2226    /// Returns an iterator of (target_node, edge_id) pairs.
2227    pub fn edges_from(
2228        &self,
2229        node: NodeId,
2230        direction: Direction,
2231    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2232        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2233            Direction::Outgoing | Direction::Both => {
2234                Box::new(self.forward_adj.edges_from(node).into_iter())
2235            }
2236            Direction::Incoming => Box::new(std::iter::empty()),
2237        };
2238
2239        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2240            Direction::Incoming | Direction::Both => {
2241                if let Some(ref adj) = self.backward_adj {
2242                    Box::new(adj.edges_from(node).into_iter())
2243                } else {
2244                    Box::new(std::iter::empty())
2245                }
2246            }
2247            Direction::Outgoing => Box::new(std::iter::empty()),
2248        };
2249
2250        forward.chain(backward)
2251    }
2252
2253    /// Returns edges to a node (where the node is the destination).
2254    ///
2255    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
2256    /// Uses the backward adjacency index for O(degree) lookup.
2257    ///
2258    /// # Example
2259    ///
2260    /// ```ignore
2261    /// // For edges: A->B, C->B
2262    /// let incoming = store.edges_to(B);
2263    /// // Returns: [(A, edge1), (C, edge2)]
2264    /// ```
2265    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2266        if let Some(ref backward) = self.backward_adj {
2267            backward.edges_from(node)
2268        } else {
2269            // Fallback: scan all edges (slow but correct)
2270            self.all_edges()
2271                .filter_map(|edge| {
2272                    if edge.dst == node {
2273                        Some((edge.src, edge.id))
2274                    } else {
2275                        None
2276                    }
2277                })
2278                .collect()
2279        }
2280    }
2281
2282    /// Returns the out-degree of a node (number of outgoing edges).
2283    ///
2284    /// Uses the forward adjacency index for O(1) lookup.
2285    #[must_use]
2286    pub fn out_degree(&self, node: NodeId) -> usize {
2287        self.forward_adj.out_degree(node)
2288    }
2289
2290    /// Returns the in-degree of a node (number of incoming edges).
2291    ///
2292    /// Uses the backward adjacency index for O(1) lookup if available,
2293    /// otherwise falls back to scanning edges.
2294    #[must_use]
2295    pub fn in_degree(&self, node: NodeId) -> usize {
2296        if let Some(ref backward) = self.backward_adj {
2297            backward.in_degree(node)
2298        } else {
2299            // Fallback: count edges (slow)
2300            self.all_edges().filter(|edge| edge.dst == node).count()
2301        }
2302    }
2303
2304    /// Gets the type of an edge by ID.
2305    #[must_use]
2306    #[cfg(not(feature = "tiered-storage"))]
2307    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2308        let edges = self.edges.read();
2309        let chain = edges.get(&id)?;
2310        let epoch = self.current_epoch();
2311        let record = chain.visible_at(epoch)?;
2312        let id_to_type = self.id_to_edge_type.read();
2313        id_to_type.get(record.type_id as usize).cloned()
2314    }
2315
2316    /// Gets the type of an edge by ID.
2317    /// (Tiered storage version)
2318    #[must_use]
2319    #[cfg(feature = "tiered-storage")]
2320    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2321        let versions = self.edge_versions.read();
2322        let index = versions.get(&id)?;
2323        let epoch = self.current_epoch();
2324        let vref = index.visible_at(epoch)?;
2325        let record = self.read_edge_record(&vref)?;
2326        let id_to_type = self.id_to_edge_type.read();
2327        id_to_type.get(record.type_id as usize).cloned()
2328    }
2329
2330    /// Returns all nodes with a specific label.
2331    ///
2332    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
2333    /// concurrent modifications won't affect the returned vector. Results are
2334    /// sorted by NodeId for deterministic iteration order.
2335    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2336        let label_to_id = self.label_to_id.read();
2337        if let Some(&label_id) = label_to_id.get(label) {
2338            let index = self.label_index.read();
2339            if let Some(set) = index.get(label_id as usize) {
2340                let mut ids: Vec<NodeId> = set.keys().copied().collect();
2341                ids.sort_unstable();
2342                return ids;
2343            }
2344        }
2345        Vec::new()
2346    }
2347
2348    // === Admin API: Iteration ===
2349
2350    /// Returns an iterator over all nodes in the database.
2351    ///
2352    /// This creates a snapshot of all visible nodes at the current epoch.
2353    /// Useful for dump/export operations.
2354    #[cfg(not(feature = "tiered-storage"))]
2355    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2356        let epoch = self.current_epoch();
2357        let node_ids: Vec<NodeId> = self
2358            .nodes
2359            .read()
2360            .iter()
2361            .filter_map(|(id, chain)| {
2362                chain
2363                    .visible_at(epoch)
2364                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2365            })
2366            .collect();
2367
2368        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2369    }
2370
2371    /// Returns an iterator over all nodes in the database.
2372    /// (Tiered storage version)
2373    #[cfg(feature = "tiered-storage")]
2374    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2375        let node_ids = self.node_ids();
2376        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2377    }
2378
2379    /// Returns an iterator over all edges in the database.
2380    ///
2381    /// This creates a snapshot of all visible edges at the current epoch.
2382    /// Useful for dump/export operations.
2383    #[cfg(not(feature = "tiered-storage"))]
2384    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2385        let epoch = self.current_epoch();
2386        let edge_ids: Vec<EdgeId> = self
2387            .edges
2388            .read()
2389            .iter()
2390            .filter_map(|(id, chain)| {
2391                chain
2392                    .visible_at(epoch)
2393                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2394            })
2395            .collect();
2396
2397        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2398    }
2399
2400    /// Returns an iterator over all edges in the database.
2401    /// (Tiered storage version)
2402    #[cfg(feature = "tiered-storage")]
2403    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2404        let epoch = self.current_epoch();
2405        let versions = self.edge_versions.read();
2406        let edge_ids: Vec<EdgeId> = versions
2407            .iter()
2408            .filter_map(|(id, index)| {
2409                index.visible_at(epoch).and_then(|vref| {
2410                    self.read_edge_record(&vref)
2411                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2412                })
2413            })
2414            .collect();
2415
2416        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2417    }
2418
2419    /// Returns all label names in the database.
2420    pub fn all_labels(&self) -> Vec<String> {
2421        self.id_to_label
2422            .read()
2423            .iter()
2424            .map(|s| s.to_string())
2425            .collect()
2426    }
2427
2428    /// Returns all edge type names in the database.
2429    pub fn all_edge_types(&self) -> Vec<String> {
2430        self.id_to_edge_type
2431            .read()
2432            .iter()
2433            .map(|s| s.to_string())
2434            .collect()
2435    }
2436
2437    /// Returns all property keys used in the database.
2438    pub fn all_property_keys(&self) -> Vec<String> {
2439        let mut keys = std::collections::HashSet::new();
2440        for key in self.node_properties.keys() {
2441            keys.insert(key.to_string());
2442        }
2443        for key in self.edge_properties.keys() {
2444            keys.insert(key.to_string());
2445        }
2446        keys.into_iter().collect()
2447    }
2448
2449    /// Returns an iterator over nodes with a specific label.
2450    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2451        let node_ids = self.nodes_by_label(label);
2452        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2453    }
2454
2455    /// Returns an iterator over edges with a specific type.
2456    #[cfg(not(feature = "tiered-storage"))]
2457    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2458        let epoch = self.current_epoch();
2459        let type_to_id = self.edge_type_to_id.read();
2460
2461        if let Some(&type_id) = type_to_id.get(edge_type) {
2462            let edge_ids: Vec<EdgeId> = self
2463                .edges
2464                .read()
2465                .iter()
2466                .filter_map(|(id, chain)| {
2467                    chain.visible_at(epoch).and_then(|r| {
2468                        if !r.is_deleted() && r.type_id == type_id {
2469                            Some(*id)
2470                        } else {
2471                            None
2472                        }
2473                    })
2474                })
2475                .collect();
2476
2477            // Return a boxed iterator for the found edges
2478            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2479                as Box<dyn Iterator<Item = Edge> + 'a>
2480        } else {
2481            // Return empty iterator
2482            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2483        }
2484    }
2485
2486    /// Returns an iterator over edges with a specific type.
2487    /// (Tiered storage version)
2488    #[cfg(feature = "tiered-storage")]
2489    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2490        let epoch = self.current_epoch();
2491        let type_to_id = self.edge_type_to_id.read();
2492
2493        if let Some(&type_id) = type_to_id.get(edge_type) {
2494            let versions = self.edge_versions.read();
2495            let edge_ids: Vec<EdgeId> = versions
2496                .iter()
2497                .filter_map(|(id, index)| {
2498                    index.visible_at(epoch).and_then(|vref| {
2499                        self.read_edge_record(&vref).and_then(|r| {
2500                            if !r.is_deleted() && r.type_id == type_id {
2501                                Some(*id)
2502                            } else {
2503                                None
2504                            }
2505                        })
2506                    })
2507                })
2508                .collect();
2509
2510            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2511                as Box<dyn Iterator<Item = Edge> + 'a>
2512        } else {
2513            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2514        }
2515    }
2516
2517    // === Zone Map Support ===
2518
2519    /// Checks if a node property predicate might match any nodes.
2520    ///
2521    /// Uses zone maps for early filtering. Returns `true` if there might be
2522    /// matching nodes, `false` if there definitely aren't.
2523    #[must_use]
2524    pub fn node_property_might_match(
2525        &self,
2526        property: &PropertyKey,
2527        op: CompareOp,
2528        value: &Value,
2529    ) -> bool {
2530        self.node_properties.might_match(property, op, value)
2531    }
2532
2533    /// Checks if an edge property predicate might match any edges.
2534    #[must_use]
2535    pub fn edge_property_might_match(
2536        &self,
2537        property: &PropertyKey,
2538        op: CompareOp,
2539        value: &Value,
2540    ) -> bool {
2541        self.edge_properties.might_match(property, op, value)
2542    }
2543
2544    /// Gets the zone map for a node property.
2545    #[must_use]
2546    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2547        self.node_properties.zone_map(property)
2548    }
2549
2550    /// Gets the zone map for an edge property.
2551    #[must_use]
2552    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2553        self.edge_properties.zone_map(property)
2554    }
2555
2556    /// Rebuilds zone maps for all properties.
2557    pub fn rebuild_zone_maps(&self) {
2558        self.node_properties.rebuild_zone_maps();
2559        self.edge_properties.rebuild_zone_maps();
2560    }
2561
2562    // === Statistics ===
2563
2564    /// Returns the current statistics.
2565    #[must_use]
2566    pub fn statistics(&self) -> Statistics {
2567        self.statistics.read().clone()
2568    }
2569
2570    /// Recomputes statistics if they are stale (i.e., after mutations).
2571    ///
2572    /// Call this before reading statistics for query optimization.
2573    /// Avoids redundant recomputation if no mutations occurred.
2574    pub fn ensure_statistics_fresh(&self) {
2575        if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2576            self.compute_statistics();
2577        }
2578    }
2579
2580    /// Recomputes statistics from current data.
2581    ///
2582    /// Scans all labels and edge types to build cardinality estimates for the
2583    /// query optimizer. Call this periodically or after bulk data loads.
2584    #[cfg(not(feature = "tiered-storage"))]
2585    pub fn compute_statistics(&self) {
2586        let mut stats = Statistics::new();
2587
2588        // Compute total counts
2589        stats.total_nodes = self.node_count() as u64;
2590        stats.total_edges = self.edge_count() as u64;
2591
2592        // Compute per-label statistics
2593        let id_to_label = self.id_to_label.read();
2594        let label_index = self.label_index.read();
2595
2596        for (label_id, label_name) in id_to_label.iter().enumerate() {
2597            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2598
2599            if node_count > 0 {
2600                // Estimate average degree
2601                let avg_out_degree = if stats.total_nodes > 0 {
2602                    stats.total_edges as f64 / stats.total_nodes as f64
2603                } else {
2604                    0.0
2605                };
2606
2607                let label_stats =
2608                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2609
2610                stats.update_label(label_name.as_ref(), label_stats);
2611            }
2612        }
2613
2614        // Compute per-edge-type statistics
2615        let id_to_edge_type = self.id_to_edge_type.read();
2616        let edges = self.edges.read();
2617        let epoch = self.current_epoch();
2618
2619        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2620        for chain in edges.values() {
2621            if let Some(record) = chain.visible_at(epoch)
2622                && !record.is_deleted()
2623            {
2624                *edge_type_counts.entry(record.type_id).or_default() += 1;
2625            }
2626        }
2627
2628        for (type_id, count) in edge_type_counts {
2629            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2630                let avg_degree = if stats.total_nodes > 0 {
2631                    count as f64 / stats.total_nodes as f64
2632                } else {
2633                    0.0
2634                };
2635
2636                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2637                stats.update_edge_type(type_name.as_ref(), edge_stats);
2638            }
2639        }
2640
2641        *self.statistics.write() = stats;
2642    }
2643
2644    /// Recomputes statistics from current data.
2645    /// (Tiered storage version)
2646    #[cfg(feature = "tiered-storage")]
2647    pub fn compute_statistics(&self) {
2648        let mut stats = Statistics::new();
2649
2650        // Compute total counts
2651        stats.total_nodes = self.node_count() as u64;
2652        stats.total_edges = self.edge_count() as u64;
2653
2654        // Compute per-label statistics
2655        let id_to_label = self.id_to_label.read();
2656        let label_index = self.label_index.read();
2657
2658        for (label_id, label_name) in id_to_label.iter().enumerate() {
2659            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2660
2661            if node_count > 0 {
2662                let avg_out_degree = if stats.total_nodes > 0 {
2663                    stats.total_edges as f64 / stats.total_nodes as f64
2664                } else {
2665                    0.0
2666                };
2667
2668                let label_stats =
2669                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2670
2671                stats.update_label(label_name.as_ref(), label_stats);
2672            }
2673        }
2674
2675        // Compute per-edge-type statistics
2676        let id_to_edge_type = self.id_to_edge_type.read();
2677        let versions = self.edge_versions.read();
2678        let epoch = self.current_epoch();
2679
2680        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2681        for index in versions.values() {
2682            if let Some(vref) = index.visible_at(epoch)
2683                && let Some(record) = self.read_edge_record(&vref)
2684                && !record.is_deleted()
2685            {
2686                *edge_type_counts.entry(record.type_id).or_default() += 1;
2687            }
2688        }
2689
2690        for (type_id, count) in edge_type_counts {
2691            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2692                let avg_degree = if stats.total_nodes > 0 {
2693                    count as f64 / stats.total_nodes as f64
2694                } else {
2695                    0.0
2696                };
2697
2698                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2699                stats.update_edge_type(type_name.as_ref(), edge_stats);
2700            }
2701        }
2702
2703        *self.statistics.write() = stats;
2704    }
2705
2706    /// Estimates cardinality for a label scan.
2707    #[must_use]
2708    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2709        self.statistics.read().estimate_label_cardinality(label)
2710    }
2711
2712    /// Estimates average degree for an edge type.
2713    #[must_use]
2714    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2715        self.statistics
2716            .read()
2717            .estimate_avg_degree(edge_type, outgoing)
2718    }
2719
2720    // === Internal Helpers ===
2721
2722    fn get_or_create_label_id(&self, label: &str) -> u32 {
2723        {
2724            let label_to_id = self.label_to_id.read();
2725            if let Some(&id) = label_to_id.get(label) {
2726                return id;
2727            }
2728        }
2729
2730        let mut label_to_id = self.label_to_id.write();
2731        let mut id_to_label = self.id_to_label.write();
2732
2733        // Double-check after acquiring write lock
2734        if let Some(&id) = label_to_id.get(label) {
2735            return id;
2736        }
2737
2738        let id = id_to_label.len() as u32;
2739
2740        let label: ArcStr = label.into();
2741        label_to_id.insert(label.clone(), id);
2742        id_to_label.push(label);
2743
2744        id
2745    }
2746
2747    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2748        {
2749            let type_to_id = self.edge_type_to_id.read();
2750            if let Some(&id) = type_to_id.get(edge_type) {
2751                return id;
2752            }
2753        }
2754
2755        let mut type_to_id = self.edge_type_to_id.write();
2756        let mut id_to_type = self.id_to_edge_type.write();
2757
2758        // Double-check
2759        if let Some(&id) = type_to_id.get(edge_type) {
2760            return id;
2761        }
2762
2763        let id = id_to_type.len() as u32;
2764        let edge_type: ArcStr = edge_type.into();
2765        type_to_id.insert(edge_type.clone(), id);
2766        id_to_type.push(edge_type);
2767
2768        id
2769    }
2770
2771    // === Recovery Support ===
2772
2773    /// Creates a node with a specific ID during recovery.
2774    ///
2775    /// This is used for WAL recovery to restore nodes with their original IDs.
2776    /// The caller must ensure IDs don't conflict with existing nodes.
2777    #[cfg(not(feature = "tiered-storage"))]
2778    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2779        let epoch = self.current_epoch();
2780        let mut record = NodeRecord::new(id, epoch);
2781        record.set_label_count(labels.len() as u16);
2782
2783        // Store labels in node_labels map and label_index
2784        let mut node_label_set = FxHashSet::default();
2785        for label in labels {
2786            let label_id = self.get_or_create_label_id(*label);
2787            node_label_set.insert(label_id);
2788
2789            // Update label index
2790            let mut index = self.label_index.write();
2791            while index.len() <= label_id as usize {
2792                index.push(FxHashMap::default());
2793            }
2794            index[label_id as usize].insert(id, ());
2795        }
2796
2797        // Store node's labels
2798        self.node_labels.write().insert(id, node_label_set);
2799
2800        // Create version chain with initial version (using SYSTEM tx for recovery)
2801        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2802        self.nodes.write().insert(id, chain);
2803
2804        // Update next_node_id if necessary to avoid future collisions
2805        let id_val = id.as_u64();
2806        let _ = self
2807            .next_node_id
2808            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2809                if id_val >= current {
2810                    Some(id_val + 1)
2811                } else {
2812                    None
2813                }
2814            });
2815    }
2816
2817    /// Creates a node with a specific ID during recovery.
2818    /// (Tiered storage version)
2819    #[cfg(feature = "tiered-storage")]
2820    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2821        let epoch = self.current_epoch();
2822        let mut record = NodeRecord::new(id, epoch);
2823        record.set_label_count(labels.len() as u16);
2824
2825        // Store labels in node_labels map and label_index
2826        let mut node_label_set = FxHashSet::default();
2827        for label in labels {
2828            let label_id = self.get_or_create_label_id(*label);
2829            node_label_set.insert(label_id);
2830
2831            // Update label index
2832            let mut index = self.label_index.write();
2833            while index.len() <= label_id as usize {
2834                index.push(FxHashMap::default());
2835            }
2836            index[label_id as usize].insert(id, ());
2837        }
2838
2839        // Store node's labels
2840        self.node_labels.write().insert(id, node_label_set);
2841
2842        // Allocate record in arena and get offset (create epoch if needed)
2843        let arena = self.arena_allocator.arena_or_create(epoch);
2844        let (offset, _stored) = arena.alloc_value_with_offset(record);
2845
2846        // Create HotVersionRef (using SYSTEM tx for recovery)
2847        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2848        let mut versions = self.node_versions.write();
2849        versions.insert(id, VersionIndex::with_initial(hot_ref));
2850
2851        // Update next_node_id if necessary to avoid future collisions
2852        let id_val = id.as_u64();
2853        let _ = self
2854            .next_node_id
2855            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2856                if id_val >= current {
2857                    Some(id_val + 1)
2858                } else {
2859                    None
2860                }
2861            });
2862    }
2863
2864    /// Creates an edge with a specific ID during recovery.
2865    ///
2866    /// This is used for WAL recovery to restore edges with their original IDs.
2867    #[cfg(not(feature = "tiered-storage"))]
2868    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2869        let epoch = self.current_epoch();
2870        let type_id = self.get_or_create_edge_type_id(edge_type);
2871
2872        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2873        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2874        self.edges.write().insert(id, chain);
2875
2876        // Update adjacency
2877        self.forward_adj.add_edge(src, dst, id);
2878        if let Some(ref backward) = self.backward_adj {
2879            backward.add_edge(dst, src, id);
2880        }
2881
2882        // Update next_edge_id if necessary
2883        let id_val = id.as_u64();
2884        let _ = self
2885            .next_edge_id
2886            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2887                if id_val >= current {
2888                    Some(id_val + 1)
2889                } else {
2890                    None
2891                }
2892            });
2893    }
2894
2895    /// Creates an edge with a specific ID during recovery.
2896    /// (Tiered storage version)
2897    #[cfg(feature = "tiered-storage")]
2898    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2899        let epoch = self.current_epoch();
2900        let type_id = self.get_or_create_edge_type_id(edge_type);
2901
2902        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2903
2904        // Allocate record in arena and get offset (create epoch if needed)
2905        let arena = self.arena_allocator.arena_or_create(epoch);
2906        let (offset, _stored) = arena.alloc_value_with_offset(record);
2907
2908        // Create HotVersionRef (using SYSTEM tx for recovery)
2909        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2910        let mut versions = self.edge_versions.write();
2911        versions.insert(id, VersionIndex::with_initial(hot_ref));
2912
2913        // Update adjacency
2914        self.forward_adj.add_edge(src, dst, id);
2915        if let Some(ref backward) = self.backward_adj {
2916            backward.add_edge(dst, src, id);
2917        }
2918
2919        // Update next_edge_id if necessary
2920        let id_val = id.as_u64();
2921        let _ = self
2922            .next_edge_id
2923            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2924                if id_val >= current {
2925                    Some(id_val + 1)
2926                } else {
2927                    None
2928                }
2929            });
2930    }
2931
2932    /// Sets the current epoch during recovery.
2933    pub fn set_epoch(&self, epoch: EpochId) {
2934        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2935    }
2936}
2937
2938impl Default for LpgStore {
2939    fn default() -> Self {
2940        Self::new()
2941    }
2942}
2943
2944#[cfg(test)]
2945mod tests {
2946    use super::*;
2947
2948    #[test]
2949    fn test_create_node() {
2950        let store = LpgStore::new();
2951
2952        let id = store.create_node(&["Person"]);
2953        assert!(id.is_valid());
2954
2955        let node = store.get_node(id).unwrap();
2956        assert!(node.has_label("Person"));
2957        assert!(!node.has_label("Animal"));
2958    }
2959
2960    #[test]
2961    fn test_create_node_with_props() {
2962        let store = LpgStore::new();
2963
2964        let id = store.create_node_with_props(
2965            &["Person"],
2966            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
2967        );
2968
2969        let node = store.get_node(id).unwrap();
2970        assert_eq!(
2971            node.get_property("name").and_then(|v| v.as_str()),
2972            Some("Alice")
2973        );
2974        assert_eq!(
2975            node.get_property("age").and_then(|v| v.as_int64()),
2976            Some(30)
2977        );
2978    }
2979
2980    #[test]
2981    fn test_delete_node() {
2982        let store = LpgStore::new();
2983
2984        let id = store.create_node(&["Person"]);
2985        assert_eq!(store.node_count(), 1);
2986
2987        assert!(store.delete_node(id));
2988        assert_eq!(store.node_count(), 0);
2989        assert!(store.get_node(id).is_none());
2990
2991        // Double delete should return false
2992        assert!(!store.delete_node(id));
2993    }
2994
2995    #[test]
2996    fn test_create_edge() {
2997        let store = LpgStore::new();
2998
2999        let alice = store.create_node(&["Person"]);
3000        let bob = store.create_node(&["Person"]);
3001
3002        let edge_id = store.create_edge(alice, bob, "KNOWS");
3003        assert!(edge_id.is_valid());
3004
3005        let edge = store.get_edge(edge_id).unwrap();
3006        assert_eq!(edge.src, alice);
3007        assert_eq!(edge.dst, bob);
3008        assert_eq!(edge.edge_type.as_str(), "KNOWS");
3009    }
3010
3011    #[test]
3012    fn test_neighbors() {
3013        let store = LpgStore::new();
3014
3015        let a = store.create_node(&["Person"]);
3016        let b = store.create_node(&["Person"]);
3017        let c = store.create_node(&["Person"]);
3018
3019        store.create_edge(a, b, "KNOWS");
3020        store.create_edge(a, c, "KNOWS");
3021
3022        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3023        assert_eq!(outgoing.len(), 2);
3024        assert!(outgoing.contains(&b));
3025        assert!(outgoing.contains(&c));
3026
3027        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3028        assert_eq!(incoming.len(), 1);
3029        assert!(incoming.contains(&a));
3030    }
3031
3032    #[test]
3033    fn test_nodes_by_label() {
3034        let store = LpgStore::new();
3035
3036        let p1 = store.create_node(&["Person"]);
3037        let p2 = store.create_node(&["Person"]);
3038        let _a = store.create_node(&["Animal"]);
3039
3040        let persons = store.nodes_by_label("Person");
3041        assert_eq!(persons.len(), 2);
3042        assert!(persons.contains(&p1));
3043        assert!(persons.contains(&p2));
3044
3045        let animals = store.nodes_by_label("Animal");
3046        assert_eq!(animals.len(), 1);
3047    }
3048
3049    #[test]
3050    fn test_delete_edge() {
3051        let store = LpgStore::new();
3052
3053        let a = store.create_node(&["Person"]);
3054        let b = store.create_node(&["Person"]);
3055        let edge_id = store.create_edge(a, b, "KNOWS");
3056
3057        assert_eq!(store.edge_count(), 1);
3058
3059        assert!(store.delete_edge(edge_id));
3060        assert_eq!(store.edge_count(), 0);
3061        assert!(store.get_edge(edge_id).is_none());
3062    }
3063
3064    // === New tests for improved coverage ===
3065
3066    #[test]
3067    fn test_lpg_store_config() {
3068        // Test with_config
3069        let config = LpgStoreConfig {
3070            backward_edges: false,
3071            initial_node_capacity: 100,
3072            initial_edge_capacity: 200,
3073        };
3074        let store = LpgStore::with_config(config);
3075
3076        // Store should work but without backward adjacency
3077        let a = store.create_node(&["Person"]);
3078        let b = store.create_node(&["Person"]);
3079        store.create_edge(a, b, "KNOWS");
3080
3081        // Outgoing should work
3082        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3083        assert_eq!(outgoing.len(), 1);
3084
3085        // Incoming should be empty (no backward adjacency)
3086        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3087        assert_eq!(incoming.len(), 0);
3088    }
3089
3090    #[test]
3091    fn test_epoch_management() {
3092        let store = LpgStore::new();
3093
3094        let epoch0 = store.current_epoch();
3095        assert_eq!(epoch0.as_u64(), 0);
3096
3097        let epoch1 = store.new_epoch();
3098        assert_eq!(epoch1.as_u64(), 1);
3099
3100        let current = store.current_epoch();
3101        assert_eq!(current.as_u64(), 1);
3102    }
3103
3104    #[test]
3105    fn test_node_properties() {
3106        let store = LpgStore::new();
3107        let id = store.create_node(&["Person"]);
3108
3109        // Set and get property
3110        store.set_node_property(id, "name", Value::from("Alice"));
3111        let name = store.get_node_property(id, &"name".into());
3112        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3113
3114        // Update property
3115        store.set_node_property(id, "name", Value::from("Bob"));
3116        let name = store.get_node_property(id, &"name".into());
3117        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3118
3119        // Remove property
3120        let old = store.remove_node_property(id, "name");
3121        assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3122
3123        // Property should be gone
3124        let name = store.get_node_property(id, &"name".into());
3125        assert!(name.is_none());
3126
3127        // Remove non-existent property
3128        let none = store.remove_node_property(id, "nonexistent");
3129        assert!(none.is_none());
3130    }
3131
3132    #[test]
3133    fn test_edge_properties() {
3134        let store = LpgStore::new();
3135        let a = store.create_node(&["Person"]);
3136        let b = store.create_node(&["Person"]);
3137        let edge_id = store.create_edge(a, b, "KNOWS");
3138
3139        // Set and get property
3140        store.set_edge_property(edge_id, "since", Value::from(2020i64));
3141        let since = store.get_edge_property(edge_id, &"since".into());
3142        assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3143
3144        // Remove property
3145        let old = store.remove_edge_property(edge_id, "since");
3146        assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3147
3148        let since = store.get_edge_property(edge_id, &"since".into());
3149        assert!(since.is_none());
3150    }
3151
3152    #[test]
3153    fn test_add_remove_label() {
3154        let store = LpgStore::new();
3155        let id = store.create_node(&["Person"]);
3156
3157        // Add new label
3158        assert!(store.add_label(id, "Employee"));
3159
3160        let node = store.get_node(id).unwrap();
3161        assert!(node.has_label("Person"));
3162        assert!(node.has_label("Employee"));
3163
3164        // Adding same label again should fail
3165        assert!(!store.add_label(id, "Employee"));
3166
3167        // Remove label
3168        assert!(store.remove_label(id, "Employee"));
3169
3170        let node = store.get_node(id).unwrap();
3171        assert!(node.has_label("Person"));
3172        assert!(!node.has_label("Employee"));
3173
3174        // Removing non-existent label should fail
3175        assert!(!store.remove_label(id, "Employee"));
3176        assert!(!store.remove_label(id, "NonExistent"));
3177    }
3178
3179    #[test]
3180    fn test_add_label_to_nonexistent_node() {
3181        let store = LpgStore::new();
3182        let fake_id = NodeId::new(999);
3183        assert!(!store.add_label(fake_id, "Label"));
3184    }
3185
3186    #[test]
3187    fn test_remove_label_from_nonexistent_node() {
3188        let store = LpgStore::new();
3189        let fake_id = NodeId::new(999);
3190        assert!(!store.remove_label(fake_id, "Label"));
3191    }
3192
3193    #[test]
3194    fn test_node_ids() {
3195        let store = LpgStore::new();
3196
3197        let n1 = store.create_node(&["Person"]);
3198        let n2 = store.create_node(&["Person"]);
3199        let n3 = store.create_node(&["Person"]);
3200
3201        let ids = store.node_ids();
3202        assert_eq!(ids.len(), 3);
3203        assert!(ids.contains(&n1));
3204        assert!(ids.contains(&n2));
3205        assert!(ids.contains(&n3));
3206
3207        // Delete one
3208        store.delete_node(n2);
3209        let ids = store.node_ids();
3210        assert_eq!(ids.len(), 2);
3211        assert!(!ids.contains(&n2));
3212    }
3213
3214    #[test]
3215    fn test_delete_node_nonexistent() {
3216        let store = LpgStore::new();
3217        let fake_id = NodeId::new(999);
3218        assert!(!store.delete_node(fake_id));
3219    }
3220
3221    #[test]
3222    fn test_delete_edge_nonexistent() {
3223        let store = LpgStore::new();
3224        let fake_id = EdgeId::new(999);
3225        assert!(!store.delete_edge(fake_id));
3226    }
3227
3228    #[test]
3229    fn test_delete_edge_double() {
3230        let store = LpgStore::new();
3231        let a = store.create_node(&["Person"]);
3232        let b = store.create_node(&["Person"]);
3233        let edge_id = store.create_edge(a, b, "KNOWS");
3234
3235        assert!(store.delete_edge(edge_id));
3236        assert!(!store.delete_edge(edge_id)); // Double delete
3237    }
3238
3239    #[test]
3240    fn test_create_edge_with_props() {
3241        let store = LpgStore::new();
3242        let a = store.create_node(&["Person"]);
3243        let b = store.create_node(&["Person"]);
3244
3245        let edge_id = store.create_edge_with_props(
3246            a,
3247            b,
3248            "KNOWS",
3249            [
3250                ("since", Value::from(2020i64)),
3251                ("weight", Value::from(1.0)),
3252            ],
3253        );
3254
3255        let edge = store.get_edge(edge_id).unwrap();
3256        assert_eq!(
3257            edge.get_property("since").and_then(|v| v.as_int64()),
3258            Some(2020)
3259        );
3260        assert_eq!(
3261            edge.get_property("weight").and_then(|v| v.as_float64()),
3262            Some(1.0)
3263        );
3264    }
3265
3266    #[test]
3267    fn test_delete_node_edges() {
3268        let store = LpgStore::new();
3269
3270        let a = store.create_node(&["Person"]);
3271        let b = store.create_node(&["Person"]);
3272        let c = store.create_node(&["Person"]);
3273
3274        store.create_edge(a, b, "KNOWS"); // a -> b
3275        store.create_edge(c, a, "KNOWS"); // c -> a
3276
3277        assert_eq!(store.edge_count(), 2);
3278
3279        // Delete all edges connected to a
3280        store.delete_node_edges(a);
3281
3282        assert_eq!(store.edge_count(), 0);
3283    }
3284
3285    #[test]
3286    fn test_neighbors_both_directions() {
3287        let store = LpgStore::new();
3288
3289        let a = store.create_node(&["Person"]);
3290        let b = store.create_node(&["Person"]);
3291        let c = store.create_node(&["Person"]);
3292
3293        store.create_edge(a, b, "KNOWS"); // a -> b
3294        store.create_edge(c, a, "KNOWS"); // c -> a
3295
3296        // Direction::Both for node a
3297        let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3298        assert_eq!(neighbors.len(), 2);
3299        assert!(neighbors.contains(&b)); // outgoing
3300        assert!(neighbors.contains(&c)); // incoming
3301    }
3302
3303    #[test]
3304    fn test_edges_from() {
3305        let store = LpgStore::new();
3306
3307        let a = store.create_node(&["Person"]);
3308        let b = store.create_node(&["Person"]);
3309        let c = store.create_node(&["Person"]);
3310
3311        let e1 = store.create_edge(a, b, "KNOWS");
3312        let e2 = store.create_edge(a, c, "KNOWS");
3313
3314        let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3315        assert_eq!(edges.len(), 2);
3316        assert!(edges.iter().any(|(_, e)| *e == e1));
3317        assert!(edges.iter().any(|(_, e)| *e == e2));
3318
3319        // Incoming edges to b
3320        let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3321        assert_eq!(incoming.len(), 1);
3322        assert_eq!(incoming[0].1, e1);
3323    }
3324
3325    #[test]
3326    fn test_edges_to() {
3327        let store = LpgStore::new();
3328
3329        let a = store.create_node(&["Person"]);
3330        let b = store.create_node(&["Person"]);
3331        let c = store.create_node(&["Person"]);
3332
3333        let e1 = store.create_edge(a, b, "KNOWS");
3334        let e2 = store.create_edge(c, b, "KNOWS");
3335
3336        // Edges pointing TO b
3337        let to_b = store.edges_to(b);
3338        assert_eq!(to_b.len(), 2);
3339        assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3340        assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3341    }
3342
3343    #[test]
3344    fn test_out_degree_in_degree() {
3345        let store = LpgStore::new();
3346
3347        let a = store.create_node(&["Person"]);
3348        let b = store.create_node(&["Person"]);
3349        let c = store.create_node(&["Person"]);
3350
3351        store.create_edge(a, b, "KNOWS");
3352        store.create_edge(a, c, "KNOWS");
3353        store.create_edge(c, b, "KNOWS");
3354
3355        assert_eq!(store.out_degree(a), 2);
3356        assert_eq!(store.out_degree(b), 0);
3357        assert_eq!(store.out_degree(c), 1);
3358
3359        assert_eq!(store.in_degree(a), 0);
3360        assert_eq!(store.in_degree(b), 2);
3361        assert_eq!(store.in_degree(c), 1);
3362    }
3363
3364    #[test]
3365    fn test_edge_type() {
3366        let store = LpgStore::new();
3367
3368        let a = store.create_node(&["Person"]);
3369        let b = store.create_node(&["Person"]);
3370        let edge_id = store.create_edge(a, b, "KNOWS");
3371
3372        let edge_type = store.edge_type(edge_id);
3373        assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3374
3375        // Non-existent edge
3376        let fake_id = EdgeId::new(999);
3377        assert!(store.edge_type(fake_id).is_none());
3378    }
3379
3380    #[test]
3381    fn test_count_methods() {
3382        let store = LpgStore::new();
3383
3384        assert_eq!(store.label_count(), 0);
3385        assert_eq!(store.edge_type_count(), 0);
3386        assert_eq!(store.property_key_count(), 0);
3387
3388        let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3389        let b = store.create_node(&["Company"]);
3390        store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3391
3392        assert_eq!(store.label_count(), 2); // Person, Company
3393        assert_eq!(store.edge_type_count(), 1); // WORKS_AT
3394        assert_eq!(store.property_key_count(), 2); // age, since
3395    }
3396
3397    #[test]
3398    fn test_all_nodes_and_edges() {
3399        let store = LpgStore::new();
3400
3401        let a = store.create_node(&["Person"]);
3402        let b = store.create_node(&["Person"]);
3403        store.create_edge(a, b, "KNOWS");
3404
3405        let nodes: Vec<_> = store.all_nodes().collect();
3406        assert_eq!(nodes.len(), 2);
3407
3408        let edges: Vec<_> = store.all_edges().collect();
3409        assert_eq!(edges.len(), 1);
3410    }
3411
3412    #[test]
3413    fn test_all_labels_and_edge_types() {
3414        let store = LpgStore::new();
3415
3416        store.create_node(&["Person"]);
3417        store.create_node(&["Company"]);
3418        let a = store.create_node(&["Animal"]);
3419        let b = store.create_node(&["Animal"]);
3420        store.create_edge(a, b, "EATS");
3421
3422        let labels = store.all_labels();
3423        assert_eq!(labels.len(), 3);
3424        assert!(labels.contains(&"Person".to_string()));
3425        assert!(labels.contains(&"Company".to_string()));
3426        assert!(labels.contains(&"Animal".to_string()));
3427
3428        let edge_types = store.all_edge_types();
3429        assert_eq!(edge_types.len(), 1);
3430        assert!(edge_types.contains(&"EATS".to_string()));
3431    }
3432
3433    #[test]
3434    fn test_all_property_keys() {
3435        let store = LpgStore::new();
3436
3437        let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3438        let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3439        store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3440
3441        let keys = store.all_property_keys();
3442        assert!(keys.contains(&"name".to_string()));
3443        assert!(keys.contains(&"age".to_string()));
3444        assert!(keys.contains(&"since".to_string()));
3445    }
3446
3447    #[test]
3448    fn test_nodes_with_label() {
3449        let store = LpgStore::new();
3450
3451        store.create_node(&["Person"]);
3452        store.create_node(&["Person"]);
3453        store.create_node(&["Company"]);
3454
3455        let persons: Vec<_> = store.nodes_with_label("Person").collect();
3456        assert_eq!(persons.len(), 2);
3457
3458        let companies: Vec<_> = store.nodes_with_label("Company").collect();
3459        assert_eq!(companies.len(), 1);
3460
3461        let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3462        assert_eq!(none.len(), 0);
3463    }
3464
3465    #[test]
3466    fn test_edges_with_type() {
3467        let store = LpgStore::new();
3468
3469        let a = store.create_node(&["Person"]);
3470        let b = store.create_node(&["Person"]);
3471        let c = store.create_node(&["Company"]);
3472
3473        store.create_edge(a, b, "KNOWS");
3474        store.create_edge(a, c, "WORKS_AT");
3475
3476        let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3477        assert_eq!(knows.len(), 1);
3478
3479        let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3480        assert_eq!(works_at.len(), 1);
3481
3482        let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3483        assert_eq!(none.len(), 0);
3484    }
3485
3486    #[test]
3487    fn test_nodes_by_label_nonexistent() {
3488        let store = LpgStore::new();
3489        store.create_node(&["Person"]);
3490
3491        let empty = store.nodes_by_label("NonExistent");
3492        assert!(empty.is_empty());
3493    }
3494
3495    #[test]
3496    fn test_statistics() {
3497        let store = LpgStore::new();
3498
3499        let a = store.create_node(&["Person"]);
3500        let b = store.create_node(&["Person"]);
3501        let c = store.create_node(&["Company"]);
3502
3503        store.create_edge(a, b, "KNOWS");
3504        store.create_edge(a, c, "WORKS_AT");
3505
3506        store.compute_statistics();
3507        let stats = store.statistics();
3508
3509        assert_eq!(stats.total_nodes, 3);
3510        assert_eq!(stats.total_edges, 2);
3511
3512        // Estimates
3513        let person_card = store.estimate_label_cardinality("Person");
3514        assert!(person_card > 0.0);
3515
3516        let avg_degree = store.estimate_avg_degree("KNOWS", true);
3517        assert!(avg_degree >= 0.0);
3518    }
3519
3520    #[test]
3521    fn test_zone_maps() {
3522        let store = LpgStore::new();
3523
3524        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3525        store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3526
3527        // Zone map should indicate possible matches (30 is within [25, 35] range)
3528        let might_match =
3529            store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3530        // Zone maps return true conservatively when value is within min/max range
3531        assert!(might_match);
3532
3533        let zone = store.node_property_zone_map(&"age".into());
3534        assert!(zone.is_some());
3535
3536        // Non-existent property
3537        let no_zone = store.node_property_zone_map(&"nonexistent".into());
3538        assert!(no_zone.is_none());
3539
3540        // Edge zone maps
3541        let a = store.create_node(&["A"]);
3542        let b = store.create_node(&["B"]);
3543        store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3544
3545        let edge_zone = store.edge_property_zone_map(&"weight".into());
3546        assert!(edge_zone.is_some());
3547    }
3548
3549    #[test]
3550    fn test_rebuild_zone_maps() {
3551        let store = LpgStore::new();
3552        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3553
3554        // Should not panic
3555        store.rebuild_zone_maps();
3556    }
3557
3558    #[test]
3559    fn test_create_node_with_id() {
3560        let store = LpgStore::new();
3561
3562        let specific_id = NodeId::new(100);
3563        store.create_node_with_id(specific_id, &["Person", "Employee"]);
3564
3565        let node = store.get_node(specific_id).unwrap();
3566        assert!(node.has_label("Person"));
3567        assert!(node.has_label("Employee"));
3568
3569        // Next auto-generated ID should be > 100
3570        let next = store.create_node(&["Other"]);
3571        assert!(next.as_u64() > 100);
3572    }
3573
3574    #[test]
3575    fn test_create_edge_with_id() {
3576        let store = LpgStore::new();
3577
3578        let a = store.create_node(&["A"]);
3579        let b = store.create_node(&["B"]);
3580
3581        let specific_id = EdgeId::new(500);
3582        store.create_edge_with_id(specific_id, a, b, "REL");
3583
3584        let edge = store.get_edge(specific_id).unwrap();
3585        assert_eq!(edge.src, a);
3586        assert_eq!(edge.dst, b);
3587        assert_eq!(edge.edge_type.as_str(), "REL");
3588
3589        // Next auto-generated ID should be > 500
3590        let next = store.create_edge(a, b, "OTHER");
3591        assert!(next.as_u64() > 500);
3592    }
3593
3594    #[test]
3595    fn test_set_epoch() {
3596        let store = LpgStore::new();
3597
3598        assert_eq!(store.current_epoch().as_u64(), 0);
3599
3600        store.set_epoch(EpochId::new(42));
3601        assert_eq!(store.current_epoch().as_u64(), 42);
3602    }
3603
3604    #[test]
3605    fn test_get_node_nonexistent() {
3606        let store = LpgStore::new();
3607        let fake_id = NodeId::new(999);
3608        assert!(store.get_node(fake_id).is_none());
3609    }
3610
3611    #[test]
3612    fn test_get_edge_nonexistent() {
3613        let store = LpgStore::new();
3614        let fake_id = EdgeId::new(999);
3615        assert!(store.get_edge(fake_id).is_none());
3616    }
3617
3618    #[test]
3619    fn test_multiple_labels() {
3620        let store = LpgStore::new();
3621
3622        let id = store.create_node(&["Person", "Employee", "Manager"]);
3623        let node = store.get_node(id).unwrap();
3624
3625        assert!(node.has_label("Person"));
3626        assert!(node.has_label("Employee"));
3627        assert!(node.has_label("Manager"));
3628        assert!(!node.has_label("Other"));
3629    }
3630
3631    #[test]
3632    fn test_default_impl() {
3633        let store: LpgStore = Default::default();
3634        assert_eq!(store.node_count(), 0);
3635        assert_eq!(store.edge_count(), 0);
3636    }
3637
3638    #[test]
3639    fn test_edges_from_both_directions() {
3640        let store = LpgStore::new();
3641
3642        let a = store.create_node(&["A"]);
3643        let b = store.create_node(&["B"]);
3644        let c = store.create_node(&["C"]);
3645
3646        let e1 = store.create_edge(a, b, "R1"); // a -> b
3647        let e2 = store.create_edge(c, a, "R2"); // c -> a
3648
3649        // Both directions from a
3650        let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3651        assert_eq!(edges.len(), 2);
3652        assert!(edges.iter().any(|(_, e)| *e == e1)); // outgoing
3653        assert!(edges.iter().any(|(_, e)| *e == e2)); // incoming
3654    }
3655
3656    #[test]
3657    fn test_no_backward_adj_in_degree() {
3658        let config = LpgStoreConfig {
3659            backward_edges: false,
3660            initial_node_capacity: 10,
3661            initial_edge_capacity: 10,
3662        };
3663        let store = LpgStore::with_config(config);
3664
3665        let a = store.create_node(&["A"]);
3666        let b = store.create_node(&["B"]);
3667        store.create_edge(a, b, "R");
3668
3669        // in_degree should still work (falls back to scanning)
3670        let degree = store.in_degree(b);
3671        assert_eq!(degree, 1);
3672    }
3673
3674    #[test]
3675    fn test_no_backward_adj_edges_to() {
3676        let config = LpgStoreConfig {
3677            backward_edges: false,
3678            initial_node_capacity: 10,
3679            initial_edge_capacity: 10,
3680        };
3681        let store = LpgStore::with_config(config);
3682
3683        let a = store.create_node(&["A"]);
3684        let b = store.create_node(&["B"]);
3685        let e = store.create_edge(a, b, "R");
3686
3687        // edges_to should still work (falls back to scanning)
3688        let edges = store.edges_to(b);
3689        assert_eq!(edges.len(), 1);
3690        assert_eq!(edges[0].1, e);
3691    }
3692
3693    #[test]
3694    fn test_node_versioned_creation() {
3695        let store = LpgStore::new();
3696
3697        let epoch = store.new_epoch();
3698        let tx_id = TxId::new(1);
3699
3700        let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3701        assert!(store.get_node(id).is_some());
3702    }
3703
3704    #[test]
3705    fn test_edge_versioned_creation() {
3706        let store = LpgStore::new();
3707
3708        let a = store.create_node(&["A"]);
3709        let b = store.create_node(&["B"]);
3710
3711        let epoch = store.new_epoch();
3712        let tx_id = TxId::new(1);
3713
3714        let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3715        assert!(store.get_edge(edge_id).is_some());
3716    }
3717
3718    #[test]
3719    fn test_node_with_props_versioned() {
3720        let store = LpgStore::new();
3721
3722        let epoch = store.new_epoch();
3723        let tx_id = TxId::new(1);
3724
3725        let id = store.create_node_with_props_versioned(
3726            &["Person"],
3727            [("name", Value::from("Alice"))],
3728            epoch,
3729            tx_id,
3730        );
3731
3732        let node = store.get_node(id).unwrap();
3733        assert_eq!(
3734            node.get_property("name").and_then(|v| v.as_str()),
3735            Some("Alice")
3736        );
3737    }
3738
3739    #[test]
3740    fn test_discard_uncommitted_versions() {
3741        let store = LpgStore::new();
3742
3743        let epoch = store.new_epoch();
3744        let tx_id = TxId::new(42);
3745
3746        // Create node with specific tx
3747        let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3748        assert!(store.get_node(node_id).is_some());
3749
3750        // Discard uncommitted versions for this tx
3751        store.discard_uncommitted_versions(tx_id);
3752
3753        // Node should be gone (version chain was removed)
3754        assert!(store.get_node(node_id).is_none());
3755    }
3756
3757    // === Property Index Tests ===
3758
3759    #[test]
3760    fn test_property_index_create_and_lookup() {
3761        let store = LpgStore::new();
3762
3763        // Create nodes with properties
3764        let alice = store.create_node(&["Person"]);
3765        let bob = store.create_node(&["Person"]);
3766        let charlie = store.create_node(&["Person"]);
3767
3768        store.set_node_property(alice, "city", Value::from("NYC"));
3769        store.set_node_property(bob, "city", Value::from("NYC"));
3770        store.set_node_property(charlie, "city", Value::from("LA"));
3771
3772        // Before indexing, lookup still works (via scan)
3773        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3774        assert_eq!(nyc_people.len(), 2);
3775
3776        // Create index
3777        store.create_property_index("city");
3778        assert!(store.has_property_index("city"));
3779
3780        // Indexed lookup should return same results
3781        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3782        assert_eq!(nyc_people.len(), 2);
3783        assert!(nyc_people.contains(&alice));
3784        assert!(nyc_people.contains(&bob));
3785
3786        let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3787        assert_eq!(la_people.len(), 1);
3788        assert!(la_people.contains(&charlie));
3789    }
3790
3791    #[test]
3792    fn test_property_index_maintained_on_update() {
3793        let store = LpgStore::new();
3794
3795        // Create index first
3796        store.create_property_index("status");
3797
3798        let node = store.create_node(&["Task"]);
3799        store.set_node_property(node, "status", Value::from("pending"));
3800
3801        // Should find by initial value
3802        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3803        assert_eq!(pending.len(), 1);
3804        assert!(pending.contains(&node));
3805
3806        // Update the property
3807        store.set_node_property(node, "status", Value::from("done"));
3808
3809        // Old value should not find it
3810        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3811        assert!(pending.is_empty());
3812
3813        // New value should find it
3814        let done = store.find_nodes_by_property("status", &Value::from("done"));
3815        assert_eq!(done.len(), 1);
3816        assert!(done.contains(&node));
3817    }
3818
3819    #[test]
3820    fn test_property_index_maintained_on_remove() {
3821        let store = LpgStore::new();
3822
3823        store.create_property_index("tag");
3824
3825        let node = store.create_node(&["Item"]);
3826        store.set_node_property(node, "tag", Value::from("important"));
3827
3828        // Should find it
3829        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3830        assert_eq!(found.len(), 1);
3831
3832        // Remove the property
3833        store.remove_node_property(node, "tag");
3834
3835        // Should no longer find it
3836        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3837        assert!(found.is_empty());
3838    }
3839
3840    #[test]
3841    fn test_property_index_drop() {
3842        let store = LpgStore::new();
3843
3844        store.create_property_index("key");
3845        assert!(store.has_property_index("key"));
3846
3847        assert!(store.drop_property_index("key"));
3848        assert!(!store.has_property_index("key"));
3849
3850        // Dropping non-existent index returns false
3851        assert!(!store.drop_property_index("key"));
3852    }
3853
3854    #[test]
3855    fn test_property_index_multiple_values() {
3856        let store = LpgStore::new();
3857
3858        store.create_property_index("age");
3859
3860        // Create multiple nodes with same and different ages
3861        let n1 = store.create_node(&["Person"]);
3862        let n2 = store.create_node(&["Person"]);
3863        let n3 = store.create_node(&["Person"]);
3864        let n4 = store.create_node(&["Person"]);
3865
3866        store.set_node_property(n1, "age", Value::from(25i64));
3867        store.set_node_property(n2, "age", Value::from(25i64));
3868        store.set_node_property(n3, "age", Value::from(30i64));
3869        store.set_node_property(n4, "age", Value::from(25i64));
3870
3871        let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3872        assert_eq!(age_25.len(), 3);
3873
3874        let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3875        assert_eq!(age_30.len(), 1);
3876
3877        let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3878        assert!(age_40.is_empty());
3879    }
3880
3881    #[test]
3882    fn test_property_index_builds_from_existing_data() {
3883        let store = LpgStore::new();
3884
3885        // Create nodes first
3886        let n1 = store.create_node(&["Person"]);
3887        let n2 = store.create_node(&["Person"]);
3888        store.set_node_property(n1, "email", Value::from("alice@example.com"));
3889        store.set_node_property(n2, "email", Value::from("bob@example.com"));
3890
3891        // Create index after data exists
3892        store.create_property_index("email");
3893
3894        // Index should include existing data
3895        let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3896        assert_eq!(alice.len(), 1);
3897        assert!(alice.contains(&n1));
3898
3899        let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3900        assert_eq!(bob.len(), 1);
3901        assert!(bob.contains(&n2));
3902    }
3903
3904    #[test]
3905    fn test_get_node_property_batch() {
3906        let store = LpgStore::new();
3907
3908        let n1 = store.create_node(&["Person"]);
3909        let n2 = store.create_node(&["Person"]);
3910        let n3 = store.create_node(&["Person"]);
3911
3912        store.set_node_property(n1, "age", Value::from(25i64));
3913        store.set_node_property(n2, "age", Value::from(30i64));
3914        // n3 has no age property
3915
3916        let age_key = PropertyKey::new("age");
3917        let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
3918
3919        assert_eq!(values.len(), 3);
3920        assert_eq!(values[0], Some(Value::from(25i64)));
3921        assert_eq!(values[1], Some(Value::from(30i64)));
3922        assert_eq!(values[2], None);
3923    }
3924
3925    #[test]
3926    fn test_get_node_property_batch_empty() {
3927        let store = LpgStore::new();
3928        let key = PropertyKey::new("any");
3929
3930        let values = store.get_node_property_batch(&[], &key);
3931        assert!(values.is_empty());
3932    }
3933
3934    #[test]
3935    fn test_get_nodes_properties_batch() {
3936        let store = LpgStore::new();
3937
3938        let n1 = store.create_node(&["Person"]);
3939        let n2 = store.create_node(&["Person"]);
3940        let n3 = store.create_node(&["Person"]);
3941
3942        store.set_node_property(n1, "name", Value::from("Alice"));
3943        store.set_node_property(n1, "age", Value::from(25i64));
3944        store.set_node_property(n2, "name", Value::from("Bob"));
3945        // n3 has no properties
3946
3947        let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
3948
3949        assert_eq!(all_props.len(), 3);
3950        assert_eq!(all_props[0].len(), 2); // name and age
3951        assert_eq!(all_props[1].len(), 1); // name only
3952        assert_eq!(all_props[2].len(), 0); // no properties
3953
3954        assert_eq!(
3955            all_props[0].get(&PropertyKey::new("name")),
3956            Some(&Value::from("Alice"))
3957        );
3958        assert_eq!(
3959            all_props[1].get(&PropertyKey::new("name")),
3960            Some(&Value::from("Bob"))
3961        );
3962    }
3963
3964    #[test]
3965    fn test_get_nodes_properties_batch_empty() {
3966        let store = LpgStore::new();
3967
3968        let all_props = store.get_nodes_properties_batch(&[]);
3969        assert!(all_props.is_empty());
3970    }
3971
3972    #[test]
3973    fn test_get_nodes_properties_selective_batch() {
3974        let store = LpgStore::new();
3975
3976        let n1 = store.create_node(&["Person"]);
3977        let n2 = store.create_node(&["Person"]);
3978
3979        // Set multiple properties
3980        store.set_node_property(n1, "name", Value::from("Alice"));
3981        store.set_node_property(n1, "age", Value::from(25i64));
3982        store.set_node_property(n1, "email", Value::from("alice@example.com"));
3983        store.set_node_property(n2, "name", Value::from("Bob"));
3984        store.set_node_property(n2, "age", Value::from(30i64));
3985        store.set_node_property(n2, "city", Value::from("NYC"));
3986
3987        // Request only name and age (not email or city)
3988        let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
3989        let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
3990
3991        assert_eq!(props.len(), 2);
3992
3993        // n1: should have name and age, but NOT email
3994        assert_eq!(props[0].len(), 2);
3995        assert_eq!(
3996            props[0].get(&PropertyKey::new("name")),
3997            Some(&Value::from("Alice"))
3998        );
3999        assert_eq!(
4000            props[0].get(&PropertyKey::new("age")),
4001            Some(&Value::from(25i64))
4002        );
4003        assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4004
4005        // n2: should have name and age, but NOT city
4006        assert_eq!(props[1].len(), 2);
4007        assert_eq!(
4008            props[1].get(&PropertyKey::new("name")),
4009            Some(&Value::from("Bob"))
4010        );
4011        assert_eq!(
4012            props[1].get(&PropertyKey::new("age")),
4013            Some(&Value::from(30i64))
4014        );
4015        assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4016    }
4017
4018    #[test]
4019    fn test_get_nodes_properties_selective_batch_empty_keys() {
4020        let store = LpgStore::new();
4021
4022        let n1 = store.create_node(&["Person"]);
4023        store.set_node_property(n1, "name", Value::from("Alice"));
4024
4025        // Request no properties
4026        let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4027
4028        assert_eq!(props.len(), 1);
4029        assert!(props[0].is_empty()); // Empty map when no keys requested
4030    }
4031
4032    #[test]
4033    fn test_get_nodes_properties_selective_batch_missing_keys() {
4034        let store = LpgStore::new();
4035
4036        let n1 = store.create_node(&["Person"]);
4037        store.set_node_property(n1, "name", Value::from("Alice"));
4038
4039        // Request a property that doesn't exist
4040        let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4041        let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4042
4043        assert_eq!(props.len(), 1);
4044        assert_eq!(props[0].len(), 1); // Only name exists
4045        assert_eq!(
4046            props[0].get(&PropertyKey::new("name")),
4047            Some(&Value::from("Alice"))
4048        );
4049    }
4050
4051    // === Range Query Tests ===
4052
4053    #[test]
4054    fn test_find_nodes_in_range_inclusive() {
4055        let store = LpgStore::new();
4056
4057        let n1 = store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4058        let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4059        let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4060        let _n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4061
4062        // age >= 20 AND age <= 40 (inclusive both sides)
4063        let result = store.find_nodes_in_range(
4064            "age",
4065            Some(&Value::from(20i64)),
4066            Some(&Value::from(40i64)),
4067            true,
4068            true,
4069        );
4070        assert_eq!(result.len(), 3);
4071        assert!(result.contains(&n1));
4072        assert!(result.contains(&n2));
4073        assert!(result.contains(&n3));
4074    }
4075
4076    #[test]
4077    fn test_find_nodes_in_range_exclusive() {
4078        let store = LpgStore::new();
4079
4080        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4081        let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4082        store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4083
4084        // age > 20 AND age < 40 (exclusive both sides)
4085        let result = store.find_nodes_in_range(
4086            "age",
4087            Some(&Value::from(20i64)),
4088            Some(&Value::from(40i64)),
4089            false,
4090            false,
4091        );
4092        assert_eq!(result.len(), 1);
4093        assert!(result.contains(&n2));
4094    }
4095
4096    #[test]
4097    fn test_find_nodes_in_range_open_ended() {
4098        let store = LpgStore::new();
4099
4100        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4101        store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4102        let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4103        let n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4104
4105        // age >= 35 (no upper bound)
4106        let result = store.find_nodes_in_range("age", Some(&Value::from(35i64)), None, true, true);
4107        assert_eq!(result.len(), 2);
4108        assert!(result.contains(&n3));
4109        assert!(result.contains(&n4));
4110
4111        // age <= 25 (no lower bound)
4112        let result = store.find_nodes_in_range("age", None, Some(&Value::from(25i64)), true, true);
4113        assert_eq!(result.len(), 1);
4114    }
4115
4116    #[test]
4117    fn test_find_nodes_in_range_empty_result() {
4118        let store = LpgStore::new();
4119
4120        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4121
4122        // Range that doesn't match anything
4123        let result = store.find_nodes_in_range(
4124            "age",
4125            Some(&Value::from(100i64)),
4126            Some(&Value::from(200i64)),
4127            true,
4128            true,
4129        );
4130        assert!(result.is_empty());
4131    }
4132
4133    #[test]
4134    fn test_find_nodes_in_range_nonexistent_property() {
4135        let store = LpgStore::new();
4136
4137        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4138
4139        let result = store.find_nodes_in_range(
4140            "weight",
4141            Some(&Value::from(50i64)),
4142            Some(&Value::from(100i64)),
4143            true,
4144            true,
4145        );
4146        assert!(result.is_empty());
4147    }
4148
4149    // === Multi-Property Query Tests ===
4150
4151    #[test]
4152    fn test_find_nodes_by_properties_multiple_conditions() {
4153        let store = LpgStore::new();
4154
4155        let alice = store.create_node_with_props(
4156            &["Person"],
4157            [("name", Value::from("Alice")), ("city", Value::from("NYC"))],
4158        );
4159        store.create_node_with_props(
4160            &["Person"],
4161            [("name", Value::from("Bob")), ("city", Value::from("NYC"))],
4162        );
4163        store.create_node_with_props(
4164            &["Person"],
4165            [("name", Value::from("Alice")), ("city", Value::from("LA"))],
4166        );
4167
4168        // Match name="Alice" AND city="NYC"
4169        let result = store.find_nodes_by_properties(&[
4170            ("name", Value::from("Alice")),
4171            ("city", Value::from("NYC")),
4172        ]);
4173        assert_eq!(result.len(), 1);
4174        assert!(result.contains(&alice));
4175    }
4176
4177    #[test]
4178    fn test_find_nodes_by_properties_empty_conditions() {
4179        let store = LpgStore::new();
4180
4181        store.create_node(&["Person"]);
4182        store.create_node(&["Person"]);
4183
4184        // Empty conditions should return all nodes
4185        let result = store.find_nodes_by_properties(&[]);
4186        assert_eq!(result.len(), 2);
4187    }
4188
4189    #[test]
4190    fn test_find_nodes_by_properties_no_match() {
4191        let store = LpgStore::new();
4192
4193        store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
4194
4195        let result = store.find_nodes_by_properties(&[("name", Value::from("Nobody"))]);
4196        assert!(result.is_empty());
4197    }
4198
4199    #[test]
4200    fn test_find_nodes_by_properties_with_index() {
4201        let store = LpgStore::new();
4202
4203        // Create index on name
4204        store.create_property_index("name");
4205
4206        let alice = store.create_node_with_props(
4207            &["Person"],
4208            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
4209        );
4210        store.create_node_with_props(
4211            &["Person"],
4212            [("name", Value::from("Bob")), ("age", Value::from(30i64))],
4213        );
4214
4215        // Index should accelerate the lookup
4216        let result = store.find_nodes_by_properties(&[
4217            ("name", Value::from("Alice")),
4218            ("age", Value::from(30i64)),
4219        ]);
4220        assert_eq!(result.len(), 1);
4221        assert!(result.contains(&alice));
4222    }
4223
4224    // === Cardinality Estimation Tests ===
4225
4226    #[test]
4227    fn test_estimate_label_cardinality() {
4228        let store = LpgStore::new();
4229
4230        store.create_node(&["Person"]);
4231        store.create_node(&["Person"]);
4232        store.create_node(&["Animal"]);
4233
4234        store.ensure_statistics_fresh();
4235
4236        let person_est = store.estimate_label_cardinality("Person");
4237        let animal_est = store.estimate_label_cardinality("Animal");
4238        let unknown_est = store.estimate_label_cardinality("Unknown");
4239
4240        assert!(
4241            person_est >= 1.0,
4242            "Person should have cardinality >= 1, got {person_est}"
4243        );
4244        assert!(
4245            animal_est >= 1.0,
4246            "Animal should have cardinality >= 1, got {animal_est}"
4247        );
4248        // Unknown label should return some default (not panic)
4249        assert!(unknown_est >= 0.0);
4250    }
4251
4252    #[test]
4253    fn test_estimate_avg_degree() {
4254        let store = LpgStore::new();
4255
4256        let a = store.create_node(&["Person"]);
4257        let b = store.create_node(&["Person"]);
4258        let c = store.create_node(&["Person"]);
4259
4260        store.create_edge(a, b, "KNOWS");
4261        store.create_edge(a, c, "KNOWS");
4262        store.create_edge(b, c, "KNOWS");
4263
4264        store.ensure_statistics_fresh();
4265
4266        let outgoing = store.estimate_avg_degree("KNOWS", true);
4267        let incoming = store.estimate_avg_degree("KNOWS", false);
4268
4269        assert!(
4270            outgoing > 0.0,
4271            "Outgoing degree should be > 0, got {outgoing}"
4272        );
4273        assert!(
4274            incoming > 0.0,
4275            "Incoming degree should be > 0, got {incoming}"
4276        );
4277    }
4278
4279    // === Delete operations ===
4280
4281    #[test]
4282    fn test_delete_node_does_not_cascade() {
4283        let store = LpgStore::new();
4284
4285        let a = store.create_node(&["A"]);
4286        let b = store.create_node(&["B"]);
4287        let e = store.create_edge(a, b, "KNOWS");
4288
4289        assert!(store.delete_node(a));
4290        assert!(store.get_node(a).is_none());
4291
4292        // Edges are NOT automatically deleted (non-detach delete)
4293        assert!(
4294            store.get_edge(e).is_some(),
4295            "Edge should survive non-detach node delete"
4296        );
4297    }
4298
4299    #[test]
4300    fn test_delete_already_deleted_node() {
4301        let store = LpgStore::new();
4302        let a = store.create_node(&["A"]);
4303
4304        assert!(store.delete_node(a));
4305        // Second delete should return false (already deleted)
4306        assert!(!store.delete_node(a));
4307    }
4308
4309    #[test]
4310    fn test_delete_nonexistent_node() {
4311        let store = LpgStore::new();
4312        assert!(!store.delete_node(NodeId::new(999)));
4313    }
4314}