Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(feature = "tiered-storage", feature = "vector-index"))]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
29
30#[cfg(feature = "vector-index")]
31use crate::index::vector::HnswIndex;
32
33/// Compares two values for ordering (used for range checks).
34fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
35    match (a, b) {
36        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
37        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
38        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
39        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
40        _ => None,
41    }
42}
43
44/// Checks if a value is within a range.
45fn value_in_range(
46    value: &Value,
47    min: Option<&Value>,
48    max: Option<&Value>,
49    min_inclusive: bool,
50    max_inclusive: bool,
51) -> bool {
52    // Check lower bound
53    if let Some(min_val) = min {
54        match compare_values_for_range(value, min_val) {
55            Some(CmpOrdering::Less) => return false,
56            Some(CmpOrdering::Equal) if !min_inclusive => return false,
57            None => return false, // Can't compare
58            _ => {}
59        }
60    }
61
62    // Check upper bound
63    if let Some(max_val) = max {
64        match compare_values_for_range(value, max_val) {
65            Some(CmpOrdering::Greater) => return false,
66            Some(CmpOrdering::Equal) if !max_inclusive => return false,
67            None => return false,
68            _ => {}
69        }
70    }
71
72    true
73}
74
75// Tiered storage imports
76#[cfg(feature = "tiered-storage")]
77use crate::storage::EpochStore;
78#[cfg(feature = "tiered-storage")]
79use grafeo_common::memory::arena::ArenaAllocator;
80#[cfg(feature = "tiered-storage")]
81use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
82
83/// Configuration for the LPG store.
84///
85/// The defaults work well for most cases. Tune `backward_edges` if you only
86/// traverse in one direction (saves memory), or adjust capacities if you know
87/// your graph size upfront (avoids reallocations).
88#[derive(Debug, Clone)]
89pub struct LpgStoreConfig {
90    /// Maintain backward adjacency for incoming edge queries. Turn off if
91    /// you only traverse outgoing edges - saves ~50% adjacency memory.
92    pub backward_edges: bool,
93    /// Initial capacity for nodes (avoids early reallocations).
94    pub initial_node_capacity: usize,
95    /// Initial capacity for edges (avoids early reallocations).
96    pub initial_edge_capacity: usize,
97}
98
99impl Default for LpgStoreConfig {
100    fn default() -> Self {
101        Self {
102            backward_edges: true,
103            initial_node_capacity: 1024,
104            initial_edge_capacity: 4096,
105        }
106    }
107}
108
109/// The core in-memory graph storage.
110///
111/// Everything lives here: nodes, edges, properties, adjacency indexes, and
112/// version chains for MVCC. Concurrent reads never block each other.
113///
114/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
115/// adds transaction management and query execution. Use `LpgStore` directly
116/// when you need raw performance for algorithm implementations.
117///
118/// # Example
119///
120/// ```
121/// use grafeo_core::graph::lpg::LpgStore;
122/// use grafeo_core::graph::Direction;
123///
124/// let store = LpgStore::new();
125///
126/// // Create a small social network
127/// let alice = store.create_node(&["Person"]);
128/// let bob = store.create_node(&["Person"]);
129/// store.create_edge(alice, bob, "KNOWS");
130///
131/// // Traverse outgoing edges
132/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
133///     println!("Alice knows node {:?}", neighbor);
134/// }
135/// ```
136///
137/// # Lock Ordering
138///
139/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
140/// consistent order to prevent deadlocks. Always acquire locks in this order:
141///
142/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
143/// 1. `nodes` / `node_versions`
144/// 2. `edges` / `edge_versions`
145///
146/// ## Level 2 - Catalogs (acquire as pairs when writing)
147/// 3. `label_to_id` + `id_to_label`
148/// 4. `edge_type_to_id` + `id_to_edge_type`
149///
150/// ## Level 3 - Indexes
151/// 5. `label_index`
152/// 6. `node_labels`
153/// 7. `property_indexes`
154///
155/// ## Level 4 - Statistics
156/// 8. `statistics`
157///
158/// ## Level 5 - Nested Locks (internal to other structs)
159/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
160/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
161///
162/// ## Rules
163/// - Catalog pairs must be acquired together when writing.
164/// - Never hold entity locks while acquiring catalog locks in a different scope.
165/// - Statistics lock is always last.
166/// - Read locks are generally safe, but avoid read-to-write upgrades.
167pub struct LpgStore {
168    /// Configuration.
169    #[allow(dead_code)]
170    config: LpgStoreConfig,
171
172    /// Node records indexed by NodeId, with version chains for MVCC.
173    /// Used when `tiered-storage` feature is disabled.
174    /// Lock order: 1
175    #[cfg(not(feature = "tiered-storage"))]
176    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
177
178    /// Edge records indexed by EdgeId, with version chains for MVCC.
179    /// Used when `tiered-storage` feature is disabled.
180    /// Lock order: 2
181    #[cfg(not(feature = "tiered-storage"))]
182    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
183
184    // === Tiered Storage Fields (feature-gated) ===
185    //
186    // Lock ordering for arena access:
187    //   version_lock (read/write) → arena read lock (via arena_allocator.arena())
188    //
189    // Rules:
190    // - Acquire arena read lock *after* version locks, never before.
191    // - Multiple threads may call arena.read_at() concurrently (shared refs only).
192    // - Never acquire arena write lock (alloc_new_chunk) while holding version locks.
193    // - freeze_epoch order: node_versions.read() → arena.read_at(),
194    //   then edge_versions.read() → arena.read_at().
195    /// Arena allocator for hot data storage.
196    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
197    #[cfg(feature = "tiered-storage")]
198    arena_allocator: Arc<ArenaAllocator>,
199
200    /// Node version indexes - store metadata and arena offsets.
201    /// The actual NodeRecord data is stored in the arena.
202    /// Lock order: 1
203    #[cfg(feature = "tiered-storage")]
204    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
205
206    /// Edge version indexes - store metadata and arena offsets.
207    /// The actual EdgeRecord data is stored in the arena.
208    /// Lock order: 2
209    #[cfg(feature = "tiered-storage")]
210    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
211
212    /// Cold storage for frozen epochs.
213    /// Contains compressed epoch blocks for historical data.
214    #[cfg(feature = "tiered-storage")]
215    epoch_store: Arc<EpochStore>,
216
217    /// Property storage for nodes.
218    node_properties: PropertyStorage<NodeId>,
219
220    /// Property storage for edges.
221    edge_properties: PropertyStorage<EdgeId>,
222
223    /// Label name to ID mapping.
224    /// Lock order: 3 (acquire with id_to_label)
225    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
226
227    /// Label ID to name mapping.
228    /// Lock order: 3 (acquire with label_to_id)
229    id_to_label: RwLock<Vec<ArcStr>>,
230
231    /// Edge type name to ID mapping.
232    /// Lock order: 4 (acquire with id_to_edge_type)
233    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
234
235    /// Edge type ID to name mapping.
236    /// Lock order: 4 (acquire with edge_type_to_id)
237    id_to_edge_type: RwLock<Vec<ArcStr>>,
238
239    /// Forward adjacency lists (outgoing edges).
240    forward_adj: ChunkedAdjacency,
241
242    /// Backward adjacency lists (incoming edges).
243    /// Only populated if config.backward_edges is true.
244    backward_adj: Option<ChunkedAdjacency>,
245
246    /// Label index: label_id -> set of node IDs.
247    /// Lock order: 5
248    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
249
250    /// Node labels: node_id -> set of label IDs.
251    /// Reverse mapping to efficiently get labels for a node.
252    /// Lock order: 6
253    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
254
255    /// Property indexes: property_key -> (value -> set of node IDs).
256    ///
257    /// When a property is indexed, lookups by value are O(1) instead of O(n).
258    /// Use [`create_property_index`] to enable indexing for a property.
259    /// Lock order: 7
260    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
261
262    /// Vector indexes: "label:property" -> HNSW index.
263    ///
264    /// Created via [`GrafeoDB::create_vector_index`](grafeo_engine::GrafeoDB::create_vector_index).
265    /// Lock order: 7 (same level as property_indexes, disjoint keys)
266    #[cfg(feature = "vector-index")]
267    vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
268
269    /// Next node ID.
270    next_node_id: AtomicU64,
271
272    /// Next edge ID.
273    next_edge_id: AtomicU64,
274
275    /// Current epoch.
276    current_epoch: AtomicU64,
277
278    /// Statistics for cost-based optimization.
279    /// Lock order: 8 (always last)
280    statistics: RwLock<Statistics>,
281
282    /// Whether statistics need recomputation after mutations.
283    needs_stats_recompute: AtomicBool,
284}
285
286impl LpgStore {
287    /// Creates a new LPG store with default configuration.
288    #[must_use]
289    pub fn new() -> Self {
290        Self::with_config(LpgStoreConfig::default())
291    }
292
293    /// Creates a new LPG store with custom configuration.
294    #[must_use]
295    pub fn with_config(config: LpgStoreConfig) -> Self {
296        let backward_adj = if config.backward_edges {
297            Some(ChunkedAdjacency::new())
298        } else {
299            None
300        };
301
302        Self {
303            #[cfg(not(feature = "tiered-storage"))]
304            nodes: RwLock::new(FxHashMap::default()),
305            #[cfg(not(feature = "tiered-storage"))]
306            edges: RwLock::new(FxHashMap::default()),
307            #[cfg(feature = "tiered-storage")]
308            arena_allocator: Arc::new(ArenaAllocator::new()),
309            #[cfg(feature = "tiered-storage")]
310            node_versions: RwLock::new(FxHashMap::default()),
311            #[cfg(feature = "tiered-storage")]
312            edge_versions: RwLock::new(FxHashMap::default()),
313            #[cfg(feature = "tiered-storage")]
314            epoch_store: Arc::new(EpochStore::new()),
315            node_properties: PropertyStorage::new(),
316            edge_properties: PropertyStorage::new(),
317            label_to_id: RwLock::new(FxHashMap::default()),
318            id_to_label: RwLock::new(Vec::new()),
319            edge_type_to_id: RwLock::new(FxHashMap::default()),
320            id_to_edge_type: RwLock::new(Vec::new()),
321            forward_adj: ChunkedAdjacency::new(),
322            backward_adj,
323            label_index: RwLock::new(Vec::new()),
324            node_labels: RwLock::new(FxHashMap::default()),
325            property_indexes: RwLock::new(FxHashMap::default()),
326            #[cfg(feature = "vector-index")]
327            vector_indexes: RwLock::new(FxHashMap::default()),
328            next_node_id: AtomicU64::new(0),
329            next_edge_id: AtomicU64::new(0),
330            current_epoch: AtomicU64::new(0),
331            statistics: RwLock::new(Statistics::new()),
332            needs_stats_recompute: AtomicBool::new(true),
333            config,
334        }
335    }
336
337    /// Returns the current epoch.
338    #[must_use]
339    pub fn current_epoch(&self) -> EpochId {
340        EpochId::new(self.current_epoch.load(Ordering::Acquire))
341    }
342
343    /// Creates a new epoch.
344    pub fn new_epoch(&self) -> EpochId {
345        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
346        EpochId::new(id)
347    }
348
349    // === Node Operations ===
350
351    /// Creates a new node with the given labels.
352    ///
353    /// Uses the system transaction for non-transactional operations.
354    pub fn create_node(&self, labels: &[&str]) -> NodeId {
355        self.needs_stats_recompute.store(true, Ordering::Relaxed);
356        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
357    }
358
359    /// Creates a new node with the given labels within a transaction context.
360    #[cfg(not(feature = "tiered-storage"))]
361    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
362        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
363
364        let mut record = NodeRecord::new(id, epoch);
365        record.set_label_count(labels.len() as u16);
366
367        // Store labels in node_labels map and label_index
368        let mut node_label_set = FxHashSet::default();
369        for label in labels {
370            let label_id = self.get_or_create_label_id(*label);
371            node_label_set.insert(label_id);
372
373            // Update label index
374            let mut index = self.label_index.write();
375            while index.len() <= label_id as usize {
376                index.push(FxHashMap::default());
377            }
378            index[label_id as usize].insert(id, ());
379        }
380
381        // Store node's labels
382        self.node_labels.write().insert(id, node_label_set);
383
384        // Create version chain with initial version
385        let chain = VersionChain::with_initial(record, epoch, tx_id);
386        self.nodes.write().insert(id, chain);
387        id
388    }
389
390    /// Creates a new node with the given labels within a transaction context.
391    /// (Tiered storage version: stores data in arena, metadata in VersionIndex)
392    #[cfg(feature = "tiered-storage")]
393    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
394        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
395
396        let mut record = NodeRecord::new(id, epoch);
397        record.set_label_count(labels.len() as u16);
398
399        // Store labels in node_labels map and label_index
400        let mut node_label_set = FxHashSet::default();
401        for label in labels {
402            let label_id = self.get_or_create_label_id(*label);
403            node_label_set.insert(label_id);
404
405            // Update label index
406            let mut index = self.label_index.write();
407            while index.len() <= label_id as usize {
408                index.push(FxHashMap::default());
409            }
410            index[label_id as usize].insert(id, ());
411        }
412
413        // Store node's labels
414        self.node_labels.write().insert(id, node_label_set);
415
416        // Allocate record in arena and get offset (create epoch if needed)
417        let arena = self.arena_allocator.arena_or_create(epoch);
418        let (offset, _stored) = arena.alloc_value_with_offset(record);
419
420        // Create HotVersionRef pointing to arena data
421        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
422
423        // Create or update version index
424        let mut versions = self.node_versions.write();
425        if let Some(index) = versions.get_mut(&id) {
426            index.add_hot(hot_ref);
427        } else {
428            versions.insert(id, VersionIndex::with_initial(hot_ref));
429        }
430
431        id
432    }
433
434    /// Creates a new node with labels and properties.
435    pub fn create_node_with_props(
436        &self,
437        labels: &[&str],
438        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
439    ) -> NodeId {
440        self.create_node_with_props_versioned(
441            labels,
442            properties,
443            self.current_epoch(),
444            TxId::SYSTEM,
445        )
446    }
447
448    /// Creates a new node with labels and properties within a transaction context.
449    #[cfg(not(feature = "tiered-storage"))]
450    pub fn create_node_with_props_versioned(
451        &self,
452        labels: &[&str],
453        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
454        epoch: EpochId,
455        tx_id: TxId,
456    ) -> NodeId {
457        let id = self.create_node_versioned(labels, epoch, tx_id);
458
459        for (key, value) in properties {
460            let prop_key: PropertyKey = key.into();
461            let prop_value: Value = value.into();
462            // Update property index before setting the property
463            self.update_property_index_on_set(id, &prop_key, &prop_value);
464            self.node_properties.set(id, prop_key, prop_value);
465        }
466
467        // Update props_count in record
468        let count = self.node_properties.get_all(id).len() as u16;
469        if let Some(chain) = self.nodes.write().get_mut(&id)
470            && let Some(record) = chain.latest_mut()
471        {
472            record.props_count = count;
473        }
474
475        id
476    }
477
478    /// Creates a new node with labels and properties within a transaction context.
479    /// (Tiered storage version)
480    #[cfg(feature = "tiered-storage")]
481    pub fn create_node_with_props_versioned(
482        &self,
483        labels: &[&str],
484        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
485        epoch: EpochId,
486        tx_id: TxId,
487    ) -> NodeId {
488        let id = self.create_node_versioned(labels, epoch, tx_id);
489
490        for (key, value) in properties {
491            let prop_key: PropertyKey = key.into();
492            let prop_value: Value = value.into();
493            // Update property index before setting the property
494            self.update_property_index_on_set(id, &prop_key, &prop_value);
495            self.node_properties.set(id, prop_key, prop_value);
496        }
497
498        // Note: props_count in record is not updated for tiered storage.
499        // The record is immutable once allocated in the arena.
500
501        id
502    }
503
504    /// Gets a node by ID (latest visible version).
505    #[must_use]
506    pub fn get_node(&self, id: NodeId) -> Option<Node> {
507        self.get_node_at_epoch(id, self.current_epoch())
508    }
509
510    /// Gets a node by ID at a specific epoch.
511    #[must_use]
512    #[cfg(not(feature = "tiered-storage"))]
513    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
514        let nodes = self.nodes.read();
515        let chain = nodes.get(&id)?;
516        let record = chain.visible_at(epoch)?;
517
518        if record.is_deleted() {
519            return None;
520        }
521
522        let mut node = Node::new(id);
523
524        // Get labels from node_labels map
525        let id_to_label = self.id_to_label.read();
526        let node_labels = self.node_labels.read();
527        if let Some(label_ids) = node_labels.get(&id) {
528            for &label_id in label_ids {
529                if let Some(label) = id_to_label.get(label_id as usize) {
530                    node.labels.push(label.clone());
531                }
532            }
533        }
534
535        // Get properties
536        node.properties = self.node_properties.get_all(id).into_iter().collect();
537
538        Some(node)
539    }
540
541    /// Gets a node by ID at a specific epoch.
542    /// (Tiered storage version: reads from arena via VersionIndex)
543    #[must_use]
544    #[cfg(feature = "tiered-storage")]
545    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
546        let versions = self.node_versions.read();
547        let index = versions.get(&id)?;
548        let version_ref = index.visible_at(epoch)?;
549
550        // Read the record from arena
551        let record = self.read_node_record(&version_ref)?;
552
553        if record.is_deleted() {
554            return None;
555        }
556
557        let mut node = Node::new(id);
558
559        // Get labels from node_labels map
560        let id_to_label = self.id_to_label.read();
561        let node_labels = self.node_labels.read();
562        if let Some(label_ids) = node_labels.get(&id) {
563            for &label_id in label_ids {
564                if let Some(label) = id_to_label.get(label_id as usize) {
565                    node.labels.push(label.clone());
566                }
567            }
568        }
569
570        // Get properties
571        node.properties = self.node_properties.get_all(id).into_iter().collect();
572
573        Some(node)
574    }
575
576    /// Gets a node visible to a specific transaction.
577    #[must_use]
578    #[cfg(not(feature = "tiered-storage"))]
579    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
580        let nodes = self.nodes.read();
581        let chain = nodes.get(&id)?;
582        let record = chain.visible_to(epoch, tx_id)?;
583
584        if record.is_deleted() {
585            return None;
586        }
587
588        let mut node = Node::new(id);
589
590        // Get labels from node_labels map
591        let id_to_label = self.id_to_label.read();
592        let node_labels = self.node_labels.read();
593        if let Some(label_ids) = node_labels.get(&id) {
594            for &label_id in label_ids {
595                if let Some(label) = id_to_label.get(label_id as usize) {
596                    node.labels.push(label.clone());
597                }
598            }
599        }
600
601        // Get properties
602        node.properties = self.node_properties.get_all(id).into_iter().collect();
603
604        Some(node)
605    }
606
607    /// Gets a node visible to a specific transaction.
608    /// (Tiered storage version: reads from arena via VersionIndex)
609    #[must_use]
610    #[cfg(feature = "tiered-storage")]
611    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
612        let versions = self.node_versions.read();
613        let index = versions.get(&id)?;
614        let version_ref = index.visible_to(epoch, tx_id)?;
615
616        // Read the record from arena
617        let record = self.read_node_record(&version_ref)?;
618
619        if record.is_deleted() {
620            return None;
621        }
622
623        let mut node = Node::new(id);
624
625        // Get labels from node_labels map
626        let id_to_label = self.id_to_label.read();
627        let node_labels = self.node_labels.read();
628        if let Some(label_ids) = node_labels.get(&id) {
629            for &label_id in label_ids {
630                if let Some(label) = id_to_label.get(label_id as usize) {
631                    node.labels.push(label.clone());
632                }
633            }
634        }
635
636        // Get properties
637        node.properties = self.node_properties.get_all(id).into_iter().collect();
638
639        Some(node)
640    }
641
642    /// Reads a NodeRecord from arena (hot) or epoch store (cold) using a VersionRef.
643    #[cfg(feature = "tiered-storage")]
644    #[allow(unsafe_code)]
645    fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
646        match version_ref {
647            VersionRef::Hot(hot_ref) => {
648                let arena = self.arena_allocator.arena(hot_ref.epoch);
649                // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
650                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
651                Some(*record)
652            }
653            VersionRef::Cold(cold_ref) => {
654                // Read from compressed epoch store
655                self.epoch_store
656                    .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
657            }
658        }
659    }
660
661    /// Deletes a node and all its edges (using latest epoch).
662    pub fn delete_node(&self, id: NodeId) -> bool {
663        self.needs_stats_recompute.store(true, Ordering::Relaxed);
664        self.delete_node_at_epoch(id, self.current_epoch())
665    }
666
667    /// Deletes a node at a specific epoch.
668    #[cfg(not(feature = "tiered-storage"))]
669    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
670        let mut nodes = self.nodes.write();
671        if let Some(chain) = nodes.get_mut(&id) {
672            // Check if visible at this epoch (not already deleted)
673            if let Some(record) = chain.visible_at(epoch) {
674                if record.is_deleted() {
675                    return false;
676                }
677            } else {
678                // Not visible at this epoch (already deleted or doesn't exist)
679                return false;
680            }
681
682            // Mark the version chain as deleted at this epoch
683            chain.mark_deleted(epoch);
684
685            // Remove from label index using node_labels map
686            let mut index = self.label_index.write();
687            let mut node_labels = self.node_labels.write();
688            if let Some(label_ids) = node_labels.remove(&id) {
689                for label_id in label_ids {
690                    if let Some(set) = index.get_mut(label_id as usize) {
691                        set.remove(&id);
692                    }
693                }
694            }
695
696            // Remove properties
697            drop(nodes); // Release lock before removing properties
698            drop(index);
699            drop(node_labels);
700            self.node_properties.remove_all(id);
701
702            // Note: Caller should use delete_node_edges() first if detach is needed
703
704            true
705        } else {
706            false
707        }
708    }
709
710    /// Deletes a node at a specific epoch.
711    /// (Tiered storage version)
712    #[cfg(feature = "tiered-storage")]
713    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
714        let mut versions = self.node_versions.write();
715        if let Some(index) = versions.get_mut(&id) {
716            // Check if visible at this epoch
717            if let Some(version_ref) = index.visible_at(epoch) {
718                if let Some(record) = self.read_node_record(&version_ref) {
719                    if record.is_deleted() {
720                        return false;
721                    }
722                } else {
723                    return false;
724                }
725            } else {
726                return false;
727            }
728
729            // Mark as deleted in version index
730            index.mark_deleted(epoch);
731
732            // Remove from label index using node_labels map
733            let mut label_index = self.label_index.write();
734            let mut node_labels = self.node_labels.write();
735            if let Some(label_ids) = node_labels.remove(&id) {
736                for label_id in label_ids {
737                    if let Some(set) = label_index.get_mut(label_id as usize) {
738                        set.remove(&id);
739                    }
740                }
741            }
742
743            // Remove properties
744            drop(versions);
745            drop(label_index);
746            drop(node_labels);
747            self.node_properties.remove_all(id);
748
749            true
750        } else {
751            false
752        }
753    }
754
755    /// Deletes all edges connected to a node (implements DETACH DELETE).
756    ///
757    /// Call this before `delete_node()` if you want to remove a node that
758    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
759    #[cfg(not(feature = "tiered-storage"))]
760    pub fn delete_node_edges(&self, node_id: NodeId) {
761        // Get outgoing edges
762        let outgoing: Vec<EdgeId> = self
763            .forward_adj
764            .edges_from(node_id)
765            .into_iter()
766            .map(|(_, edge_id)| edge_id)
767            .collect();
768
769        // Get incoming edges
770        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
771            backward
772                .edges_from(node_id)
773                .into_iter()
774                .map(|(_, edge_id)| edge_id)
775                .collect()
776        } else {
777            // No backward adjacency - scan all edges
778            let epoch = self.current_epoch();
779            self.edges
780                .read()
781                .iter()
782                .filter_map(|(id, chain)| {
783                    chain.visible_at(epoch).and_then(|r| {
784                        if !r.is_deleted() && r.dst == node_id {
785                            Some(*id)
786                        } else {
787                            None
788                        }
789                    })
790                })
791                .collect()
792        };
793
794        // Delete all edges
795        for edge_id in outgoing.into_iter().chain(incoming) {
796            self.delete_edge(edge_id);
797        }
798    }
799
800    /// Deletes all edges connected to a node (implements DETACH DELETE).
801    /// (Tiered storage version)
802    #[cfg(feature = "tiered-storage")]
803    pub fn delete_node_edges(&self, node_id: NodeId) {
804        // Get outgoing edges
805        let outgoing: Vec<EdgeId> = self
806            .forward_adj
807            .edges_from(node_id)
808            .into_iter()
809            .map(|(_, edge_id)| edge_id)
810            .collect();
811
812        // Get incoming edges
813        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
814            backward
815                .edges_from(node_id)
816                .into_iter()
817                .map(|(_, edge_id)| edge_id)
818                .collect()
819        } else {
820            // No backward adjacency - scan all edges
821            let epoch = self.current_epoch();
822            let versions = self.edge_versions.read();
823            versions
824                .iter()
825                .filter_map(|(id, index)| {
826                    index.visible_at(epoch).and_then(|vref| {
827                        self.read_edge_record(&vref).and_then(|r| {
828                            if !r.is_deleted() && r.dst == node_id {
829                                Some(*id)
830                            } else {
831                                None
832                            }
833                        })
834                    })
835                })
836                .collect()
837        };
838
839        // Delete all edges
840        for edge_id in outgoing.into_iter().chain(incoming) {
841            self.delete_edge(edge_id);
842        }
843    }
844
845    /// Sets a property on a node.
846    #[cfg(not(feature = "tiered-storage"))]
847    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
848        let prop_key: PropertyKey = key.into();
849
850        // Update property index before setting the property (needs to read old value)
851        self.update_property_index_on_set(id, &prop_key, &value);
852
853        self.node_properties.set(id, prop_key, value);
854
855        // Update props_count in record
856        let count = self.node_properties.get_all(id).len() as u16;
857        if let Some(chain) = self.nodes.write().get_mut(&id)
858            && let Some(record) = chain.latest_mut()
859        {
860            record.props_count = count;
861        }
862    }
863
864    /// Sets a property on a node.
865    /// (Tiered storage version: properties stored separately, record is immutable)
866    #[cfg(feature = "tiered-storage")]
867    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
868        let prop_key: PropertyKey = key.into();
869
870        // Update property index before setting the property (needs to read old value)
871        self.update_property_index_on_set(id, &prop_key, &value);
872
873        self.node_properties.set(id, prop_key, value);
874        // Note: props_count in record is not updated for tiered storage.
875        // The record is immutable once allocated in the arena.
876        // Property count can be derived from PropertyStorage if needed.
877    }
878
879    /// Sets a property on an edge.
880    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
881        self.edge_properties.set(id, key.into(), value);
882    }
883
884    /// Removes a property from a node.
885    ///
886    /// Returns the previous value if it existed, or None if the property didn't exist.
887    #[cfg(not(feature = "tiered-storage"))]
888    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
889        let prop_key: PropertyKey = key.into();
890
891        // Update property index before removing (needs to read old value)
892        self.update_property_index_on_remove(id, &prop_key);
893
894        let result = self.node_properties.remove(id, &prop_key);
895
896        // Update props_count in record
897        let count = self.node_properties.get_all(id).len() as u16;
898        if let Some(chain) = self.nodes.write().get_mut(&id)
899            && let Some(record) = chain.latest_mut()
900        {
901            record.props_count = count;
902        }
903
904        result
905    }
906
907    /// Removes a property from a node.
908    /// (Tiered storage version)
909    #[cfg(feature = "tiered-storage")]
910    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
911        let prop_key: PropertyKey = key.into();
912
913        // Update property index before removing (needs to read old value)
914        self.update_property_index_on_remove(id, &prop_key);
915
916        self.node_properties.remove(id, &prop_key)
917        // Note: props_count in record is not updated for tiered storage.
918    }
919
920    /// Removes a property from an edge.
921    ///
922    /// Returns the previous value if it existed, or None if the property didn't exist.
923    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
924        self.edge_properties.remove(id, &key.into())
925    }
926
927    /// Gets a single property from a node without loading all properties.
928    ///
929    /// This is O(1) vs O(properties) for `get_node().get_property()`.
930    /// Use this for filter predicates where you only need one property value.
931    ///
932    /// # Example
933    ///
934    /// ```ignore
935    /// // Fast: Direct single-property lookup
936    /// let age = store.get_node_property(node_id, "age");
937    ///
938    /// // Slow: Loads all properties, then extracts one
939    /// let age = store.get_node(node_id).and_then(|n| n.get_property("age").cloned());
940    /// ```
941    #[must_use]
942    pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
943        self.node_properties.get(id, key)
944    }
945
946    /// Gets a single property from an edge without loading all properties.
947    ///
948    /// This is O(1) vs O(properties) for `get_edge().get_property()`.
949    #[must_use]
950    pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
951        self.edge_properties.get(id, key)
952    }
953
954    // === Batch Property Operations ===
955
956    /// Gets a property for multiple nodes in a single batch operation.
957    ///
958    /// More efficient than calling [`Self::get_node_property`] in a loop because it
959    /// reduces lock overhead and enables better cache utilization.
960    ///
961    /// # Example
962    ///
963    /// ```
964    /// use grafeo_core::graph::lpg::LpgStore;
965    /// use grafeo_common::types::{NodeId, PropertyKey, Value};
966    ///
967    /// let store = LpgStore::new();
968    /// let n1 = store.create_node(&["Person"]);
969    /// let n2 = store.create_node(&["Person"]);
970    /// store.set_node_property(n1, "age", Value::from(25i64));
971    /// store.set_node_property(n2, "age", Value::from(30i64));
972    ///
973    /// let ages = store.get_node_property_batch(&[n1, n2], &PropertyKey::new("age"));
974    /// assert_eq!(ages, vec![Some(Value::from(25i64)), Some(Value::from(30i64))]);
975    /// ```
976    #[must_use]
977    pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
978        self.node_properties.get_batch(ids, key)
979    }
980
981    /// Gets all properties for multiple nodes in a single batch operation.
982    ///
983    /// Returns a vector of property maps, one per node ID (empty map if no properties).
984    /// More efficient than calling [`Self::get_node`] in a loop.
985    #[must_use]
986    pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
987        self.node_properties.get_all_batch(ids)
988    }
989
990    /// Gets selected properties for multiple nodes (projection pushdown).
991    ///
992    /// This is more efficient than [`Self::get_nodes_properties_batch`] when you only
993    /// need a subset of properties. It only iterates the requested columns instead of
994    /// all columns.
995    ///
996    /// **Use this for**: Queries with explicit projections like `RETURN n.name, n.age`
997    /// instead of `RETURN n` (which requires all properties).
998    ///
999    /// # Example
1000    ///
1001    /// ```
1002    /// use grafeo_core::graph::lpg::LpgStore;
1003    /// use grafeo_common::types::{PropertyKey, Value};
1004    ///
1005    /// let store = LpgStore::new();
1006    /// let n1 = store.create_node(&["Person"]);
1007    /// store.set_node_property(n1, "name", Value::from("Alice"));
1008    /// store.set_node_property(n1, "age", Value::from(30i64));
1009    /// store.set_node_property(n1, "email", Value::from("alice@example.com"));
1010    ///
1011    /// // Only fetch name and age (faster than get_nodes_properties_batch)
1012    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
1013    /// let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
1014    ///
1015    /// assert_eq!(props[0].len(), 2); // Only name and age, not email
1016    /// ```
1017    #[must_use]
1018    pub fn get_nodes_properties_selective_batch(
1019        &self,
1020        ids: &[NodeId],
1021        keys: &[PropertyKey],
1022    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1023        self.node_properties.get_selective_batch(ids, keys)
1024    }
1025
1026    /// Gets selected properties for multiple edges (projection pushdown).
1027    ///
1028    /// Edge-property version of [`Self::get_nodes_properties_selective_batch`].
1029    #[must_use]
1030    pub fn get_edges_properties_selective_batch(
1031        &self,
1032        ids: &[EdgeId],
1033        keys: &[PropertyKey],
1034    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1035        self.edge_properties.get_selective_batch(ids, keys)
1036    }
1037
1038    /// Finds nodes where a property value is in a range.
1039    ///
1040    /// This is useful for queries like `n.age > 30` or `n.price BETWEEN 10 AND 100`.
1041    /// Uses zone maps to skip scanning when the range definitely doesn't match.
1042    ///
1043    /// # Arguments
1044    ///
1045    /// * `property` - The property to check
1046    /// * `min` - Optional lower bound (None for unbounded)
1047    /// * `max` - Optional upper bound (None for unbounded)
1048    /// * `min_inclusive` - Whether lower bound is inclusive (>= vs >)
1049    /// * `max_inclusive` - Whether upper bound is inclusive (<= vs <)
1050    ///
1051    /// # Example
1052    ///
1053    /// ```
1054    /// use grafeo_core::graph::lpg::LpgStore;
1055    /// use grafeo_common::types::Value;
1056    ///
1057    /// let store = LpgStore::new();
1058    /// let n1 = store.create_node(&["Person"]);
1059    /// let n2 = store.create_node(&["Person"]);
1060    /// store.set_node_property(n1, "age", Value::from(25i64));
1061    /// store.set_node_property(n2, "age", Value::from(35i64));
1062    ///
1063    /// // Find nodes where age > 30
1064    /// let result = store.find_nodes_in_range(
1065    ///     "age",
1066    ///     Some(&Value::from(30i64)),
1067    ///     None,
1068    ///     false, // exclusive lower bound
1069    ///     true,  // inclusive upper bound (doesn't matter since None)
1070    /// );
1071    /// assert_eq!(result.len(), 1); // Only n2 matches
1072    /// ```
1073    #[must_use]
1074    pub fn find_nodes_in_range(
1075        &self,
1076        property: &str,
1077        min: Option<&Value>,
1078        max: Option<&Value>,
1079        min_inclusive: bool,
1080        max_inclusive: bool,
1081    ) -> Vec<NodeId> {
1082        let key = PropertyKey::new(property);
1083
1084        // Check zone map first - if no values could match, return empty
1085        if !self
1086            .node_properties
1087            .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1088        {
1089            return Vec::new();
1090        }
1091
1092        // Scan all nodes and filter by range
1093        self.node_ids()
1094            .into_iter()
1095            .filter(|&node_id| {
1096                self.node_properties
1097                    .get(node_id, &key)
1098                    .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1099            })
1100            .collect()
1101    }
1102
1103    /// Finds nodes matching multiple property equality conditions.
1104    ///
1105    /// This is more efficient than intersecting multiple single-property lookups
1106    /// because it can use indexes when available and short-circuits on the first
1107    /// miss.
1108    ///
1109    /// # Example
1110    ///
1111    /// ```
1112    /// use grafeo_core::graph::lpg::LpgStore;
1113    /// use grafeo_common::types::Value;
1114    ///
1115    /// let store = LpgStore::new();
1116    /// let alice = store.create_node(&["Person"]);
1117    /// store.set_node_property(alice, "name", Value::from("Alice"));
1118    /// store.set_node_property(alice, "city", Value::from("NYC"));
1119    ///
1120    /// // Find nodes where name = "Alice" AND city = "NYC"
1121    /// let matches = store.find_nodes_by_properties(&[
1122    ///     ("name", Value::from("Alice")),
1123    ///     ("city", Value::from("NYC")),
1124    /// ]);
1125    /// assert!(matches.contains(&alice));
1126    /// ```
1127    #[must_use]
1128    pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1129        if conditions.is_empty() {
1130            return self.node_ids();
1131        }
1132
1133        // Find the most selective condition (smallest result set) to start
1134        // If any condition has an index, use that first
1135        let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1136        let indexes = self.property_indexes.read();
1137
1138        for (i, (prop, value)) in conditions.iter().enumerate() {
1139            let key = PropertyKey::new(*prop);
1140            let hv = HashableValue::new(value.clone());
1141
1142            if let Some(index) = indexes.get(&key) {
1143                let matches: Vec<NodeId> = index
1144                    .get(&hv)
1145                    .map(|nodes| nodes.iter().copied().collect())
1146                    .unwrap_or_default();
1147
1148                // Short-circuit if any indexed condition has no matches
1149                if matches.is_empty() {
1150                    return Vec::new();
1151                }
1152
1153                // Use smallest indexed result as starting point
1154                if best_start
1155                    .as_ref()
1156                    .is_none_or(|(_, best)| matches.len() < best.len())
1157                {
1158                    best_start = Some((i, matches));
1159                }
1160            }
1161        }
1162        drop(indexes);
1163
1164        // Start from best indexed result or fall back to full node scan
1165        let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1166            // No indexes available, start with first condition via full scan
1167            let (prop, value) = &conditions[0];
1168            (0, self.find_nodes_by_property(prop, value))
1169        });
1170
1171        // Filter candidates through remaining conditions
1172        for (i, (prop, value)) in conditions.iter().enumerate() {
1173            if i == start_idx {
1174                continue;
1175            }
1176
1177            let key = PropertyKey::new(*prop);
1178            candidates.retain(|&node_id| {
1179                self.node_properties
1180                    .get(node_id, &key)
1181                    .is_some_and(|v| v == *value)
1182            });
1183
1184            // Short-circuit if no candidates remain
1185            if candidates.is_empty() {
1186                return Vec::new();
1187            }
1188        }
1189
1190        candidates
1191    }
1192
1193    // === Property Index Operations ===
1194
1195    /// Creates an index on a node property for O(1) lookups by value.
1196    ///
1197    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1198    /// O(1) instead of O(n) for this property. The index is automatically
1199    /// maintained when properties are set or removed.
1200    ///
1201    /// # Example
1202    ///
1203    /// ```
1204    /// use grafeo_core::graph::lpg::LpgStore;
1205    /// use grafeo_common::types::Value;
1206    ///
1207    /// let store = LpgStore::new();
1208    ///
1209    /// // Create nodes with an 'id' property
1210    /// let alice = store.create_node(&["Person"]);
1211    /// store.set_node_property(alice, "id", Value::from("alice_123"));
1212    ///
1213    /// // Create an index on the 'id' property
1214    /// store.create_property_index("id");
1215    ///
1216    /// // Now lookups by 'id' are O(1)
1217    /// let found = store.find_nodes_by_property("id", &Value::from("alice_123"));
1218    /// assert!(found.contains(&alice));
1219    /// ```
1220    pub fn create_property_index(&self, property: &str) {
1221        let key = PropertyKey::new(property);
1222
1223        let mut indexes = self.property_indexes.write();
1224        if indexes.contains_key(&key) {
1225            return; // Already indexed
1226        }
1227
1228        // Create the index and populate it with existing data
1229        let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1230
1231        // Scan all nodes to build the index
1232        for node_id in self.node_ids() {
1233            if let Some(value) = self.node_properties.get(node_id, &key) {
1234                let hv = HashableValue::new(value);
1235                index.entry(hv).or_default().insert(node_id);
1236            }
1237        }
1238
1239        indexes.insert(key, index);
1240    }
1241
1242    /// Drops an index on a node property.
1243    ///
1244    /// Returns `true` if the index existed and was removed.
1245    pub fn drop_property_index(&self, property: &str) -> bool {
1246        let key = PropertyKey::new(property);
1247        self.property_indexes.write().remove(&key).is_some()
1248    }
1249
1250    /// Returns `true` if the property has an index.
1251    #[must_use]
1252    pub fn has_property_index(&self, property: &str) -> bool {
1253        let key = PropertyKey::new(property);
1254        self.property_indexes.read().contains_key(&key)
1255    }
1256
1257    /// Stores a vector index for a label+property pair.
1258    #[cfg(feature = "vector-index")]
1259    pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1260        let key = format!("{label}:{property}");
1261        self.vector_indexes.write().insert(key, index);
1262    }
1263
1264    /// Retrieves the vector index for a label+property pair.
1265    #[cfg(feature = "vector-index")]
1266    #[must_use]
1267    pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1268        let key = format!("{label}:{property}");
1269        self.vector_indexes.read().get(&key).cloned()
1270    }
1271
1272    /// Removes a vector index for a label+property pair.
1273    ///
1274    /// Returns `true` if the index existed and was removed.
1275    #[cfg(feature = "vector-index")]
1276    pub fn remove_vector_index(&self, label: &str, property: &str) -> bool {
1277        let key = format!("{label}:{property}");
1278        self.vector_indexes.write().remove(&key).is_some()
1279    }
1280
1281    /// Returns all vector index entries as `(key, index)` pairs.
1282    ///
1283    /// Keys are in `"label:property"` format.
1284    #[cfg(feature = "vector-index")]
1285    #[must_use]
1286    pub fn vector_index_entries(&self) -> Vec<(String, Arc<HnswIndex>)> {
1287        self.vector_indexes
1288            .read()
1289            .iter()
1290            .map(|(k, v)| (k.clone(), v.clone()))
1291            .collect()
1292    }
1293
1294    /// Finds all nodes that have a specific property value.
1295    ///
1296    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1297    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1298    ///
1299    /// # Example
1300    ///
1301    /// ```
1302    /// use grafeo_core::graph::lpg::LpgStore;
1303    /// use grafeo_common::types::Value;
1304    ///
1305    /// let store = LpgStore::new();
1306    /// store.create_property_index("city"); // Optional but makes lookups fast
1307    ///
1308    /// let alice = store.create_node(&["Person"]);
1309    /// let bob = store.create_node(&["Person"]);
1310    /// store.set_node_property(alice, "city", Value::from("NYC"));
1311    /// store.set_node_property(bob, "city", Value::from("NYC"));
1312    ///
1313    /// let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
1314    /// assert_eq!(nyc_people.len(), 2);
1315    /// ```
1316    #[must_use]
1317    pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1318        let key = PropertyKey::new(property);
1319        let hv = HashableValue::new(value.clone());
1320
1321        // Try indexed lookup first
1322        let indexes = self.property_indexes.read();
1323        if let Some(index) = indexes.get(&key) {
1324            if let Some(nodes) = index.get(&hv) {
1325                return nodes.iter().copied().collect();
1326            }
1327            return Vec::new();
1328        }
1329        drop(indexes);
1330
1331        // Fall back to full scan
1332        self.node_ids()
1333            .into_iter()
1334            .filter(|&node_id| {
1335                self.node_properties
1336                    .get(node_id, &key)
1337                    .is_some_and(|v| v == *value)
1338            })
1339            .collect()
1340    }
1341
1342    /// Updates property indexes when a property is set.
1343    fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1344        let indexes = self.property_indexes.read();
1345        if let Some(index) = indexes.get(key) {
1346            // Get old value to remove from index
1347            if let Some(old_value) = self.node_properties.get(node_id, key) {
1348                let old_hv = HashableValue::new(old_value);
1349                if let Some(mut nodes) = index.get_mut(&old_hv) {
1350                    nodes.remove(&node_id);
1351                    if nodes.is_empty() {
1352                        drop(nodes);
1353                        index.remove(&old_hv);
1354                    }
1355                }
1356            }
1357
1358            // Add new value to index
1359            let new_hv = HashableValue::new(new_value.clone());
1360            index
1361                .entry(new_hv)
1362                .or_insert_with(FxHashSet::default)
1363                .insert(node_id);
1364        }
1365    }
1366
1367    /// Updates property indexes when a property is removed.
1368    fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1369        let indexes = self.property_indexes.read();
1370        if let Some(index) = indexes.get(key) {
1371            // Get old value to remove from index
1372            if let Some(old_value) = self.node_properties.get(node_id, key) {
1373                let old_hv = HashableValue::new(old_value);
1374                if let Some(mut nodes) = index.get_mut(&old_hv) {
1375                    nodes.remove(&node_id);
1376                    if nodes.is_empty() {
1377                        drop(nodes);
1378                        index.remove(&old_hv);
1379                    }
1380                }
1381            }
1382        }
1383    }
1384
1385    /// Adds a label to a node.
1386    ///
1387    /// Returns true if the label was added, false if the node doesn't exist
1388    /// or already has the label.
1389    #[cfg(not(feature = "tiered-storage"))]
1390    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1391        let epoch = self.current_epoch();
1392
1393        // Check if node exists
1394        let nodes = self.nodes.read();
1395        if let Some(chain) = nodes.get(&node_id) {
1396            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1397                return false;
1398            }
1399        } else {
1400            return false;
1401        }
1402        drop(nodes);
1403
1404        // Get or create label ID
1405        let label_id = self.get_or_create_label_id(label);
1406
1407        // Add to node_labels map
1408        let mut node_labels = self.node_labels.write();
1409        let label_set = node_labels.entry(node_id).or_default();
1410
1411        if label_set.contains(&label_id) {
1412            return false; // Already has this label
1413        }
1414
1415        label_set.insert(label_id);
1416        drop(node_labels);
1417
1418        // Add to label_index
1419        let mut index = self.label_index.write();
1420        if (label_id as usize) >= index.len() {
1421            index.resize(label_id as usize + 1, FxHashMap::default());
1422        }
1423        index[label_id as usize].insert(node_id, ());
1424
1425        // Update label count in node record
1426        if let Some(chain) = self.nodes.write().get_mut(&node_id)
1427            && let Some(record) = chain.latest_mut()
1428        {
1429            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1430            record.set_label_count(count as u16);
1431        }
1432
1433        true
1434    }
1435
1436    /// Adds a label to a node.
1437    /// (Tiered storage version)
1438    #[cfg(feature = "tiered-storage")]
1439    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1440        let epoch = self.current_epoch();
1441
1442        // Check if node exists
1443        let versions = self.node_versions.read();
1444        if let Some(index) = versions.get(&node_id) {
1445            if let Some(vref) = index.visible_at(epoch) {
1446                if let Some(record) = self.read_node_record(&vref) {
1447                    if record.is_deleted() {
1448                        return false;
1449                    }
1450                } else {
1451                    return false;
1452                }
1453            } else {
1454                return false;
1455            }
1456        } else {
1457            return false;
1458        }
1459        drop(versions);
1460
1461        // Get or create label ID
1462        let label_id = self.get_or_create_label_id(label);
1463
1464        // Add to node_labels map
1465        let mut node_labels = self.node_labels.write();
1466        let label_set = node_labels.entry(node_id).or_default();
1467
1468        if label_set.contains(&label_id) {
1469            return false; // Already has this label
1470        }
1471
1472        label_set.insert(label_id);
1473        drop(node_labels);
1474
1475        // Add to label_index
1476        let mut index = self.label_index.write();
1477        if (label_id as usize) >= index.len() {
1478            index.resize(label_id as usize + 1, FxHashMap::default());
1479        }
1480        index[label_id as usize].insert(node_id, ());
1481
1482        // Note: label_count in record is not updated for tiered storage.
1483        // The record is immutable once allocated in the arena.
1484
1485        true
1486    }
1487
1488    /// Removes a label from a node.
1489    ///
1490    /// Returns true if the label was removed, false if the node doesn't exist
1491    /// or doesn't have the label.
1492    #[cfg(not(feature = "tiered-storage"))]
1493    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1494        let epoch = self.current_epoch();
1495
1496        // Check if node exists
1497        let nodes = self.nodes.read();
1498        if let Some(chain) = nodes.get(&node_id) {
1499            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1500                return false;
1501            }
1502        } else {
1503            return false;
1504        }
1505        drop(nodes);
1506
1507        // Get label ID
1508        let label_id = {
1509            let label_ids = self.label_to_id.read();
1510            match label_ids.get(label) {
1511                Some(&id) => id,
1512                None => return false, // Label doesn't exist
1513            }
1514        };
1515
1516        // Remove from node_labels map
1517        let mut node_labels = self.node_labels.write();
1518        if let Some(label_set) = node_labels.get_mut(&node_id) {
1519            if !label_set.remove(&label_id) {
1520                return false; // Node doesn't have this label
1521            }
1522        } else {
1523            return false;
1524        }
1525        drop(node_labels);
1526
1527        // Remove from label_index
1528        let mut index = self.label_index.write();
1529        if (label_id as usize) < index.len() {
1530            index[label_id as usize].remove(&node_id);
1531        }
1532
1533        // Update label count in node record
1534        if let Some(chain) = self.nodes.write().get_mut(&node_id)
1535            && let Some(record) = chain.latest_mut()
1536        {
1537            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1538            record.set_label_count(count as u16);
1539        }
1540
1541        true
1542    }
1543
1544    /// Removes a label from a node.
1545    /// (Tiered storage version)
1546    #[cfg(feature = "tiered-storage")]
1547    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1548        let epoch = self.current_epoch();
1549
1550        // Check if node exists
1551        let versions = self.node_versions.read();
1552        if let Some(index) = versions.get(&node_id) {
1553            if let Some(vref) = index.visible_at(epoch) {
1554                if let Some(record) = self.read_node_record(&vref) {
1555                    if record.is_deleted() {
1556                        return false;
1557                    }
1558                } else {
1559                    return false;
1560                }
1561            } else {
1562                return false;
1563            }
1564        } else {
1565            return false;
1566        }
1567        drop(versions);
1568
1569        // Get label ID
1570        let label_id = {
1571            let label_ids = self.label_to_id.read();
1572            match label_ids.get(label) {
1573                Some(&id) => id,
1574                None => return false, // Label doesn't exist
1575            }
1576        };
1577
1578        // Remove from node_labels map
1579        let mut node_labels = self.node_labels.write();
1580        if let Some(label_set) = node_labels.get_mut(&node_id) {
1581            if !label_set.remove(&label_id) {
1582                return false; // Node doesn't have this label
1583            }
1584        } else {
1585            return false;
1586        }
1587        drop(node_labels);
1588
1589        // Remove from label_index
1590        let mut index = self.label_index.write();
1591        if (label_id as usize) < index.len() {
1592            index[label_id as usize].remove(&node_id);
1593        }
1594
1595        // Note: label_count in record is not updated for tiered storage.
1596
1597        true
1598    }
1599
1600    /// Returns the number of nodes (non-deleted at current epoch).
1601    #[must_use]
1602    #[cfg(not(feature = "tiered-storage"))]
1603    pub fn node_count(&self) -> usize {
1604        let epoch = self.current_epoch();
1605        self.nodes
1606            .read()
1607            .values()
1608            .filter_map(|chain| chain.visible_at(epoch))
1609            .filter(|r| !r.is_deleted())
1610            .count()
1611    }
1612
1613    /// Returns the number of nodes (non-deleted at current epoch).
1614    /// (Tiered storage version)
1615    #[must_use]
1616    #[cfg(feature = "tiered-storage")]
1617    pub fn node_count(&self) -> usize {
1618        let epoch = self.current_epoch();
1619        let versions = self.node_versions.read();
1620        versions
1621            .iter()
1622            .filter(|(_, index)| {
1623                index.visible_at(epoch).map_or(false, |vref| {
1624                    self.read_node_record(&vref)
1625                        .map_or(false, |r| !r.is_deleted())
1626                })
1627            })
1628            .count()
1629    }
1630
1631    /// Returns all node IDs in the store.
1632    ///
1633    /// This returns a snapshot of current node IDs. The returned vector
1634    /// excludes deleted nodes. Results are sorted by NodeId for deterministic
1635    /// iteration order.
1636    #[must_use]
1637    #[cfg(not(feature = "tiered-storage"))]
1638    pub fn node_ids(&self) -> Vec<NodeId> {
1639        let epoch = self.current_epoch();
1640        let mut ids: Vec<NodeId> = self
1641            .nodes
1642            .read()
1643            .iter()
1644            .filter_map(|(id, chain)| {
1645                chain
1646                    .visible_at(epoch)
1647                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1648            })
1649            .collect();
1650        ids.sort_unstable();
1651        ids
1652    }
1653
1654    /// Returns all node IDs in the store.
1655    /// (Tiered storage version)
1656    #[must_use]
1657    #[cfg(feature = "tiered-storage")]
1658    pub fn node_ids(&self) -> Vec<NodeId> {
1659        let epoch = self.current_epoch();
1660        let versions = self.node_versions.read();
1661        let mut ids: Vec<NodeId> = versions
1662            .iter()
1663            .filter_map(|(id, index)| {
1664                index.visible_at(epoch).and_then(|vref| {
1665                    self.read_node_record(&vref)
1666                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1667                })
1668            })
1669            .collect();
1670        ids.sort_unstable();
1671        ids
1672    }
1673
1674    // === Edge Operations ===
1675
1676    /// Creates a new edge.
1677    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1678        self.needs_stats_recompute.store(true, Ordering::Relaxed);
1679        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1680    }
1681
1682    /// Creates a new edge within a transaction context.
1683    #[cfg(not(feature = "tiered-storage"))]
1684    pub fn create_edge_versioned(
1685        &self,
1686        src: NodeId,
1687        dst: NodeId,
1688        edge_type: &str,
1689        epoch: EpochId,
1690        tx_id: TxId,
1691    ) -> EdgeId {
1692        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1693        let type_id = self.get_or_create_edge_type_id(edge_type);
1694
1695        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1696        let chain = VersionChain::with_initial(record, epoch, tx_id);
1697        self.edges.write().insert(id, chain);
1698
1699        // Update adjacency
1700        self.forward_adj.add_edge(src, dst, id);
1701        if let Some(ref backward) = self.backward_adj {
1702            backward.add_edge(dst, src, id);
1703        }
1704
1705        id
1706    }
1707
1708    /// Creates a new edge within a transaction context.
1709    /// (Tiered storage version)
1710    #[cfg(feature = "tiered-storage")]
1711    pub fn create_edge_versioned(
1712        &self,
1713        src: NodeId,
1714        dst: NodeId,
1715        edge_type: &str,
1716        epoch: EpochId,
1717        tx_id: TxId,
1718    ) -> EdgeId {
1719        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1720        let type_id = self.get_or_create_edge_type_id(edge_type);
1721
1722        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1723
1724        // Allocate record in arena and get offset (create epoch if needed)
1725        let arena = self.arena_allocator.arena_or_create(epoch);
1726        let (offset, _stored) = arena.alloc_value_with_offset(record);
1727
1728        // Create HotVersionRef pointing to arena data
1729        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1730
1731        // Create or update version index
1732        let mut versions = self.edge_versions.write();
1733        if let Some(index) = versions.get_mut(&id) {
1734            index.add_hot(hot_ref);
1735        } else {
1736            versions.insert(id, VersionIndex::with_initial(hot_ref));
1737        }
1738
1739        // Update adjacency
1740        self.forward_adj.add_edge(src, dst, id);
1741        if let Some(ref backward) = self.backward_adj {
1742            backward.add_edge(dst, src, id);
1743        }
1744
1745        id
1746    }
1747
1748    /// Creates a new edge with properties.
1749    pub fn create_edge_with_props(
1750        &self,
1751        src: NodeId,
1752        dst: NodeId,
1753        edge_type: &str,
1754        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1755    ) -> EdgeId {
1756        let id = self.create_edge(src, dst, edge_type);
1757
1758        for (key, value) in properties {
1759            self.edge_properties.set(id, key.into(), value.into());
1760        }
1761
1762        id
1763    }
1764
1765    /// Gets an edge by ID (latest visible version).
1766    #[must_use]
1767    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1768        self.get_edge_at_epoch(id, self.current_epoch())
1769    }
1770
1771    /// Gets an edge by ID at a specific epoch.
1772    #[must_use]
1773    #[cfg(not(feature = "tiered-storage"))]
1774    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1775        let edges = self.edges.read();
1776        let chain = edges.get(&id)?;
1777        let record = chain.visible_at(epoch)?;
1778
1779        if record.is_deleted() {
1780            return None;
1781        }
1782
1783        let edge_type = {
1784            let id_to_type = self.id_to_edge_type.read();
1785            id_to_type.get(record.type_id as usize)?.clone()
1786        };
1787
1788        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1789
1790        // Get properties
1791        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1792
1793        Some(edge)
1794    }
1795
1796    /// Gets an edge by ID at a specific epoch.
1797    /// (Tiered storage version)
1798    #[must_use]
1799    #[cfg(feature = "tiered-storage")]
1800    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1801        let versions = self.edge_versions.read();
1802        let index = versions.get(&id)?;
1803        let version_ref = index.visible_at(epoch)?;
1804
1805        let record = self.read_edge_record(&version_ref)?;
1806
1807        if record.is_deleted() {
1808            return None;
1809        }
1810
1811        let edge_type = {
1812            let id_to_type = self.id_to_edge_type.read();
1813            id_to_type.get(record.type_id as usize)?.clone()
1814        };
1815
1816        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1817
1818        // Get properties
1819        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1820
1821        Some(edge)
1822    }
1823
1824    /// Gets an edge visible to a specific transaction.
1825    #[must_use]
1826    #[cfg(not(feature = "tiered-storage"))]
1827    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1828        let edges = self.edges.read();
1829        let chain = edges.get(&id)?;
1830        let record = chain.visible_to(epoch, tx_id)?;
1831
1832        if record.is_deleted() {
1833            return None;
1834        }
1835
1836        let edge_type = {
1837            let id_to_type = self.id_to_edge_type.read();
1838            id_to_type.get(record.type_id as usize)?.clone()
1839        };
1840
1841        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1842
1843        // Get properties
1844        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1845
1846        Some(edge)
1847    }
1848
1849    /// Gets an edge visible to a specific transaction.
1850    /// (Tiered storage version)
1851    #[must_use]
1852    #[cfg(feature = "tiered-storage")]
1853    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1854        let versions = self.edge_versions.read();
1855        let index = versions.get(&id)?;
1856        let version_ref = index.visible_to(epoch, tx_id)?;
1857
1858        let record = self.read_edge_record(&version_ref)?;
1859
1860        if record.is_deleted() {
1861            return None;
1862        }
1863
1864        let edge_type = {
1865            let id_to_type = self.id_to_edge_type.read();
1866            id_to_type.get(record.type_id as usize)?.clone()
1867        };
1868
1869        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1870
1871        // Get properties
1872        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1873
1874        Some(edge)
1875    }
1876
1877    /// Reads an EdgeRecord from arena using a VersionRef.
1878    #[cfg(feature = "tiered-storage")]
1879    #[allow(unsafe_code)]
1880    fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1881        match version_ref {
1882            VersionRef::Hot(hot_ref) => {
1883                let arena = self.arena_allocator.arena(hot_ref.epoch);
1884                // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
1885                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1886                Some(*record)
1887            }
1888            VersionRef::Cold(cold_ref) => {
1889                // Read from compressed epoch store
1890                self.epoch_store
1891                    .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1892            }
1893        }
1894    }
1895
1896    /// Deletes an edge (using latest epoch).
1897    pub fn delete_edge(&self, id: EdgeId) -> bool {
1898        self.needs_stats_recompute.store(true, Ordering::Relaxed);
1899        self.delete_edge_at_epoch(id, self.current_epoch())
1900    }
1901
1902    /// Deletes an edge at a specific epoch.
1903    #[cfg(not(feature = "tiered-storage"))]
1904    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1905        let mut edges = self.edges.write();
1906        if let Some(chain) = edges.get_mut(&id) {
1907            // Get the visible record to check if deleted and get src/dst
1908            let (src, dst) = {
1909                match chain.visible_at(epoch) {
1910                    Some(record) => {
1911                        if record.is_deleted() {
1912                            return false;
1913                        }
1914                        (record.src, record.dst)
1915                    }
1916                    None => return false, // Not visible at this epoch (already deleted)
1917                }
1918            };
1919
1920            // Mark the version chain as deleted
1921            chain.mark_deleted(epoch);
1922
1923            drop(edges); // Release lock
1924
1925            // Mark as deleted in adjacency (soft delete)
1926            self.forward_adj.mark_deleted(src, id);
1927            if let Some(ref backward) = self.backward_adj {
1928                backward.mark_deleted(dst, id);
1929            }
1930
1931            // Remove properties
1932            self.edge_properties.remove_all(id);
1933
1934            true
1935        } else {
1936            false
1937        }
1938    }
1939
1940    /// Deletes an edge at a specific epoch.
1941    /// (Tiered storage version)
1942    #[cfg(feature = "tiered-storage")]
1943    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1944        let mut versions = self.edge_versions.write();
1945        if let Some(index) = versions.get_mut(&id) {
1946            // Get the visible record to check if deleted and get src/dst
1947            let (src, dst) = {
1948                match index.visible_at(epoch) {
1949                    Some(version_ref) => {
1950                        if let Some(record) = self.read_edge_record(&version_ref) {
1951                            if record.is_deleted() {
1952                                return false;
1953                            }
1954                            (record.src, record.dst)
1955                        } else {
1956                            return false;
1957                        }
1958                    }
1959                    None => return false,
1960                }
1961            };
1962
1963            // Mark as deleted in version index
1964            index.mark_deleted(epoch);
1965
1966            drop(versions); // Release lock
1967
1968            // Mark as deleted in adjacency (soft delete)
1969            self.forward_adj.mark_deleted(src, id);
1970            if let Some(ref backward) = self.backward_adj {
1971                backward.mark_deleted(dst, id);
1972            }
1973
1974            // Remove properties
1975            self.edge_properties.remove_all(id);
1976
1977            true
1978        } else {
1979            false
1980        }
1981    }
1982
1983    /// Returns the number of edges (non-deleted at current epoch).
1984    #[must_use]
1985    #[cfg(not(feature = "tiered-storage"))]
1986    pub fn edge_count(&self) -> usize {
1987        let epoch = self.current_epoch();
1988        self.edges
1989            .read()
1990            .values()
1991            .filter_map(|chain| chain.visible_at(epoch))
1992            .filter(|r| !r.is_deleted())
1993            .count()
1994    }
1995
1996    /// Returns the number of edges (non-deleted at current epoch).
1997    /// (Tiered storage version)
1998    #[must_use]
1999    #[cfg(feature = "tiered-storage")]
2000    pub fn edge_count(&self) -> usize {
2001        let epoch = self.current_epoch();
2002        let versions = self.edge_versions.read();
2003        versions
2004            .iter()
2005            .filter(|(_, index)| {
2006                index.visible_at(epoch).map_or(false, |vref| {
2007                    self.read_edge_record(&vref)
2008                        .map_or(false, |r| !r.is_deleted())
2009                })
2010            })
2011            .count()
2012    }
2013
2014    /// Discards all uncommitted versions created by a transaction.
2015    ///
2016    /// This is called during transaction rollback to clean up uncommitted changes.
2017    /// The method removes version chain entries created by the specified transaction.
2018    #[cfg(not(feature = "tiered-storage"))]
2019    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2020        // Remove uncommitted node versions
2021        {
2022            let mut nodes = self.nodes.write();
2023            for chain in nodes.values_mut() {
2024                chain.remove_versions_by(tx_id);
2025            }
2026            // Remove completely empty chains (no versions left)
2027            nodes.retain(|_, chain| !chain.is_empty());
2028        }
2029
2030        // Remove uncommitted edge versions
2031        {
2032            let mut edges = self.edges.write();
2033            for chain in edges.values_mut() {
2034                chain.remove_versions_by(tx_id);
2035            }
2036            // Remove completely empty chains (no versions left)
2037            edges.retain(|_, chain| !chain.is_empty());
2038        }
2039    }
2040
2041    /// Discards all uncommitted versions created by a transaction.
2042    /// (Tiered storage version)
2043    #[cfg(feature = "tiered-storage")]
2044    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2045        // Remove uncommitted node versions
2046        {
2047            let mut versions = self.node_versions.write();
2048            for index in versions.values_mut() {
2049                index.remove_versions_by(tx_id);
2050            }
2051            // Remove completely empty indexes (no versions left)
2052            versions.retain(|_, index| !index.is_empty());
2053        }
2054
2055        // Remove uncommitted edge versions
2056        {
2057            let mut versions = self.edge_versions.write();
2058            for index in versions.values_mut() {
2059                index.remove_versions_by(tx_id);
2060            }
2061            // Remove completely empty indexes (no versions left)
2062            versions.retain(|_, index| !index.is_empty());
2063        }
2064    }
2065
2066    /// Garbage collects old versions that are no longer visible to any transaction.
2067    ///
2068    /// Versions older than `min_epoch` are pruned from version chains, keeping
2069    /// at most one old version per entity as a baseline. Empty chains are removed.
2070    #[cfg(not(feature = "tiered-storage"))]
2071    pub fn gc_versions(&self, min_epoch: EpochId) {
2072        {
2073            let mut nodes = self.nodes.write();
2074            for chain in nodes.values_mut() {
2075                chain.gc(min_epoch);
2076            }
2077            nodes.retain(|_, chain| !chain.is_empty());
2078        }
2079        {
2080            let mut edges = self.edges.write();
2081            for chain in edges.values_mut() {
2082                chain.gc(min_epoch);
2083            }
2084            edges.retain(|_, chain| !chain.is_empty());
2085        }
2086    }
2087
2088    /// Garbage collects old versions (tiered storage variant).
2089    #[cfg(feature = "tiered-storage")]
2090    pub fn gc_versions(&self, min_epoch: EpochId) {
2091        {
2092            let mut versions = self.node_versions.write();
2093            for index in versions.values_mut() {
2094                index.gc(min_epoch);
2095            }
2096            versions.retain(|_, index| !index.is_empty());
2097        }
2098        {
2099            let mut versions = self.edge_versions.write();
2100            for index in versions.values_mut() {
2101                index.gc(min_epoch);
2102            }
2103            versions.retain(|_, index| !index.is_empty());
2104        }
2105    }
2106
2107    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
2108    ///
2109    /// This is called by the transaction manager when an epoch becomes eligible
2110    /// for freezing (no active transactions can see it). The freeze process:
2111    ///
2112    /// 1. Collects all hot version refs for the epoch
2113    /// 2. Reads the corresponding records from arena
2114    /// 3. Compresses them into a `CompressedEpochBlock`
2115    /// 4. Updates `VersionIndex` entries to point to cold storage
2116    /// 5. The arena can be deallocated after all epochs in it are frozen
2117    ///
2118    /// # Arguments
2119    ///
2120    /// * `epoch` - The epoch to freeze
2121    ///
2122    /// # Returns
2123    ///
2124    /// The number of records frozen (nodes + edges).
2125    #[cfg(feature = "tiered-storage")]
2126    #[allow(unsafe_code)]
2127    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
2128        // Collect node records to freeze
2129        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
2130        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
2131
2132        {
2133            let versions = self.node_versions.read();
2134            for (node_id, index) in versions.iter() {
2135                for hot_ref in index.hot_refs_for_epoch(epoch) {
2136                    let arena = self.arena_allocator.arena(hot_ref.epoch);
2137                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
2138                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2139                    node_records.push((node_id.as_u64(), *record));
2140                    node_hot_refs.push((*node_id, *hot_ref));
2141                }
2142            }
2143        }
2144
2145        // Collect edge records to freeze
2146        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
2147        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
2148
2149        {
2150            let versions = self.edge_versions.read();
2151            for (edge_id, index) in versions.iter() {
2152                for hot_ref in index.hot_refs_for_epoch(epoch) {
2153                    let arena = self.arena_allocator.arena(hot_ref.epoch);
2154                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
2155                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2156                    edge_records.push((edge_id.as_u64(), *record));
2157                    edge_hot_refs.push((*edge_id, *hot_ref));
2158                }
2159            }
2160        }
2161
2162        let total_frozen = node_records.len() + edge_records.len();
2163
2164        if total_frozen == 0 {
2165            return 0;
2166        }
2167
2168        // Freeze to compressed storage
2169        let (node_entries, edge_entries) =
2170            self.epoch_store
2171                .freeze_epoch(epoch, node_records, edge_records);
2172
2173        // Build lookup maps for index entries
2174        let node_entry_map: FxHashMap<u64, _> = node_entries
2175            .iter()
2176            .map(|e| (e.entity_id, (e.offset, e.length)))
2177            .collect();
2178        let edge_entry_map: FxHashMap<u64, _> = edge_entries
2179            .iter()
2180            .map(|e| (e.entity_id, (e.offset, e.length)))
2181            .collect();
2182
2183        // Update version indexes to use cold refs
2184        {
2185            let mut versions = self.node_versions.write();
2186            for (node_id, hot_ref) in &node_hot_refs {
2187                if let Some(index) = versions.get_mut(node_id)
2188                    && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
2189                {
2190                    let cold_ref = ColdVersionRef {
2191                        epoch,
2192                        block_offset: offset,
2193                        length,
2194                        created_by: hot_ref.created_by,
2195                        deleted_epoch: hot_ref.deleted_epoch,
2196                    };
2197                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
2198                }
2199            }
2200        }
2201
2202        {
2203            let mut versions = self.edge_versions.write();
2204            for (edge_id, hot_ref) in &edge_hot_refs {
2205                if let Some(index) = versions.get_mut(edge_id)
2206                    && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
2207                {
2208                    let cold_ref = ColdVersionRef {
2209                        epoch,
2210                        block_offset: offset,
2211                        length,
2212                        created_by: hot_ref.created_by,
2213                        deleted_epoch: hot_ref.deleted_epoch,
2214                    };
2215                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
2216                }
2217            }
2218        }
2219
2220        total_frozen
2221    }
2222
2223    /// Returns the epoch store for cold storage statistics.
2224    #[cfg(feature = "tiered-storage")]
2225    #[must_use]
2226    pub fn epoch_store(&self) -> &EpochStore {
2227        &self.epoch_store
2228    }
2229
2230    /// Returns the number of distinct labels in the store.
2231    #[must_use]
2232    pub fn label_count(&self) -> usize {
2233        self.id_to_label.read().len()
2234    }
2235
2236    /// Returns the number of distinct property keys in the store.
2237    ///
2238    /// This counts unique property keys across both nodes and edges.
2239    #[must_use]
2240    pub fn property_key_count(&self) -> usize {
2241        let node_keys = self.node_properties.column_count();
2242        let edge_keys = self.edge_properties.column_count();
2243        // Note: This may count some keys twice if the same key is used
2244        // for both nodes and edges. A more precise count would require
2245        // tracking unique keys across both storages.
2246        node_keys + edge_keys
2247    }
2248
2249    /// Returns the number of distinct edge types in the store.
2250    #[must_use]
2251    pub fn edge_type_count(&self) -> usize {
2252        self.id_to_edge_type.read().len()
2253    }
2254
2255    // === Traversal ===
2256
2257    /// Iterates over neighbors of a node in the specified direction.
2258    ///
2259    /// This is the fast path for graph traversal - goes straight to the
2260    /// adjacency index without loading full node data.
2261    pub fn neighbors(
2262        &self,
2263        node: NodeId,
2264        direction: Direction,
2265    ) -> impl Iterator<Item = NodeId> + '_ {
2266        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2267            Direction::Outgoing | Direction::Both => {
2268                Box::new(self.forward_adj.neighbors(node).into_iter())
2269            }
2270            Direction::Incoming => Box::new(std::iter::empty()),
2271        };
2272
2273        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2274            Direction::Incoming | Direction::Both => {
2275                if let Some(ref adj) = self.backward_adj {
2276                    Box::new(adj.neighbors(node).into_iter())
2277                } else {
2278                    Box::new(std::iter::empty())
2279                }
2280            }
2281            Direction::Outgoing => Box::new(std::iter::empty()),
2282        };
2283
2284        forward.chain(backward)
2285    }
2286
2287    /// Returns edges from a node with their targets.
2288    ///
2289    /// Returns an iterator of (target_node, edge_id) pairs.
2290    pub fn edges_from(
2291        &self,
2292        node: NodeId,
2293        direction: Direction,
2294    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2295        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2296            Direction::Outgoing | Direction::Both => {
2297                Box::new(self.forward_adj.edges_from(node).into_iter())
2298            }
2299            Direction::Incoming => Box::new(std::iter::empty()),
2300        };
2301
2302        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2303            Direction::Incoming | Direction::Both => {
2304                if let Some(ref adj) = self.backward_adj {
2305                    Box::new(adj.edges_from(node).into_iter())
2306                } else {
2307                    Box::new(std::iter::empty())
2308                }
2309            }
2310            Direction::Outgoing => Box::new(std::iter::empty()),
2311        };
2312
2313        forward.chain(backward)
2314    }
2315
2316    /// Returns edges to a node (where the node is the destination).
2317    ///
2318    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
2319    /// Uses the backward adjacency index for O(degree) lookup.
2320    ///
2321    /// # Example
2322    ///
2323    /// ```ignore
2324    /// // For edges: A->B, C->B
2325    /// let incoming = store.edges_to(B);
2326    /// // Returns: [(A, edge1), (C, edge2)]
2327    /// ```
2328    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2329        if let Some(ref backward) = self.backward_adj {
2330            backward.edges_from(node)
2331        } else {
2332            // Fallback: scan all edges (slow but correct)
2333            self.all_edges()
2334                .filter_map(|edge| {
2335                    if edge.dst == node {
2336                        Some((edge.src, edge.id))
2337                    } else {
2338                        None
2339                    }
2340                })
2341                .collect()
2342        }
2343    }
2344
2345    /// Returns the out-degree of a node (number of outgoing edges).
2346    ///
2347    /// Uses the forward adjacency index for O(1) lookup.
2348    #[must_use]
2349    pub fn out_degree(&self, node: NodeId) -> usize {
2350        self.forward_adj.out_degree(node)
2351    }
2352
2353    /// Returns the in-degree of a node (number of incoming edges).
2354    ///
2355    /// Uses the backward adjacency index for O(1) lookup if available,
2356    /// otherwise falls back to scanning edges.
2357    #[must_use]
2358    pub fn in_degree(&self, node: NodeId) -> usize {
2359        if let Some(ref backward) = self.backward_adj {
2360            backward.in_degree(node)
2361        } else {
2362            // Fallback: count edges (slow)
2363            self.all_edges().filter(|edge| edge.dst == node).count()
2364        }
2365    }
2366
2367    /// Gets the type of an edge by ID.
2368    #[must_use]
2369    #[cfg(not(feature = "tiered-storage"))]
2370    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2371        let edges = self.edges.read();
2372        let chain = edges.get(&id)?;
2373        let epoch = self.current_epoch();
2374        let record = chain.visible_at(epoch)?;
2375        let id_to_type = self.id_to_edge_type.read();
2376        id_to_type.get(record.type_id as usize).cloned()
2377    }
2378
2379    /// Gets the type of an edge by ID.
2380    /// (Tiered storage version)
2381    #[must_use]
2382    #[cfg(feature = "tiered-storage")]
2383    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2384        let versions = self.edge_versions.read();
2385        let index = versions.get(&id)?;
2386        let epoch = self.current_epoch();
2387        let vref = index.visible_at(epoch)?;
2388        let record = self.read_edge_record(&vref)?;
2389        let id_to_type = self.id_to_edge_type.read();
2390        id_to_type.get(record.type_id as usize).cloned()
2391    }
2392
2393    /// Returns all nodes with a specific label.
2394    ///
2395    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
2396    /// concurrent modifications won't affect the returned vector. Results are
2397    /// sorted by NodeId for deterministic iteration order.
2398    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2399        let label_to_id = self.label_to_id.read();
2400        if let Some(&label_id) = label_to_id.get(label) {
2401            let index = self.label_index.read();
2402            if let Some(set) = index.get(label_id as usize) {
2403                let mut ids: Vec<NodeId> = set.keys().copied().collect();
2404                ids.sort_unstable();
2405                return ids;
2406            }
2407        }
2408        Vec::new()
2409    }
2410
2411    // === Admin API: Iteration ===
2412
2413    /// Returns an iterator over all nodes in the database.
2414    ///
2415    /// This creates a snapshot of all visible nodes at the current epoch.
2416    /// Useful for dump/export operations.
2417    #[cfg(not(feature = "tiered-storage"))]
2418    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2419        let epoch = self.current_epoch();
2420        let node_ids: Vec<NodeId> = self
2421            .nodes
2422            .read()
2423            .iter()
2424            .filter_map(|(id, chain)| {
2425                chain
2426                    .visible_at(epoch)
2427                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2428            })
2429            .collect();
2430
2431        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2432    }
2433
2434    /// Returns an iterator over all nodes in the database.
2435    /// (Tiered storage version)
2436    #[cfg(feature = "tiered-storage")]
2437    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2438        let node_ids = self.node_ids();
2439        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2440    }
2441
2442    /// Returns an iterator over all edges in the database.
2443    ///
2444    /// This creates a snapshot of all visible edges at the current epoch.
2445    /// Useful for dump/export operations.
2446    #[cfg(not(feature = "tiered-storage"))]
2447    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2448        let epoch = self.current_epoch();
2449        let edge_ids: Vec<EdgeId> = self
2450            .edges
2451            .read()
2452            .iter()
2453            .filter_map(|(id, chain)| {
2454                chain
2455                    .visible_at(epoch)
2456                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2457            })
2458            .collect();
2459
2460        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2461    }
2462
2463    /// Returns an iterator over all edges in the database.
2464    /// (Tiered storage version)
2465    #[cfg(feature = "tiered-storage")]
2466    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2467        let epoch = self.current_epoch();
2468        let versions = self.edge_versions.read();
2469        let edge_ids: Vec<EdgeId> = versions
2470            .iter()
2471            .filter_map(|(id, index)| {
2472                index.visible_at(epoch).and_then(|vref| {
2473                    self.read_edge_record(&vref)
2474                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2475                })
2476            })
2477            .collect();
2478
2479        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2480    }
2481
2482    /// Returns all label names in the database.
2483    pub fn all_labels(&self) -> Vec<String> {
2484        self.id_to_label
2485            .read()
2486            .iter()
2487            .map(|s| s.to_string())
2488            .collect()
2489    }
2490
2491    /// Returns all edge type names in the database.
2492    pub fn all_edge_types(&self) -> Vec<String> {
2493        self.id_to_edge_type
2494            .read()
2495            .iter()
2496            .map(|s| s.to_string())
2497            .collect()
2498    }
2499
2500    /// Returns all property keys used in the database.
2501    pub fn all_property_keys(&self) -> Vec<String> {
2502        let mut keys = std::collections::HashSet::new();
2503        for key in self.node_properties.keys() {
2504            keys.insert(key.to_string());
2505        }
2506        for key in self.edge_properties.keys() {
2507            keys.insert(key.to_string());
2508        }
2509        keys.into_iter().collect()
2510    }
2511
2512    /// Returns an iterator over nodes with a specific label.
2513    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2514        let node_ids = self.nodes_by_label(label);
2515        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2516    }
2517
2518    /// Returns an iterator over edges with a specific type.
2519    #[cfg(not(feature = "tiered-storage"))]
2520    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2521        let epoch = self.current_epoch();
2522        let type_to_id = self.edge_type_to_id.read();
2523
2524        if let Some(&type_id) = type_to_id.get(edge_type) {
2525            let edge_ids: Vec<EdgeId> = self
2526                .edges
2527                .read()
2528                .iter()
2529                .filter_map(|(id, chain)| {
2530                    chain.visible_at(epoch).and_then(|r| {
2531                        if !r.is_deleted() && r.type_id == type_id {
2532                            Some(*id)
2533                        } else {
2534                            None
2535                        }
2536                    })
2537                })
2538                .collect();
2539
2540            // Return a boxed iterator for the found edges
2541            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2542                as Box<dyn Iterator<Item = Edge> + 'a>
2543        } else {
2544            // Return empty iterator
2545            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2546        }
2547    }
2548
2549    /// Returns an iterator over edges with a specific type.
2550    /// (Tiered storage version)
2551    #[cfg(feature = "tiered-storage")]
2552    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2553        let epoch = self.current_epoch();
2554        let type_to_id = self.edge_type_to_id.read();
2555
2556        if let Some(&type_id) = type_to_id.get(edge_type) {
2557            let versions = self.edge_versions.read();
2558            let edge_ids: Vec<EdgeId> = versions
2559                .iter()
2560                .filter_map(|(id, index)| {
2561                    index.visible_at(epoch).and_then(|vref| {
2562                        self.read_edge_record(&vref).and_then(|r| {
2563                            if !r.is_deleted() && r.type_id == type_id {
2564                                Some(*id)
2565                            } else {
2566                                None
2567                            }
2568                        })
2569                    })
2570                })
2571                .collect();
2572
2573            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2574                as Box<dyn Iterator<Item = Edge> + 'a>
2575        } else {
2576            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2577        }
2578    }
2579
2580    // === Zone Map Support ===
2581
2582    /// Checks if a node property predicate might match any nodes.
2583    ///
2584    /// Uses zone maps for early filtering. Returns `true` if there might be
2585    /// matching nodes, `false` if there definitely aren't.
2586    #[must_use]
2587    pub fn node_property_might_match(
2588        &self,
2589        property: &PropertyKey,
2590        op: CompareOp,
2591        value: &Value,
2592    ) -> bool {
2593        self.node_properties.might_match(property, op, value)
2594    }
2595
2596    /// Checks if an edge property predicate might match any edges.
2597    #[must_use]
2598    pub fn edge_property_might_match(
2599        &self,
2600        property: &PropertyKey,
2601        op: CompareOp,
2602        value: &Value,
2603    ) -> bool {
2604        self.edge_properties.might_match(property, op, value)
2605    }
2606
2607    /// Gets the zone map for a node property.
2608    #[must_use]
2609    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2610        self.node_properties.zone_map(property)
2611    }
2612
2613    /// Gets the zone map for an edge property.
2614    #[must_use]
2615    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2616        self.edge_properties.zone_map(property)
2617    }
2618
2619    /// Rebuilds zone maps for all properties.
2620    pub fn rebuild_zone_maps(&self) {
2621        self.node_properties.rebuild_zone_maps();
2622        self.edge_properties.rebuild_zone_maps();
2623    }
2624
2625    // === Statistics ===
2626
2627    /// Returns the current statistics.
2628    #[must_use]
2629    pub fn statistics(&self) -> Statistics {
2630        self.statistics.read().clone()
2631    }
2632
2633    /// Recomputes statistics if they are stale (i.e., after mutations).
2634    ///
2635    /// Call this before reading statistics for query optimization.
2636    /// Avoids redundant recomputation if no mutations occurred.
2637    pub fn ensure_statistics_fresh(&self) {
2638        if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2639            self.compute_statistics();
2640        }
2641    }
2642
2643    /// Recomputes statistics from current data.
2644    ///
2645    /// Scans all labels and edge types to build cardinality estimates for the
2646    /// query optimizer. Call this periodically or after bulk data loads.
2647    #[cfg(not(feature = "tiered-storage"))]
2648    pub fn compute_statistics(&self) {
2649        let mut stats = Statistics::new();
2650
2651        // Compute total counts
2652        stats.total_nodes = self.node_count() as u64;
2653        stats.total_edges = self.edge_count() as u64;
2654
2655        // Compute per-label statistics
2656        let id_to_label = self.id_to_label.read();
2657        let label_index = self.label_index.read();
2658
2659        for (label_id, label_name) in id_to_label.iter().enumerate() {
2660            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2661
2662            if node_count > 0 {
2663                // Estimate average degree
2664                let avg_out_degree = if stats.total_nodes > 0 {
2665                    stats.total_edges as f64 / stats.total_nodes as f64
2666                } else {
2667                    0.0
2668                };
2669
2670                let label_stats =
2671                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2672
2673                stats.update_label(label_name.as_ref(), label_stats);
2674            }
2675        }
2676
2677        // Compute per-edge-type statistics
2678        let id_to_edge_type = self.id_to_edge_type.read();
2679        let edges = self.edges.read();
2680        let epoch = self.current_epoch();
2681
2682        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2683        for chain in edges.values() {
2684            if let Some(record) = chain.visible_at(epoch)
2685                && !record.is_deleted()
2686            {
2687                *edge_type_counts.entry(record.type_id).or_default() += 1;
2688            }
2689        }
2690
2691        for (type_id, count) in edge_type_counts {
2692            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2693                let avg_degree = if stats.total_nodes > 0 {
2694                    count as f64 / stats.total_nodes as f64
2695                } else {
2696                    0.0
2697                };
2698
2699                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2700                stats.update_edge_type(type_name.as_ref(), edge_stats);
2701            }
2702        }
2703
2704        *self.statistics.write() = stats;
2705    }
2706
2707    /// Recomputes statistics from current data.
2708    /// (Tiered storage version)
2709    #[cfg(feature = "tiered-storage")]
2710    pub fn compute_statistics(&self) {
2711        let mut stats = Statistics::new();
2712
2713        // Compute total counts
2714        stats.total_nodes = self.node_count() as u64;
2715        stats.total_edges = self.edge_count() as u64;
2716
2717        // Compute per-label statistics
2718        let id_to_label = self.id_to_label.read();
2719        let label_index = self.label_index.read();
2720
2721        for (label_id, label_name) in id_to_label.iter().enumerate() {
2722            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2723
2724            if node_count > 0 {
2725                let avg_out_degree = if stats.total_nodes > 0 {
2726                    stats.total_edges as f64 / stats.total_nodes as f64
2727                } else {
2728                    0.0
2729                };
2730
2731                let label_stats =
2732                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2733
2734                stats.update_label(label_name.as_ref(), label_stats);
2735            }
2736        }
2737
2738        // Compute per-edge-type statistics
2739        let id_to_edge_type = self.id_to_edge_type.read();
2740        let versions = self.edge_versions.read();
2741        let epoch = self.current_epoch();
2742
2743        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2744        for index in versions.values() {
2745            if let Some(vref) = index.visible_at(epoch)
2746                && let Some(record) = self.read_edge_record(&vref)
2747                && !record.is_deleted()
2748            {
2749                *edge_type_counts.entry(record.type_id).or_default() += 1;
2750            }
2751        }
2752
2753        for (type_id, count) in edge_type_counts {
2754            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2755                let avg_degree = if stats.total_nodes > 0 {
2756                    count as f64 / stats.total_nodes as f64
2757                } else {
2758                    0.0
2759                };
2760
2761                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2762                stats.update_edge_type(type_name.as_ref(), edge_stats);
2763            }
2764        }
2765
2766        *self.statistics.write() = stats;
2767    }
2768
2769    /// Estimates cardinality for a label scan.
2770    #[must_use]
2771    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2772        self.statistics.read().estimate_label_cardinality(label)
2773    }
2774
2775    /// Estimates average degree for an edge type.
2776    #[must_use]
2777    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2778        self.statistics
2779            .read()
2780            .estimate_avg_degree(edge_type, outgoing)
2781    }
2782
2783    // === Internal Helpers ===
2784
2785    fn get_or_create_label_id(&self, label: &str) -> u32 {
2786        {
2787            let label_to_id = self.label_to_id.read();
2788            if let Some(&id) = label_to_id.get(label) {
2789                return id;
2790            }
2791        }
2792
2793        let mut label_to_id = self.label_to_id.write();
2794        let mut id_to_label = self.id_to_label.write();
2795
2796        // Double-check after acquiring write lock
2797        if let Some(&id) = label_to_id.get(label) {
2798            return id;
2799        }
2800
2801        let id = id_to_label.len() as u32;
2802
2803        let label: ArcStr = label.into();
2804        label_to_id.insert(label.clone(), id);
2805        id_to_label.push(label);
2806
2807        id
2808    }
2809
2810    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2811        {
2812            let type_to_id = self.edge_type_to_id.read();
2813            if let Some(&id) = type_to_id.get(edge_type) {
2814                return id;
2815            }
2816        }
2817
2818        let mut type_to_id = self.edge_type_to_id.write();
2819        let mut id_to_type = self.id_to_edge_type.write();
2820
2821        // Double-check
2822        if let Some(&id) = type_to_id.get(edge_type) {
2823            return id;
2824        }
2825
2826        let id = id_to_type.len() as u32;
2827        let edge_type: ArcStr = edge_type.into();
2828        type_to_id.insert(edge_type.clone(), id);
2829        id_to_type.push(edge_type);
2830
2831        id
2832    }
2833
2834    // === Recovery Support ===
2835
2836    /// Creates a node with a specific ID during recovery.
2837    ///
2838    /// This is used for WAL recovery to restore nodes with their original IDs.
2839    /// The caller must ensure IDs don't conflict with existing nodes.
2840    #[cfg(not(feature = "tiered-storage"))]
2841    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2842        let epoch = self.current_epoch();
2843        let mut record = NodeRecord::new(id, epoch);
2844        record.set_label_count(labels.len() as u16);
2845
2846        // Store labels in node_labels map and label_index
2847        let mut node_label_set = FxHashSet::default();
2848        for label in labels {
2849            let label_id = self.get_or_create_label_id(*label);
2850            node_label_set.insert(label_id);
2851
2852            // Update label index
2853            let mut index = self.label_index.write();
2854            while index.len() <= label_id as usize {
2855                index.push(FxHashMap::default());
2856            }
2857            index[label_id as usize].insert(id, ());
2858        }
2859
2860        // Store node's labels
2861        self.node_labels.write().insert(id, node_label_set);
2862
2863        // Create version chain with initial version (using SYSTEM tx for recovery)
2864        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2865        self.nodes.write().insert(id, chain);
2866
2867        // Update next_node_id if necessary to avoid future collisions
2868        let id_val = id.as_u64();
2869        let _ = self
2870            .next_node_id
2871            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2872                if id_val >= current {
2873                    Some(id_val + 1)
2874                } else {
2875                    None
2876                }
2877            });
2878    }
2879
2880    /// Creates a node with a specific ID during recovery.
2881    /// (Tiered storage version)
2882    #[cfg(feature = "tiered-storage")]
2883    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2884        let epoch = self.current_epoch();
2885        let mut record = NodeRecord::new(id, epoch);
2886        record.set_label_count(labels.len() as u16);
2887
2888        // Store labels in node_labels map and label_index
2889        let mut node_label_set = FxHashSet::default();
2890        for label in labels {
2891            let label_id = self.get_or_create_label_id(*label);
2892            node_label_set.insert(label_id);
2893
2894            // Update label index
2895            let mut index = self.label_index.write();
2896            while index.len() <= label_id as usize {
2897                index.push(FxHashMap::default());
2898            }
2899            index[label_id as usize].insert(id, ());
2900        }
2901
2902        // Store node's labels
2903        self.node_labels.write().insert(id, node_label_set);
2904
2905        // Allocate record in arena and get offset (create epoch if needed)
2906        let arena = self.arena_allocator.arena_or_create(epoch);
2907        let (offset, _stored) = arena.alloc_value_with_offset(record);
2908
2909        // Create HotVersionRef (using SYSTEM tx for recovery)
2910        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2911        let mut versions = self.node_versions.write();
2912        versions.insert(id, VersionIndex::with_initial(hot_ref));
2913
2914        // Update next_node_id if necessary to avoid future collisions
2915        let id_val = id.as_u64();
2916        let _ = self
2917            .next_node_id
2918            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2919                if id_val >= current {
2920                    Some(id_val + 1)
2921                } else {
2922                    None
2923                }
2924            });
2925    }
2926
2927    /// Creates an edge with a specific ID during recovery.
2928    ///
2929    /// This is used for WAL recovery to restore edges with their original IDs.
2930    #[cfg(not(feature = "tiered-storage"))]
2931    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2932        let epoch = self.current_epoch();
2933        let type_id = self.get_or_create_edge_type_id(edge_type);
2934
2935        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2936        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2937        self.edges.write().insert(id, chain);
2938
2939        // Update adjacency
2940        self.forward_adj.add_edge(src, dst, id);
2941        if let Some(ref backward) = self.backward_adj {
2942            backward.add_edge(dst, src, id);
2943        }
2944
2945        // Update next_edge_id if necessary
2946        let id_val = id.as_u64();
2947        let _ = self
2948            .next_edge_id
2949            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2950                if id_val >= current {
2951                    Some(id_val + 1)
2952                } else {
2953                    None
2954                }
2955            });
2956    }
2957
2958    /// Creates an edge with a specific ID during recovery.
2959    /// (Tiered storage version)
2960    #[cfg(feature = "tiered-storage")]
2961    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2962        let epoch = self.current_epoch();
2963        let type_id = self.get_or_create_edge_type_id(edge_type);
2964
2965        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2966
2967        // Allocate record in arena and get offset (create epoch if needed)
2968        let arena = self.arena_allocator.arena_or_create(epoch);
2969        let (offset, _stored) = arena.alloc_value_with_offset(record);
2970
2971        // Create HotVersionRef (using SYSTEM tx for recovery)
2972        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2973        let mut versions = self.edge_versions.write();
2974        versions.insert(id, VersionIndex::with_initial(hot_ref));
2975
2976        // Update adjacency
2977        self.forward_adj.add_edge(src, dst, id);
2978        if let Some(ref backward) = self.backward_adj {
2979            backward.add_edge(dst, src, id);
2980        }
2981
2982        // Update next_edge_id if necessary
2983        let id_val = id.as_u64();
2984        let _ = self
2985            .next_edge_id
2986            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2987                if id_val >= current {
2988                    Some(id_val + 1)
2989                } else {
2990                    None
2991                }
2992            });
2993    }
2994
2995    /// Sets the current epoch during recovery.
2996    pub fn set_epoch(&self, epoch: EpochId) {
2997        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2998    }
2999}
3000
3001impl Default for LpgStore {
3002    fn default() -> Self {
3003        Self::new()
3004    }
3005}
3006
3007#[cfg(test)]
3008mod tests {
3009    use super::*;
3010
3011    #[test]
3012    fn test_create_node() {
3013        let store = LpgStore::new();
3014
3015        let id = store.create_node(&["Person"]);
3016        assert!(id.is_valid());
3017
3018        let node = store.get_node(id).unwrap();
3019        assert!(node.has_label("Person"));
3020        assert!(!node.has_label("Animal"));
3021    }
3022
3023    #[test]
3024    fn test_create_node_with_props() {
3025        let store = LpgStore::new();
3026
3027        let id = store.create_node_with_props(
3028            &["Person"],
3029            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
3030        );
3031
3032        let node = store.get_node(id).unwrap();
3033        assert_eq!(
3034            node.get_property("name").and_then(|v| v.as_str()),
3035            Some("Alice")
3036        );
3037        assert_eq!(
3038            node.get_property("age").and_then(|v| v.as_int64()),
3039            Some(30)
3040        );
3041    }
3042
3043    #[test]
3044    fn test_delete_node() {
3045        let store = LpgStore::new();
3046
3047        let id = store.create_node(&["Person"]);
3048        assert_eq!(store.node_count(), 1);
3049
3050        assert!(store.delete_node(id));
3051        assert_eq!(store.node_count(), 0);
3052        assert!(store.get_node(id).is_none());
3053
3054        // Double delete should return false
3055        assert!(!store.delete_node(id));
3056    }
3057
3058    #[test]
3059    fn test_create_edge() {
3060        let store = LpgStore::new();
3061
3062        let alice = store.create_node(&["Person"]);
3063        let bob = store.create_node(&["Person"]);
3064
3065        let edge_id = store.create_edge(alice, bob, "KNOWS");
3066        assert!(edge_id.is_valid());
3067
3068        let edge = store.get_edge(edge_id).unwrap();
3069        assert_eq!(edge.src, alice);
3070        assert_eq!(edge.dst, bob);
3071        assert_eq!(edge.edge_type.as_str(), "KNOWS");
3072    }
3073
3074    #[test]
3075    fn test_neighbors() {
3076        let store = LpgStore::new();
3077
3078        let a = store.create_node(&["Person"]);
3079        let b = store.create_node(&["Person"]);
3080        let c = store.create_node(&["Person"]);
3081
3082        store.create_edge(a, b, "KNOWS");
3083        store.create_edge(a, c, "KNOWS");
3084
3085        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3086        assert_eq!(outgoing.len(), 2);
3087        assert!(outgoing.contains(&b));
3088        assert!(outgoing.contains(&c));
3089
3090        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3091        assert_eq!(incoming.len(), 1);
3092        assert!(incoming.contains(&a));
3093    }
3094
3095    #[test]
3096    fn test_nodes_by_label() {
3097        let store = LpgStore::new();
3098
3099        let p1 = store.create_node(&["Person"]);
3100        let p2 = store.create_node(&["Person"]);
3101        let _a = store.create_node(&["Animal"]);
3102
3103        let persons = store.nodes_by_label("Person");
3104        assert_eq!(persons.len(), 2);
3105        assert!(persons.contains(&p1));
3106        assert!(persons.contains(&p2));
3107
3108        let animals = store.nodes_by_label("Animal");
3109        assert_eq!(animals.len(), 1);
3110    }
3111
3112    #[test]
3113    fn test_delete_edge() {
3114        let store = LpgStore::new();
3115
3116        let a = store.create_node(&["Person"]);
3117        let b = store.create_node(&["Person"]);
3118        let edge_id = store.create_edge(a, b, "KNOWS");
3119
3120        assert_eq!(store.edge_count(), 1);
3121
3122        assert!(store.delete_edge(edge_id));
3123        assert_eq!(store.edge_count(), 0);
3124        assert!(store.get_edge(edge_id).is_none());
3125    }
3126
3127    // === New tests for improved coverage ===
3128
3129    #[test]
3130    fn test_lpg_store_config() {
3131        // Test with_config
3132        let config = LpgStoreConfig {
3133            backward_edges: false,
3134            initial_node_capacity: 100,
3135            initial_edge_capacity: 200,
3136        };
3137        let store = LpgStore::with_config(config);
3138
3139        // Store should work but without backward adjacency
3140        let a = store.create_node(&["Person"]);
3141        let b = store.create_node(&["Person"]);
3142        store.create_edge(a, b, "KNOWS");
3143
3144        // Outgoing should work
3145        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3146        assert_eq!(outgoing.len(), 1);
3147
3148        // Incoming should be empty (no backward adjacency)
3149        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3150        assert_eq!(incoming.len(), 0);
3151    }
3152
3153    #[test]
3154    fn test_epoch_management() {
3155        let store = LpgStore::new();
3156
3157        let epoch0 = store.current_epoch();
3158        assert_eq!(epoch0.as_u64(), 0);
3159
3160        let epoch1 = store.new_epoch();
3161        assert_eq!(epoch1.as_u64(), 1);
3162
3163        let current = store.current_epoch();
3164        assert_eq!(current.as_u64(), 1);
3165    }
3166
3167    #[test]
3168    fn test_node_properties() {
3169        let store = LpgStore::new();
3170        let id = store.create_node(&["Person"]);
3171
3172        // Set and get property
3173        store.set_node_property(id, "name", Value::from("Alice"));
3174        let name = store.get_node_property(id, &"name".into());
3175        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3176
3177        // Update property
3178        store.set_node_property(id, "name", Value::from("Bob"));
3179        let name = store.get_node_property(id, &"name".into());
3180        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3181
3182        // Remove property
3183        let old = store.remove_node_property(id, "name");
3184        assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3185
3186        // Property should be gone
3187        let name = store.get_node_property(id, &"name".into());
3188        assert!(name.is_none());
3189
3190        // Remove non-existent property
3191        let none = store.remove_node_property(id, "nonexistent");
3192        assert!(none.is_none());
3193    }
3194
3195    #[test]
3196    fn test_edge_properties() {
3197        let store = LpgStore::new();
3198        let a = store.create_node(&["Person"]);
3199        let b = store.create_node(&["Person"]);
3200        let edge_id = store.create_edge(a, b, "KNOWS");
3201
3202        // Set and get property
3203        store.set_edge_property(edge_id, "since", Value::from(2020i64));
3204        let since = store.get_edge_property(edge_id, &"since".into());
3205        assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3206
3207        // Remove property
3208        let old = store.remove_edge_property(edge_id, "since");
3209        assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3210
3211        let since = store.get_edge_property(edge_id, &"since".into());
3212        assert!(since.is_none());
3213    }
3214
3215    #[test]
3216    fn test_add_remove_label() {
3217        let store = LpgStore::new();
3218        let id = store.create_node(&["Person"]);
3219
3220        // Add new label
3221        assert!(store.add_label(id, "Employee"));
3222
3223        let node = store.get_node(id).unwrap();
3224        assert!(node.has_label("Person"));
3225        assert!(node.has_label("Employee"));
3226
3227        // Adding same label again should fail
3228        assert!(!store.add_label(id, "Employee"));
3229
3230        // Remove label
3231        assert!(store.remove_label(id, "Employee"));
3232
3233        let node = store.get_node(id).unwrap();
3234        assert!(node.has_label("Person"));
3235        assert!(!node.has_label("Employee"));
3236
3237        // Removing non-existent label should fail
3238        assert!(!store.remove_label(id, "Employee"));
3239        assert!(!store.remove_label(id, "NonExistent"));
3240    }
3241
3242    #[test]
3243    fn test_add_label_to_nonexistent_node() {
3244        let store = LpgStore::new();
3245        let fake_id = NodeId::new(999);
3246        assert!(!store.add_label(fake_id, "Label"));
3247    }
3248
3249    #[test]
3250    fn test_remove_label_from_nonexistent_node() {
3251        let store = LpgStore::new();
3252        let fake_id = NodeId::new(999);
3253        assert!(!store.remove_label(fake_id, "Label"));
3254    }
3255
3256    #[test]
3257    fn test_node_ids() {
3258        let store = LpgStore::new();
3259
3260        let n1 = store.create_node(&["Person"]);
3261        let n2 = store.create_node(&["Person"]);
3262        let n3 = store.create_node(&["Person"]);
3263
3264        let ids = store.node_ids();
3265        assert_eq!(ids.len(), 3);
3266        assert!(ids.contains(&n1));
3267        assert!(ids.contains(&n2));
3268        assert!(ids.contains(&n3));
3269
3270        // Delete one
3271        store.delete_node(n2);
3272        let ids = store.node_ids();
3273        assert_eq!(ids.len(), 2);
3274        assert!(!ids.contains(&n2));
3275    }
3276
3277    #[test]
3278    fn test_delete_node_nonexistent() {
3279        let store = LpgStore::new();
3280        let fake_id = NodeId::new(999);
3281        assert!(!store.delete_node(fake_id));
3282    }
3283
3284    #[test]
3285    fn test_delete_edge_nonexistent() {
3286        let store = LpgStore::new();
3287        let fake_id = EdgeId::new(999);
3288        assert!(!store.delete_edge(fake_id));
3289    }
3290
3291    #[test]
3292    fn test_delete_edge_double() {
3293        let store = LpgStore::new();
3294        let a = store.create_node(&["Person"]);
3295        let b = store.create_node(&["Person"]);
3296        let edge_id = store.create_edge(a, b, "KNOWS");
3297
3298        assert!(store.delete_edge(edge_id));
3299        assert!(!store.delete_edge(edge_id)); // Double delete
3300    }
3301
3302    #[test]
3303    fn test_create_edge_with_props() {
3304        let store = LpgStore::new();
3305        let a = store.create_node(&["Person"]);
3306        let b = store.create_node(&["Person"]);
3307
3308        let edge_id = store.create_edge_with_props(
3309            a,
3310            b,
3311            "KNOWS",
3312            [
3313                ("since", Value::from(2020i64)),
3314                ("weight", Value::from(1.0)),
3315            ],
3316        );
3317
3318        let edge = store.get_edge(edge_id).unwrap();
3319        assert_eq!(
3320            edge.get_property("since").and_then(|v| v.as_int64()),
3321            Some(2020)
3322        );
3323        assert_eq!(
3324            edge.get_property("weight").and_then(|v| v.as_float64()),
3325            Some(1.0)
3326        );
3327    }
3328
3329    #[test]
3330    fn test_delete_node_edges() {
3331        let store = LpgStore::new();
3332
3333        let a = store.create_node(&["Person"]);
3334        let b = store.create_node(&["Person"]);
3335        let c = store.create_node(&["Person"]);
3336
3337        store.create_edge(a, b, "KNOWS"); // a -> b
3338        store.create_edge(c, a, "KNOWS"); // c -> a
3339
3340        assert_eq!(store.edge_count(), 2);
3341
3342        // Delete all edges connected to a
3343        store.delete_node_edges(a);
3344
3345        assert_eq!(store.edge_count(), 0);
3346    }
3347
3348    #[test]
3349    fn test_neighbors_both_directions() {
3350        let store = LpgStore::new();
3351
3352        let a = store.create_node(&["Person"]);
3353        let b = store.create_node(&["Person"]);
3354        let c = store.create_node(&["Person"]);
3355
3356        store.create_edge(a, b, "KNOWS"); // a -> b
3357        store.create_edge(c, a, "KNOWS"); // c -> a
3358
3359        // Direction::Both for node a
3360        let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3361        assert_eq!(neighbors.len(), 2);
3362        assert!(neighbors.contains(&b)); // outgoing
3363        assert!(neighbors.contains(&c)); // incoming
3364    }
3365
3366    #[test]
3367    fn test_edges_from() {
3368        let store = LpgStore::new();
3369
3370        let a = store.create_node(&["Person"]);
3371        let b = store.create_node(&["Person"]);
3372        let c = store.create_node(&["Person"]);
3373
3374        let e1 = store.create_edge(a, b, "KNOWS");
3375        let e2 = store.create_edge(a, c, "KNOWS");
3376
3377        let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3378        assert_eq!(edges.len(), 2);
3379        assert!(edges.iter().any(|(_, e)| *e == e1));
3380        assert!(edges.iter().any(|(_, e)| *e == e2));
3381
3382        // Incoming edges to b
3383        let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3384        assert_eq!(incoming.len(), 1);
3385        assert_eq!(incoming[0].1, e1);
3386    }
3387
3388    #[test]
3389    fn test_edges_to() {
3390        let store = LpgStore::new();
3391
3392        let a = store.create_node(&["Person"]);
3393        let b = store.create_node(&["Person"]);
3394        let c = store.create_node(&["Person"]);
3395
3396        let e1 = store.create_edge(a, b, "KNOWS");
3397        let e2 = store.create_edge(c, b, "KNOWS");
3398
3399        // Edges pointing TO b
3400        let to_b = store.edges_to(b);
3401        assert_eq!(to_b.len(), 2);
3402        assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3403        assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3404    }
3405
3406    #[test]
3407    fn test_out_degree_in_degree() {
3408        let store = LpgStore::new();
3409
3410        let a = store.create_node(&["Person"]);
3411        let b = store.create_node(&["Person"]);
3412        let c = store.create_node(&["Person"]);
3413
3414        store.create_edge(a, b, "KNOWS");
3415        store.create_edge(a, c, "KNOWS");
3416        store.create_edge(c, b, "KNOWS");
3417
3418        assert_eq!(store.out_degree(a), 2);
3419        assert_eq!(store.out_degree(b), 0);
3420        assert_eq!(store.out_degree(c), 1);
3421
3422        assert_eq!(store.in_degree(a), 0);
3423        assert_eq!(store.in_degree(b), 2);
3424        assert_eq!(store.in_degree(c), 1);
3425    }
3426
3427    #[test]
3428    fn test_edge_type() {
3429        let store = LpgStore::new();
3430
3431        let a = store.create_node(&["Person"]);
3432        let b = store.create_node(&["Person"]);
3433        let edge_id = store.create_edge(a, b, "KNOWS");
3434
3435        let edge_type = store.edge_type(edge_id);
3436        assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3437
3438        // Non-existent edge
3439        let fake_id = EdgeId::new(999);
3440        assert!(store.edge_type(fake_id).is_none());
3441    }
3442
3443    #[test]
3444    fn test_count_methods() {
3445        let store = LpgStore::new();
3446
3447        assert_eq!(store.label_count(), 0);
3448        assert_eq!(store.edge_type_count(), 0);
3449        assert_eq!(store.property_key_count(), 0);
3450
3451        let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3452        let b = store.create_node(&["Company"]);
3453        store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3454
3455        assert_eq!(store.label_count(), 2); // Person, Company
3456        assert_eq!(store.edge_type_count(), 1); // WORKS_AT
3457        assert_eq!(store.property_key_count(), 2); // age, since
3458    }
3459
3460    #[test]
3461    fn test_all_nodes_and_edges() {
3462        let store = LpgStore::new();
3463
3464        let a = store.create_node(&["Person"]);
3465        let b = store.create_node(&["Person"]);
3466        store.create_edge(a, b, "KNOWS");
3467
3468        let nodes: Vec<_> = store.all_nodes().collect();
3469        assert_eq!(nodes.len(), 2);
3470
3471        let edges: Vec<_> = store.all_edges().collect();
3472        assert_eq!(edges.len(), 1);
3473    }
3474
3475    #[test]
3476    fn test_all_labels_and_edge_types() {
3477        let store = LpgStore::new();
3478
3479        store.create_node(&["Person"]);
3480        store.create_node(&["Company"]);
3481        let a = store.create_node(&["Animal"]);
3482        let b = store.create_node(&["Animal"]);
3483        store.create_edge(a, b, "EATS");
3484
3485        let labels = store.all_labels();
3486        assert_eq!(labels.len(), 3);
3487        assert!(labels.contains(&"Person".to_string()));
3488        assert!(labels.contains(&"Company".to_string()));
3489        assert!(labels.contains(&"Animal".to_string()));
3490
3491        let edge_types = store.all_edge_types();
3492        assert_eq!(edge_types.len(), 1);
3493        assert!(edge_types.contains(&"EATS".to_string()));
3494    }
3495
3496    #[test]
3497    fn test_all_property_keys() {
3498        let store = LpgStore::new();
3499
3500        let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3501        let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3502        store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3503
3504        let keys = store.all_property_keys();
3505        assert!(keys.contains(&"name".to_string()));
3506        assert!(keys.contains(&"age".to_string()));
3507        assert!(keys.contains(&"since".to_string()));
3508    }
3509
3510    #[test]
3511    fn test_nodes_with_label() {
3512        let store = LpgStore::new();
3513
3514        store.create_node(&["Person"]);
3515        store.create_node(&["Person"]);
3516        store.create_node(&["Company"]);
3517
3518        let persons: Vec<_> = store.nodes_with_label("Person").collect();
3519        assert_eq!(persons.len(), 2);
3520
3521        let companies: Vec<_> = store.nodes_with_label("Company").collect();
3522        assert_eq!(companies.len(), 1);
3523
3524        let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3525        assert_eq!(none.len(), 0);
3526    }
3527
3528    #[test]
3529    fn test_edges_with_type() {
3530        let store = LpgStore::new();
3531
3532        let a = store.create_node(&["Person"]);
3533        let b = store.create_node(&["Person"]);
3534        let c = store.create_node(&["Company"]);
3535
3536        store.create_edge(a, b, "KNOWS");
3537        store.create_edge(a, c, "WORKS_AT");
3538
3539        let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3540        assert_eq!(knows.len(), 1);
3541
3542        let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3543        assert_eq!(works_at.len(), 1);
3544
3545        let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3546        assert_eq!(none.len(), 0);
3547    }
3548
3549    #[test]
3550    fn test_nodes_by_label_nonexistent() {
3551        let store = LpgStore::new();
3552        store.create_node(&["Person"]);
3553
3554        let empty = store.nodes_by_label("NonExistent");
3555        assert!(empty.is_empty());
3556    }
3557
3558    #[test]
3559    fn test_statistics() {
3560        let store = LpgStore::new();
3561
3562        let a = store.create_node(&["Person"]);
3563        let b = store.create_node(&["Person"]);
3564        let c = store.create_node(&["Company"]);
3565
3566        store.create_edge(a, b, "KNOWS");
3567        store.create_edge(a, c, "WORKS_AT");
3568
3569        store.compute_statistics();
3570        let stats = store.statistics();
3571
3572        assert_eq!(stats.total_nodes, 3);
3573        assert_eq!(stats.total_edges, 2);
3574
3575        // Estimates
3576        let person_card = store.estimate_label_cardinality("Person");
3577        assert!(person_card > 0.0);
3578
3579        let avg_degree = store.estimate_avg_degree("KNOWS", true);
3580        assert!(avg_degree >= 0.0);
3581    }
3582
3583    #[test]
3584    fn test_zone_maps() {
3585        let store = LpgStore::new();
3586
3587        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3588        store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3589
3590        // Zone map should indicate possible matches (30 is within [25, 35] range)
3591        let might_match =
3592            store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3593        // Zone maps return true conservatively when value is within min/max range
3594        assert!(might_match);
3595
3596        let zone = store.node_property_zone_map(&"age".into());
3597        assert!(zone.is_some());
3598
3599        // Non-existent property
3600        let no_zone = store.node_property_zone_map(&"nonexistent".into());
3601        assert!(no_zone.is_none());
3602
3603        // Edge zone maps
3604        let a = store.create_node(&["A"]);
3605        let b = store.create_node(&["B"]);
3606        store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3607
3608        let edge_zone = store.edge_property_zone_map(&"weight".into());
3609        assert!(edge_zone.is_some());
3610    }
3611
3612    #[test]
3613    fn test_rebuild_zone_maps() {
3614        let store = LpgStore::new();
3615        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3616
3617        // Should not panic
3618        store.rebuild_zone_maps();
3619    }
3620
3621    #[test]
3622    fn test_create_node_with_id() {
3623        let store = LpgStore::new();
3624
3625        let specific_id = NodeId::new(100);
3626        store.create_node_with_id(specific_id, &["Person", "Employee"]);
3627
3628        let node = store.get_node(specific_id).unwrap();
3629        assert!(node.has_label("Person"));
3630        assert!(node.has_label("Employee"));
3631
3632        // Next auto-generated ID should be > 100
3633        let next = store.create_node(&["Other"]);
3634        assert!(next.as_u64() > 100);
3635    }
3636
3637    #[test]
3638    fn test_create_edge_with_id() {
3639        let store = LpgStore::new();
3640
3641        let a = store.create_node(&["A"]);
3642        let b = store.create_node(&["B"]);
3643
3644        let specific_id = EdgeId::new(500);
3645        store.create_edge_with_id(specific_id, a, b, "REL");
3646
3647        let edge = store.get_edge(specific_id).unwrap();
3648        assert_eq!(edge.src, a);
3649        assert_eq!(edge.dst, b);
3650        assert_eq!(edge.edge_type.as_str(), "REL");
3651
3652        // Next auto-generated ID should be > 500
3653        let next = store.create_edge(a, b, "OTHER");
3654        assert!(next.as_u64() > 500);
3655    }
3656
3657    #[test]
3658    fn test_set_epoch() {
3659        let store = LpgStore::new();
3660
3661        assert_eq!(store.current_epoch().as_u64(), 0);
3662
3663        store.set_epoch(EpochId::new(42));
3664        assert_eq!(store.current_epoch().as_u64(), 42);
3665    }
3666
3667    #[test]
3668    fn test_get_node_nonexistent() {
3669        let store = LpgStore::new();
3670        let fake_id = NodeId::new(999);
3671        assert!(store.get_node(fake_id).is_none());
3672    }
3673
3674    #[test]
3675    fn test_get_edge_nonexistent() {
3676        let store = LpgStore::new();
3677        let fake_id = EdgeId::new(999);
3678        assert!(store.get_edge(fake_id).is_none());
3679    }
3680
3681    #[test]
3682    fn test_multiple_labels() {
3683        let store = LpgStore::new();
3684
3685        let id = store.create_node(&["Person", "Employee", "Manager"]);
3686        let node = store.get_node(id).unwrap();
3687
3688        assert!(node.has_label("Person"));
3689        assert!(node.has_label("Employee"));
3690        assert!(node.has_label("Manager"));
3691        assert!(!node.has_label("Other"));
3692    }
3693
3694    #[test]
3695    fn test_default_impl() {
3696        let store: LpgStore = Default::default();
3697        assert_eq!(store.node_count(), 0);
3698        assert_eq!(store.edge_count(), 0);
3699    }
3700
3701    #[test]
3702    fn test_edges_from_both_directions() {
3703        let store = LpgStore::new();
3704
3705        let a = store.create_node(&["A"]);
3706        let b = store.create_node(&["B"]);
3707        let c = store.create_node(&["C"]);
3708
3709        let e1 = store.create_edge(a, b, "R1"); // a -> b
3710        let e2 = store.create_edge(c, a, "R2"); // c -> a
3711
3712        // Both directions from a
3713        let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3714        assert_eq!(edges.len(), 2);
3715        assert!(edges.iter().any(|(_, e)| *e == e1)); // outgoing
3716        assert!(edges.iter().any(|(_, e)| *e == e2)); // incoming
3717    }
3718
3719    #[test]
3720    fn test_no_backward_adj_in_degree() {
3721        let config = LpgStoreConfig {
3722            backward_edges: false,
3723            initial_node_capacity: 10,
3724            initial_edge_capacity: 10,
3725        };
3726        let store = LpgStore::with_config(config);
3727
3728        let a = store.create_node(&["A"]);
3729        let b = store.create_node(&["B"]);
3730        store.create_edge(a, b, "R");
3731
3732        // in_degree should still work (falls back to scanning)
3733        let degree = store.in_degree(b);
3734        assert_eq!(degree, 1);
3735    }
3736
3737    #[test]
3738    fn test_no_backward_adj_edges_to() {
3739        let config = LpgStoreConfig {
3740            backward_edges: false,
3741            initial_node_capacity: 10,
3742            initial_edge_capacity: 10,
3743        };
3744        let store = LpgStore::with_config(config);
3745
3746        let a = store.create_node(&["A"]);
3747        let b = store.create_node(&["B"]);
3748        let e = store.create_edge(a, b, "R");
3749
3750        // edges_to should still work (falls back to scanning)
3751        let edges = store.edges_to(b);
3752        assert_eq!(edges.len(), 1);
3753        assert_eq!(edges[0].1, e);
3754    }
3755
3756    #[test]
3757    fn test_node_versioned_creation() {
3758        let store = LpgStore::new();
3759
3760        let epoch = store.new_epoch();
3761        let tx_id = TxId::new(1);
3762
3763        let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3764        assert!(store.get_node(id).is_some());
3765    }
3766
3767    #[test]
3768    fn test_edge_versioned_creation() {
3769        let store = LpgStore::new();
3770
3771        let a = store.create_node(&["A"]);
3772        let b = store.create_node(&["B"]);
3773
3774        let epoch = store.new_epoch();
3775        let tx_id = TxId::new(1);
3776
3777        let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3778        assert!(store.get_edge(edge_id).is_some());
3779    }
3780
3781    #[test]
3782    fn test_node_with_props_versioned() {
3783        let store = LpgStore::new();
3784
3785        let epoch = store.new_epoch();
3786        let tx_id = TxId::new(1);
3787
3788        let id = store.create_node_with_props_versioned(
3789            &["Person"],
3790            [("name", Value::from("Alice"))],
3791            epoch,
3792            tx_id,
3793        );
3794
3795        let node = store.get_node(id).unwrap();
3796        assert_eq!(
3797            node.get_property("name").and_then(|v| v.as_str()),
3798            Some("Alice")
3799        );
3800    }
3801
3802    #[test]
3803    fn test_discard_uncommitted_versions() {
3804        let store = LpgStore::new();
3805
3806        let epoch = store.new_epoch();
3807        let tx_id = TxId::new(42);
3808
3809        // Create node with specific tx
3810        let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3811        assert!(store.get_node(node_id).is_some());
3812
3813        // Discard uncommitted versions for this tx
3814        store.discard_uncommitted_versions(tx_id);
3815
3816        // Node should be gone (version chain was removed)
3817        assert!(store.get_node(node_id).is_none());
3818    }
3819
3820    // === Property Index Tests ===
3821
3822    #[test]
3823    fn test_property_index_create_and_lookup() {
3824        let store = LpgStore::new();
3825
3826        // Create nodes with properties
3827        let alice = store.create_node(&["Person"]);
3828        let bob = store.create_node(&["Person"]);
3829        let charlie = store.create_node(&["Person"]);
3830
3831        store.set_node_property(alice, "city", Value::from("NYC"));
3832        store.set_node_property(bob, "city", Value::from("NYC"));
3833        store.set_node_property(charlie, "city", Value::from("LA"));
3834
3835        // Before indexing, lookup still works (via scan)
3836        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3837        assert_eq!(nyc_people.len(), 2);
3838
3839        // Create index
3840        store.create_property_index("city");
3841        assert!(store.has_property_index("city"));
3842
3843        // Indexed lookup should return same results
3844        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3845        assert_eq!(nyc_people.len(), 2);
3846        assert!(nyc_people.contains(&alice));
3847        assert!(nyc_people.contains(&bob));
3848
3849        let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3850        assert_eq!(la_people.len(), 1);
3851        assert!(la_people.contains(&charlie));
3852    }
3853
3854    #[test]
3855    fn test_property_index_maintained_on_update() {
3856        let store = LpgStore::new();
3857
3858        // Create index first
3859        store.create_property_index("status");
3860
3861        let node = store.create_node(&["Task"]);
3862        store.set_node_property(node, "status", Value::from("pending"));
3863
3864        // Should find by initial value
3865        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3866        assert_eq!(pending.len(), 1);
3867        assert!(pending.contains(&node));
3868
3869        // Update the property
3870        store.set_node_property(node, "status", Value::from("done"));
3871
3872        // Old value should not find it
3873        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3874        assert!(pending.is_empty());
3875
3876        // New value should find it
3877        let done = store.find_nodes_by_property("status", &Value::from("done"));
3878        assert_eq!(done.len(), 1);
3879        assert!(done.contains(&node));
3880    }
3881
3882    #[test]
3883    fn test_property_index_maintained_on_remove() {
3884        let store = LpgStore::new();
3885
3886        store.create_property_index("tag");
3887
3888        let node = store.create_node(&["Item"]);
3889        store.set_node_property(node, "tag", Value::from("important"));
3890
3891        // Should find it
3892        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3893        assert_eq!(found.len(), 1);
3894
3895        // Remove the property
3896        store.remove_node_property(node, "tag");
3897
3898        // Should no longer find it
3899        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3900        assert!(found.is_empty());
3901    }
3902
3903    #[test]
3904    fn test_property_index_drop() {
3905        let store = LpgStore::new();
3906
3907        store.create_property_index("key");
3908        assert!(store.has_property_index("key"));
3909
3910        assert!(store.drop_property_index("key"));
3911        assert!(!store.has_property_index("key"));
3912
3913        // Dropping non-existent index returns false
3914        assert!(!store.drop_property_index("key"));
3915    }
3916
3917    #[test]
3918    fn test_property_index_multiple_values() {
3919        let store = LpgStore::new();
3920
3921        store.create_property_index("age");
3922
3923        // Create multiple nodes with same and different ages
3924        let n1 = store.create_node(&["Person"]);
3925        let n2 = store.create_node(&["Person"]);
3926        let n3 = store.create_node(&["Person"]);
3927        let n4 = store.create_node(&["Person"]);
3928
3929        store.set_node_property(n1, "age", Value::from(25i64));
3930        store.set_node_property(n2, "age", Value::from(25i64));
3931        store.set_node_property(n3, "age", Value::from(30i64));
3932        store.set_node_property(n4, "age", Value::from(25i64));
3933
3934        let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3935        assert_eq!(age_25.len(), 3);
3936
3937        let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3938        assert_eq!(age_30.len(), 1);
3939
3940        let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3941        assert!(age_40.is_empty());
3942    }
3943
3944    #[test]
3945    fn test_property_index_builds_from_existing_data() {
3946        let store = LpgStore::new();
3947
3948        // Create nodes first
3949        let n1 = store.create_node(&["Person"]);
3950        let n2 = store.create_node(&["Person"]);
3951        store.set_node_property(n1, "email", Value::from("alice@example.com"));
3952        store.set_node_property(n2, "email", Value::from("bob@example.com"));
3953
3954        // Create index after data exists
3955        store.create_property_index("email");
3956
3957        // Index should include existing data
3958        let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3959        assert_eq!(alice.len(), 1);
3960        assert!(alice.contains(&n1));
3961
3962        let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3963        assert_eq!(bob.len(), 1);
3964        assert!(bob.contains(&n2));
3965    }
3966
3967    #[test]
3968    fn test_get_node_property_batch() {
3969        let store = LpgStore::new();
3970
3971        let n1 = store.create_node(&["Person"]);
3972        let n2 = store.create_node(&["Person"]);
3973        let n3 = store.create_node(&["Person"]);
3974
3975        store.set_node_property(n1, "age", Value::from(25i64));
3976        store.set_node_property(n2, "age", Value::from(30i64));
3977        // n3 has no age property
3978
3979        let age_key = PropertyKey::new("age");
3980        let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
3981
3982        assert_eq!(values.len(), 3);
3983        assert_eq!(values[0], Some(Value::from(25i64)));
3984        assert_eq!(values[1], Some(Value::from(30i64)));
3985        assert_eq!(values[2], None);
3986    }
3987
3988    #[test]
3989    fn test_get_node_property_batch_empty() {
3990        let store = LpgStore::new();
3991        let key = PropertyKey::new("any");
3992
3993        let values = store.get_node_property_batch(&[], &key);
3994        assert!(values.is_empty());
3995    }
3996
3997    #[test]
3998    fn test_get_nodes_properties_batch() {
3999        let store = LpgStore::new();
4000
4001        let n1 = store.create_node(&["Person"]);
4002        let n2 = store.create_node(&["Person"]);
4003        let n3 = store.create_node(&["Person"]);
4004
4005        store.set_node_property(n1, "name", Value::from("Alice"));
4006        store.set_node_property(n1, "age", Value::from(25i64));
4007        store.set_node_property(n2, "name", Value::from("Bob"));
4008        // n3 has no properties
4009
4010        let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
4011
4012        assert_eq!(all_props.len(), 3);
4013        assert_eq!(all_props[0].len(), 2); // name and age
4014        assert_eq!(all_props[1].len(), 1); // name only
4015        assert_eq!(all_props[2].len(), 0); // no properties
4016
4017        assert_eq!(
4018            all_props[0].get(&PropertyKey::new("name")),
4019            Some(&Value::from("Alice"))
4020        );
4021        assert_eq!(
4022            all_props[1].get(&PropertyKey::new("name")),
4023            Some(&Value::from("Bob"))
4024        );
4025    }
4026
4027    #[test]
4028    fn test_get_nodes_properties_batch_empty() {
4029        let store = LpgStore::new();
4030
4031        let all_props = store.get_nodes_properties_batch(&[]);
4032        assert!(all_props.is_empty());
4033    }
4034
4035    #[test]
4036    fn test_get_nodes_properties_selective_batch() {
4037        let store = LpgStore::new();
4038
4039        let n1 = store.create_node(&["Person"]);
4040        let n2 = store.create_node(&["Person"]);
4041
4042        // Set multiple properties
4043        store.set_node_property(n1, "name", Value::from("Alice"));
4044        store.set_node_property(n1, "age", Value::from(25i64));
4045        store.set_node_property(n1, "email", Value::from("alice@example.com"));
4046        store.set_node_property(n2, "name", Value::from("Bob"));
4047        store.set_node_property(n2, "age", Value::from(30i64));
4048        store.set_node_property(n2, "city", Value::from("NYC"));
4049
4050        // Request only name and age (not email or city)
4051        let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
4052        let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
4053
4054        assert_eq!(props.len(), 2);
4055
4056        // n1: should have name and age, but NOT email
4057        assert_eq!(props[0].len(), 2);
4058        assert_eq!(
4059            props[0].get(&PropertyKey::new("name")),
4060            Some(&Value::from("Alice"))
4061        );
4062        assert_eq!(
4063            props[0].get(&PropertyKey::new("age")),
4064            Some(&Value::from(25i64))
4065        );
4066        assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4067
4068        // n2: should have name and age, but NOT city
4069        assert_eq!(props[1].len(), 2);
4070        assert_eq!(
4071            props[1].get(&PropertyKey::new("name")),
4072            Some(&Value::from("Bob"))
4073        );
4074        assert_eq!(
4075            props[1].get(&PropertyKey::new("age")),
4076            Some(&Value::from(30i64))
4077        );
4078        assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4079    }
4080
4081    #[test]
4082    fn test_get_nodes_properties_selective_batch_empty_keys() {
4083        let store = LpgStore::new();
4084
4085        let n1 = store.create_node(&["Person"]);
4086        store.set_node_property(n1, "name", Value::from("Alice"));
4087
4088        // Request no properties
4089        let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4090
4091        assert_eq!(props.len(), 1);
4092        assert!(props[0].is_empty()); // Empty map when no keys requested
4093    }
4094
4095    #[test]
4096    fn test_get_nodes_properties_selective_batch_missing_keys() {
4097        let store = LpgStore::new();
4098
4099        let n1 = store.create_node(&["Person"]);
4100        store.set_node_property(n1, "name", Value::from("Alice"));
4101
4102        // Request a property that doesn't exist
4103        let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4104        let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4105
4106        assert_eq!(props.len(), 1);
4107        assert_eq!(props[0].len(), 1); // Only name exists
4108        assert_eq!(
4109            props[0].get(&PropertyKey::new("name")),
4110            Some(&Value::from("Alice"))
4111        );
4112    }
4113
4114    // === Range Query Tests ===
4115
4116    #[test]
4117    fn test_find_nodes_in_range_inclusive() {
4118        let store = LpgStore::new();
4119
4120        let n1 = store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4121        let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4122        let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4123        let _n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4124
4125        // age >= 20 AND age <= 40 (inclusive both sides)
4126        let result = store.find_nodes_in_range(
4127            "age",
4128            Some(&Value::from(20i64)),
4129            Some(&Value::from(40i64)),
4130            true,
4131            true,
4132        );
4133        assert_eq!(result.len(), 3);
4134        assert!(result.contains(&n1));
4135        assert!(result.contains(&n2));
4136        assert!(result.contains(&n3));
4137    }
4138
4139    #[test]
4140    fn test_find_nodes_in_range_exclusive() {
4141        let store = LpgStore::new();
4142
4143        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4144        let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4145        store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4146
4147        // age > 20 AND age < 40 (exclusive both sides)
4148        let result = store.find_nodes_in_range(
4149            "age",
4150            Some(&Value::from(20i64)),
4151            Some(&Value::from(40i64)),
4152            false,
4153            false,
4154        );
4155        assert_eq!(result.len(), 1);
4156        assert!(result.contains(&n2));
4157    }
4158
4159    #[test]
4160    fn test_find_nodes_in_range_open_ended() {
4161        let store = LpgStore::new();
4162
4163        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4164        store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4165        let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4166        let n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4167
4168        // age >= 35 (no upper bound)
4169        let result = store.find_nodes_in_range("age", Some(&Value::from(35i64)), None, true, true);
4170        assert_eq!(result.len(), 2);
4171        assert!(result.contains(&n3));
4172        assert!(result.contains(&n4));
4173
4174        // age <= 25 (no lower bound)
4175        let result = store.find_nodes_in_range("age", None, Some(&Value::from(25i64)), true, true);
4176        assert_eq!(result.len(), 1);
4177    }
4178
4179    #[test]
4180    fn test_find_nodes_in_range_empty_result() {
4181        let store = LpgStore::new();
4182
4183        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4184
4185        // Range that doesn't match anything
4186        let result = store.find_nodes_in_range(
4187            "age",
4188            Some(&Value::from(100i64)),
4189            Some(&Value::from(200i64)),
4190            true,
4191            true,
4192        );
4193        assert!(result.is_empty());
4194    }
4195
4196    #[test]
4197    fn test_find_nodes_in_range_nonexistent_property() {
4198        let store = LpgStore::new();
4199
4200        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4201
4202        let result = store.find_nodes_in_range(
4203            "weight",
4204            Some(&Value::from(50i64)),
4205            Some(&Value::from(100i64)),
4206            true,
4207            true,
4208        );
4209        assert!(result.is_empty());
4210    }
4211
4212    // === Multi-Property Query Tests ===
4213
4214    #[test]
4215    fn test_find_nodes_by_properties_multiple_conditions() {
4216        let store = LpgStore::new();
4217
4218        let alice = store.create_node_with_props(
4219            &["Person"],
4220            [("name", Value::from("Alice")), ("city", Value::from("NYC"))],
4221        );
4222        store.create_node_with_props(
4223            &["Person"],
4224            [("name", Value::from("Bob")), ("city", Value::from("NYC"))],
4225        );
4226        store.create_node_with_props(
4227            &["Person"],
4228            [("name", Value::from("Alice")), ("city", Value::from("LA"))],
4229        );
4230
4231        // Match name="Alice" AND city="NYC"
4232        let result = store.find_nodes_by_properties(&[
4233            ("name", Value::from("Alice")),
4234            ("city", Value::from("NYC")),
4235        ]);
4236        assert_eq!(result.len(), 1);
4237        assert!(result.contains(&alice));
4238    }
4239
4240    #[test]
4241    fn test_find_nodes_by_properties_empty_conditions() {
4242        let store = LpgStore::new();
4243
4244        store.create_node(&["Person"]);
4245        store.create_node(&["Person"]);
4246
4247        // Empty conditions should return all nodes
4248        let result = store.find_nodes_by_properties(&[]);
4249        assert_eq!(result.len(), 2);
4250    }
4251
4252    #[test]
4253    fn test_find_nodes_by_properties_no_match() {
4254        let store = LpgStore::new();
4255
4256        store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
4257
4258        let result = store.find_nodes_by_properties(&[("name", Value::from("Nobody"))]);
4259        assert!(result.is_empty());
4260    }
4261
4262    #[test]
4263    fn test_find_nodes_by_properties_with_index() {
4264        let store = LpgStore::new();
4265
4266        // Create index on name
4267        store.create_property_index("name");
4268
4269        let alice = store.create_node_with_props(
4270            &["Person"],
4271            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
4272        );
4273        store.create_node_with_props(
4274            &["Person"],
4275            [("name", Value::from("Bob")), ("age", Value::from(30i64))],
4276        );
4277
4278        // Index should accelerate the lookup
4279        let result = store.find_nodes_by_properties(&[
4280            ("name", Value::from("Alice")),
4281            ("age", Value::from(30i64)),
4282        ]);
4283        assert_eq!(result.len(), 1);
4284        assert!(result.contains(&alice));
4285    }
4286
4287    // === Cardinality Estimation Tests ===
4288
4289    #[test]
4290    fn test_estimate_label_cardinality() {
4291        let store = LpgStore::new();
4292
4293        store.create_node(&["Person"]);
4294        store.create_node(&["Person"]);
4295        store.create_node(&["Animal"]);
4296
4297        store.ensure_statistics_fresh();
4298
4299        let person_est = store.estimate_label_cardinality("Person");
4300        let animal_est = store.estimate_label_cardinality("Animal");
4301        let unknown_est = store.estimate_label_cardinality("Unknown");
4302
4303        assert!(
4304            person_est >= 1.0,
4305            "Person should have cardinality >= 1, got {person_est}"
4306        );
4307        assert!(
4308            animal_est >= 1.0,
4309            "Animal should have cardinality >= 1, got {animal_est}"
4310        );
4311        // Unknown label should return some default (not panic)
4312        assert!(unknown_est >= 0.0);
4313    }
4314
4315    #[test]
4316    fn test_estimate_avg_degree() {
4317        let store = LpgStore::new();
4318
4319        let a = store.create_node(&["Person"]);
4320        let b = store.create_node(&["Person"]);
4321        let c = store.create_node(&["Person"]);
4322
4323        store.create_edge(a, b, "KNOWS");
4324        store.create_edge(a, c, "KNOWS");
4325        store.create_edge(b, c, "KNOWS");
4326
4327        store.ensure_statistics_fresh();
4328
4329        let outgoing = store.estimate_avg_degree("KNOWS", true);
4330        let incoming = store.estimate_avg_degree("KNOWS", false);
4331
4332        assert!(
4333            outgoing > 0.0,
4334            "Outgoing degree should be > 0, got {outgoing}"
4335        );
4336        assert!(
4337            incoming > 0.0,
4338            "Incoming degree should be > 0, got {incoming}"
4339        );
4340    }
4341
4342    // === Delete operations ===
4343
4344    #[test]
4345    fn test_delete_node_does_not_cascade() {
4346        let store = LpgStore::new();
4347
4348        let a = store.create_node(&["A"]);
4349        let b = store.create_node(&["B"]);
4350        let e = store.create_edge(a, b, "KNOWS");
4351
4352        assert!(store.delete_node(a));
4353        assert!(store.get_node(a).is_none());
4354
4355        // Edges are NOT automatically deleted (non-detach delete)
4356        assert!(
4357            store.get_edge(e).is_some(),
4358            "Edge should survive non-detach node delete"
4359        );
4360    }
4361
4362    #[test]
4363    fn test_delete_already_deleted_node() {
4364        let store = LpgStore::new();
4365        let a = store.create_node(&["A"]);
4366
4367        assert!(store.delete_node(a));
4368        // Second delete should return false (already deleted)
4369        assert!(!store.delete_node(a));
4370    }
4371
4372    #[test]
4373    fn test_delete_nonexistent_node() {
4374        let store = LpgStore::new();
4375        assert!(!store.delete_node(NodeId::new(999)));
4376    }
4377}