Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(
27    feature = "tiered-storage",
28    feature = "vector-index",
29    feature = "text-index"
30))]
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33
34#[cfg(feature = "vector-index")]
35use crate::index::vector::HnswIndex;
36
37/// Compares two values for ordering (used for range checks).
38fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
39    match (a, b) {
40        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
41        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
42        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
43        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
44        _ => None,
45    }
46}
47
48/// Checks if a value is within a range.
49fn value_in_range(
50    value: &Value,
51    min: Option<&Value>,
52    max: Option<&Value>,
53    min_inclusive: bool,
54    max_inclusive: bool,
55) -> bool {
56    // Check lower bound
57    if let Some(min_val) = min {
58        match compare_values_for_range(value, min_val) {
59            Some(CmpOrdering::Less) => return false,
60            Some(CmpOrdering::Equal) if !min_inclusive => return false,
61            None => return false, // Can't compare
62            _ => {}
63        }
64    }
65
66    // Check upper bound
67    if let Some(max_val) = max {
68        match compare_values_for_range(value, max_val) {
69            Some(CmpOrdering::Greater) => return false,
70            Some(CmpOrdering::Equal) if !max_inclusive => return false,
71            None => return false,
72            _ => {}
73        }
74    }
75
76    true
77}
78
79// Tiered storage imports
80#[cfg(feature = "tiered-storage")]
81use crate::storage::EpochStore;
82#[cfg(feature = "tiered-storage")]
83use grafeo_common::memory::arena::ArenaAllocator;
84#[cfg(feature = "tiered-storage")]
85use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
86
/// Tuning knobs for an `LpgStore`.
///
/// [`LpgStoreConfig::default`] is a sensible starting point. Disable
/// `backward_edges` when traversal only ever follows outgoing edges, and set
/// the capacity hints when the graph size is known ahead of time.
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// Whether to keep a backward adjacency index for incoming-edge queries.
    /// Disabling it saves ~50% adjacency memory when only outgoing edges
    /// are traversed.
    pub backward_edges: bool,
    /// Capacity hint for node storage, intended to avoid early reallocations.
    pub initial_node_capacity: usize,
    /// Capacity hint for edge storage, intended to avoid early reallocations.
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward adjacency enabled, room for 1024 nodes and 4096 edges.
    fn default() -> Self {
        const DEFAULT_NODE_CAPACITY: usize = 1024;
        const DEFAULT_EDGE_CAPACITY: usize = 4096;

        Self {
            backward_edges: true,
            initial_node_capacity: DEFAULT_NODE_CAPACITY,
            initial_edge_capacity: DEFAULT_EDGE_CAPACITY,
        }
    }
}
112
/// The core in-memory graph storage.
///
/// Everything lives here: nodes, edges, properties, adjacency indexes, and
/// version chains for MVCC. Concurrent reads never block each other.
///
/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
/// adds transaction management and query execution. Use `LpgStore` directly
/// when you need raw performance for algorithm implementations.
///
/// Entity storage comes in two mutually exclusive layouts selected by the
/// `tiered-storage` feature: version chains held directly in hash maps
/// (`nodes` / `edges`), or version indexes pointing into per-epoch arenas
/// and a cold epoch store (`node_versions` / `edge_versions`).
///
/// # Example
///
/// ```
/// use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_core::graph::Direction;
///
/// let store = LpgStore::new();
///
/// // Create a small social network
/// let alice = store.create_node(&["Person"]);
/// let bob = store.create_node(&["Person"]);
/// store.create_edge(alice, bob, "KNOWS");
///
/// // Traverse outgoing edges
/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
///     println!("Alice knows node {:?}", neighbor);
/// }
/// ```
///
/// # Lock Ordering
///
/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
/// consistent order to prevent deadlocks. Always acquire locks in this order:
///
/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
/// 1. `nodes` / `node_versions`
/// 2. `edges` / `edge_versions`
///
/// ## Level 2 - Catalogs (acquire as pairs when writing)
/// 3. `label_to_id` + `id_to_label`
/// 4. `edge_type_to_id` + `id_to_edge_type`
///
/// ## Level 3 - Indexes
/// 5. `label_index`
/// 6. `node_labels`
/// 7. `property_indexes`
///
/// ## Level 4 - Statistics
/// 8. `statistics`
///
/// ## Level 5 - Nested Locks (internal to other structs)
/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
///
/// ## Rules
/// - Catalog pairs must be acquired together when writing.
/// - Never hold entity locks while acquiring catalog locks in a different scope.
/// - Statistics lock is always last.
/// - Read locks are generally safe, but avoid read-to-write upgrades.
pub struct LpgStore {
    /// Configuration the store was constructed with.
    // The field is stored by `with_config`; the lint allowance suggests it is
    // not read back anywhere afterwards (NOTE(review): confirm against the
    // rest of the file before removing either the field or the attribute).
    #[allow(dead_code)]
    config: LpgStoreConfig,

    /// Node records indexed by NodeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    /// Lock order: 1
    #[cfg(not(feature = "tiered-storage"))]
    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Edge records indexed by EdgeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    /// Lock order: 2
    #[cfg(not(feature = "tiered-storage"))]
    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    // === Tiered Storage Fields (feature-gated) ===
    //
    // Lock ordering for arena access:
    //   version_lock (read/write) → arena read lock (via arena_allocator.arena())
    //
    // Rules:
    // - Acquire arena read lock *after* version locks, never before.
    // - Multiple threads may call arena.read_at() concurrently (shared refs only).
    // - Never acquire arena write lock (alloc_new_chunk) while holding version locks.
    // - freeze_epoch order: node_versions.read() → arena.read_at(),
    //   then edge_versions.read() → arena.read_at().
    /// Arena allocator for hot data storage.
    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
    #[cfg(feature = "tiered-storage")]
    arena_allocator: Arc<ArenaAllocator>,

    /// Node version indexes - store metadata and arena offsets.
    /// The actual NodeRecord data is stored in the arena.
    /// Lock order: 1
    #[cfg(feature = "tiered-storage")]
    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Edge version indexes - store metadata and arena offsets.
    /// The actual EdgeRecord data is stored in the arena.
    /// Lock order: 2
    #[cfg(feature = "tiered-storage")]
    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Cold storage for frozen epochs.
    /// Contains compressed epoch blocks for historical data.
    #[cfg(feature = "tiered-storage")]
    epoch_store: Arc<EpochStore>,

    /// Property storage for nodes.
    /// Keyed by node ID only; lookups in this file do not pass an epoch.
    node_properties: PropertyStorage<NodeId>,

    /// Property storage for edges.
    edge_properties: PropertyStorage<EdgeId>,

    /// Label name to ID mapping.
    /// Lock order: 3 (acquire with id_to_label)
    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Label ID to name mapping (the ID is the index into the vector).
    /// Lock order: 3 (acquire with label_to_id)
    id_to_label: RwLock<Vec<ArcStr>>,

    /// Edge type name to ID mapping.
    /// Lock order: 4 (acquire with id_to_edge_type)
    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Edge type ID to name mapping.
    /// Lock order: 4 (acquire with edge_type_to_id)
    id_to_edge_type: RwLock<Vec<ArcStr>>,

    /// Forward adjacency lists (outgoing edges).
    forward_adj: ChunkedAdjacency,

    /// Backward adjacency lists (incoming edges).
    /// Only populated if config.backward_edges is true; `None` otherwise.
    backward_adj: Option<ChunkedAdjacency>,

    /// Label index: label_id -> set of node IDs.
    /// The outer vector is indexed by label ID; each slot is a map with unit
    /// values, i.e. it is used as a set of node IDs.
    /// Lock order: 5
    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node labels: node_id -> set of label IDs.
    /// Reverse mapping to efficiently get labels for a node.
    /// Lock order: 6
    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Property indexes: property_key -> (value -> set of node IDs).
    ///
    /// When a property is indexed, lookups by value are O(1) instead of O(n).
    /// Use [`create_property_index`] to enable indexing for a property.
    /// Lock order: 7
    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,

    /// Vector indexes: "label:property" -> HNSW index.
    ///
    /// Created via [`GrafeoDB::create_vector_index`](grafeo_engine::GrafeoDB::create_vector_index).
    /// Lock order: 7 (same level as property_indexes, disjoint keys)
    #[cfg(feature = "vector-index")]
    vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,

    /// Text indexes: "label:property" -> inverted index with BM25 scoring.
    ///
    /// Created via [`GrafeoDB::create_text_index`](grafeo_engine::GrafeoDB::create_text_index).
    /// Lock order: 7 (same level as property_indexes, disjoint keys)
    #[cfg(feature = "text-index")]
    text_indexes: RwLock<FxHashMap<String, Arc<RwLock<crate::index::text::InvertedIndex>>>>,

    /// Next node ID. Starts at 0 and is handed out with `fetch_add`.
    next_node_id: AtomicU64,

    /// Next edge ID. Starts at 0.
    next_edge_id: AtomicU64,

    /// Current epoch. Starts at 0; advanced by `new_epoch` and `sync_epoch`.
    current_epoch: AtomicU64,

    /// Statistics for cost-based optimization.
    /// Lock order: 8 (always last)
    statistics: RwLock<Statistics>,

    /// Whether statistics need recomputation after mutations.
    /// Initialised to `true`; mutating operations such as `create_node` and
    /// `delete_node` set it again.
    needs_stats_recompute: AtomicBool,
}
296
297impl LpgStore {
298    /// Creates a new LPG store with default configuration.
299    #[must_use]
300    pub fn new() -> Self {
301        Self::with_config(LpgStoreConfig::default())
302    }
303
304    /// Creates a new LPG store with custom configuration.
305    #[must_use]
306    pub fn with_config(config: LpgStoreConfig) -> Self {
307        let backward_adj = if config.backward_edges {
308            Some(ChunkedAdjacency::new())
309        } else {
310            None
311        };
312
313        Self {
314            #[cfg(not(feature = "tiered-storage"))]
315            nodes: RwLock::new(FxHashMap::default()),
316            #[cfg(not(feature = "tiered-storage"))]
317            edges: RwLock::new(FxHashMap::default()),
318            #[cfg(feature = "tiered-storage")]
319            arena_allocator: Arc::new(ArenaAllocator::new()),
320            #[cfg(feature = "tiered-storage")]
321            node_versions: RwLock::new(FxHashMap::default()),
322            #[cfg(feature = "tiered-storage")]
323            edge_versions: RwLock::new(FxHashMap::default()),
324            #[cfg(feature = "tiered-storage")]
325            epoch_store: Arc::new(EpochStore::new()),
326            node_properties: PropertyStorage::new(),
327            edge_properties: PropertyStorage::new(),
328            label_to_id: RwLock::new(FxHashMap::default()),
329            id_to_label: RwLock::new(Vec::new()),
330            edge_type_to_id: RwLock::new(FxHashMap::default()),
331            id_to_edge_type: RwLock::new(Vec::new()),
332            forward_adj: ChunkedAdjacency::new(),
333            backward_adj,
334            label_index: RwLock::new(Vec::new()),
335            node_labels: RwLock::new(FxHashMap::default()),
336            property_indexes: RwLock::new(FxHashMap::default()),
337            #[cfg(feature = "vector-index")]
338            vector_indexes: RwLock::new(FxHashMap::default()),
339            #[cfg(feature = "text-index")]
340            text_indexes: RwLock::new(FxHashMap::default()),
341            next_node_id: AtomicU64::new(0),
342            next_edge_id: AtomicU64::new(0),
343            current_epoch: AtomicU64::new(0),
344            statistics: RwLock::new(Statistics::new()),
345            needs_stats_recompute: AtomicBool::new(true),
346            config,
347        }
348    }
349
350    /// Returns the current epoch.
351    #[must_use]
352    pub fn current_epoch(&self) -> EpochId {
353        EpochId::new(self.current_epoch.load(Ordering::Acquire))
354    }
355
356    /// Creates a new epoch.
357    pub fn new_epoch(&self) -> EpochId {
358        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
359        EpochId::new(id)
360    }
361
362    /// Syncs the store epoch to match an external epoch counter.
363    ///
364    /// Used by the transaction manager to keep the store's epoch in step
365    /// after a transaction commit advances the global epoch.
366    pub fn sync_epoch(&self, epoch: EpochId) {
367        self.current_epoch
368            .fetch_max(epoch.as_u64(), Ordering::AcqRel);
369    }
370
371    // === Node Operations ===
372
373    /// Creates a new node with the given labels.
374    ///
375    /// Uses the system transaction for non-transactional operations.
376    pub fn create_node(&self, labels: &[&str]) -> NodeId {
377        self.needs_stats_recompute.store(true, Ordering::Relaxed);
378        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
379    }
380
381    /// Creates a new node with the given labels within a transaction context.
382    #[cfg(not(feature = "tiered-storage"))]
383    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
384        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
385
386        let mut record = NodeRecord::new(id, epoch);
387        record.set_label_count(labels.len() as u16);
388
389        // Store labels in node_labels map and label_index
390        let mut node_label_set = FxHashSet::default();
391        for label in labels {
392            let label_id = self.get_or_create_label_id(*label);
393            node_label_set.insert(label_id);
394
395            // Update label index
396            let mut index = self.label_index.write();
397            while index.len() <= label_id as usize {
398                index.push(FxHashMap::default());
399            }
400            index[label_id as usize].insert(id, ());
401        }
402
403        // Store node's labels
404        self.node_labels.write().insert(id, node_label_set);
405
406        // Create version chain with initial version
407        let chain = VersionChain::with_initial(record, epoch, tx_id);
408        self.nodes.write().insert(id, chain);
409        id
410    }
411
412    /// Creates a new node with the given labels within a transaction context.
413    /// (Tiered storage version: stores data in arena, metadata in VersionIndex)
414    #[cfg(feature = "tiered-storage")]
415    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
416        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
417
418        let mut record = NodeRecord::new(id, epoch);
419        record.set_label_count(labels.len() as u16);
420
421        // Store labels in node_labels map and label_index
422        let mut node_label_set = FxHashSet::default();
423        for label in labels {
424            let label_id = self.get_or_create_label_id(*label);
425            node_label_set.insert(label_id);
426
427            // Update label index
428            let mut index = self.label_index.write();
429            while index.len() <= label_id as usize {
430                index.push(FxHashMap::default());
431            }
432            index[label_id as usize].insert(id, ());
433        }
434
435        // Store node's labels
436        self.node_labels.write().insert(id, node_label_set);
437
438        // Allocate record in arena and get offset (create epoch if needed)
439        let arena = self.arena_allocator.arena_or_create(epoch);
440        let (offset, _stored) = arena.alloc_value_with_offset(record);
441
442        // Create HotVersionRef pointing to arena data
443        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
444
445        // Create or update version index
446        let mut versions = self.node_versions.write();
447        if let Some(index) = versions.get_mut(&id) {
448            index.add_hot(hot_ref);
449        } else {
450            versions.insert(id, VersionIndex::with_initial(hot_ref));
451        }
452
453        id
454    }
455
456    /// Creates a new node with labels and properties.
457    pub fn create_node_with_props(
458        &self,
459        labels: &[&str],
460        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
461    ) -> NodeId {
462        self.create_node_with_props_versioned(
463            labels,
464            properties,
465            self.current_epoch(),
466            TxId::SYSTEM,
467        )
468    }
469
470    /// Creates a new node with labels and properties within a transaction context.
471    #[cfg(not(feature = "tiered-storage"))]
472    pub fn create_node_with_props_versioned(
473        &self,
474        labels: &[&str],
475        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
476        epoch: EpochId,
477        tx_id: TxId,
478    ) -> NodeId {
479        let id = self.create_node_versioned(labels, epoch, tx_id);
480
481        for (key, value) in properties {
482            let prop_key: PropertyKey = key.into();
483            let prop_value: Value = value.into();
484            // Update property index before setting the property
485            self.update_property_index_on_set(id, &prop_key, &prop_value);
486            self.node_properties.set(id, prop_key, prop_value);
487        }
488
489        // Update props_count in record
490        let count = self.node_properties.get_all(id).len() as u16;
491        if let Some(chain) = self.nodes.write().get_mut(&id)
492            && let Some(record) = chain.latest_mut()
493        {
494            record.props_count = count;
495        }
496
497        id
498    }
499
500    /// Creates a new node with labels and properties within a transaction context.
501    /// (Tiered storage version)
502    #[cfg(feature = "tiered-storage")]
503    pub fn create_node_with_props_versioned(
504        &self,
505        labels: &[&str],
506        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
507        epoch: EpochId,
508        tx_id: TxId,
509    ) -> NodeId {
510        let id = self.create_node_versioned(labels, epoch, tx_id);
511
512        for (key, value) in properties {
513            let prop_key: PropertyKey = key.into();
514            let prop_value: Value = value.into();
515            // Update property index before setting the property
516            self.update_property_index_on_set(id, &prop_key, &prop_value);
517            self.node_properties.set(id, prop_key, prop_value);
518        }
519
520        // Note: props_count in record is not updated for tiered storage.
521        // The record is immutable once allocated in the arena.
522
523        id
524    }
525
526    /// Gets a node by ID (latest visible version).
527    #[must_use]
528    pub fn get_node(&self, id: NodeId) -> Option<Node> {
529        self.get_node_at_epoch(id, self.current_epoch())
530    }
531
532    /// Gets a node by ID at a specific epoch.
533    #[must_use]
534    #[cfg(not(feature = "tiered-storage"))]
535    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
536        let nodes = self.nodes.read();
537        let chain = nodes.get(&id)?;
538        let record = chain.visible_at(epoch)?;
539
540        if record.is_deleted() {
541            return None;
542        }
543
544        let mut node = Node::new(id);
545
546        // Get labels from node_labels map
547        let id_to_label = self.id_to_label.read();
548        let node_labels = self.node_labels.read();
549        if let Some(label_ids) = node_labels.get(&id) {
550            for &label_id in label_ids {
551                if let Some(label) = id_to_label.get(label_id as usize) {
552                    node.labels.push(label.clone());
553                }
554            }
555        }
556
557        // Get properties
558        node.properties = self.node_properties.get_all(id).into_iter().collect();
559
560        Some(node)
561    }
562
563    /// Gets a node by ID at a specific epoch.
564    /// (Tiered storage version: reads from arena via VersionIndex)
565    #[must_use]
566    #[cfg(feature = "tiered-storage")]
567    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
568        let versions = self.node_versions.read();
569        let index = versions.get(&id)?;
570        let version_ref = index.visible_at(epoch)?;
571
572        // Read the record from arena
573        let record = self.read_node_record(&version_ref)?;
574
575        if record.is_deleted() {
576            return None;
577        }
578
579        let mut node = Node::new(id);
580
581        // Get labels from node_labels map
582        let id_to_label = self.id_to_label.read();
583        let node_labels = self.node_labels.read();
584        if let Some(label_ids) = node_labels.get(&id) {
585            for &label_id in label_ids {
586                if let Some(label) = id_to_label.get(label_id as usize) {
587                    node.labels.push(label.clone());
588                }
589            }
590        }
591
592        // Get properties
593        node.properties = self.node_properties.get_all(id).into_iter().collect();
594
595        Some(node)
596    }
597
598    /// Gets a node visible to a specific transaction.
599    #[must_use]
600    #[cfg(not(feature = "tiered-storage"))]
601    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
602        let nodes = self.nodes.read();
603        let chain = nodes.get(&id)?;
604        let record = chain.visible_to(epoch, tx_id)?;
605
606        if record.is_deleted() {
607            return None;
608        }
609
610        let mut node = Node::new(id);
611
612        // Get labels from node_labels map
613        let id_to_label = self.id_to_label.read();
614        let node_labels = self.node_labels.read();
615        if let Some(label_ids) = node_labels.get(&id) {
616            for &label_id in label_ids {
617                if let Some(label) = id_to_label.get(label_id as usize) {
618                    node.labels.push(label.clone());
619                }
620            }
621        }
622
623        // Get properties
624        node.properties = self.node_properties.get_all(id).into_iter().collect();
625
626        Some(node)
627    }
628
629    /// Gets a node visible to a specific transaction.
630    /// (Tiered storage version: reads from arena via VersionIndex)
631    #[must_use]
632    #[cfg(feature = "tiered-storage")]
633    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
634        let versions = self.node_versions.read();
635        let index = versions.get(&id)?;
636        let version_ref = index.visible_to(epoch, tx_id)?;
637
638        // Read the record from arena
639        let record = self.read_node_record(&version_ref)?;
640
641        if record.is_deleted() {
642            return None;
643        }
644
645        let mut node = Node::new(id);
646
647        // Get labels from node_labels map
648        let id_to_label = self.id_to_label.read();
649        let node_labels = self.node_labels.read();
650        if let Some(label_ids) = node_labels.get(&id) {
651            for &label_id in label_ids {
652                if let Some(label) = id_to_label.get(label_id as usize) {
653                    node.labels.push(label.clone());
654                }
655            }
656        }
657
658        // Get properties
659        node.properties = self.node_properties.get_all(id).into_iter().collect();
660
661        Some(node)
662    }
663
    /// Reads a NodeRecord from arena (hot) or epoch store (cold) using a VersionRef.
    ///
    /// - `VersionRef::Hot`: looks up the arena for the reference's epoch and
    ///   copies the record out at the stored offset. This branch always
    ///   returns `Some`.
    /// - `VersionRef::Cold`: delegates to `EpochStore::get_node` and returns
    ///   whatever it yields.
    #[cfg(feature = "tiered-storage")]
    #[allow(unsafe_code)]
    fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
        match version_ref {
            VersionRef::Hot(hot_ref) => {
                let arena = self.arena_allocator.arena(hot_ref.epoch);
                // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
                // NOTE(review): this relies on hot refs in `node_versions` only ever being
                // created from NodeRecord allocations, and on the arena for that epoch
                // still being alive - confirm both against the arena API.
                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                // Copy the record out so nothing borrowed from the arena escapes.
                Some(*record)
            }
            VersionRef::Cold(cold_ref) => {
                // Read from compressed epoch store
                self.epoch_store
                    .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
            }
        }
    }
682
683    /// Deletes a node and all its edges (using latest epoch).
684    pub fn delete_node(&self, id: NodeId) -> bool {
685        self.needs_stats_recompute.store(true, Ordering::Relaxed);
686        self.delete_node_at_epoch(id, self.current_epoch())
687    }
688
689    /// Deletes a node at a specific epoch.
690    #[cfg(not(feature = "tiered-storage"))]
691    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
692        let mut nodes = self.nodes.write();
693        if let Some(chain) = nodes.get_mut(&id) {
694            // Check if visible at this epoch (not already deleted)
695            if let Some(record) = chain.visible_at(epoch) {
696                if record.is_deleted() {
697                    return false;
698                }
699            } else {
700                // Not visible at this epoch (already deleted or doesn't exist)
701                return false;
702            }
703
704            // Mark the version chain as deleted at this epoch
705            chain.mark_deleted(epoch);
706
707            // Remove from label index using node_labels map
708            let mut index = self.label_index.write();
709            let mut node_labels = self.node_labels.write();
710            if let Some(label_ids) = node_labels.remove(&id) {
711                for label_id in label_ids {
712                    if let Some(set) = index.get_mut(label_id as usize) {
713                        set.remove(&id);
714                    }
715                }
716            }
717
718            // Remove properties
719            drop(nodes); // Release lock before removing properties
720            drop(index);
721            drop(node_labels);
722            self.node_properties.remove_all(id);
723
724            // Note: Caller should use delete_node_edges() first if detach is needed
725
726            true
727        } else {
728            false
729        }
730    }
731
732    /// Deletes a node at a specific epoch.
733    /// (Tiered storage version)
734    #[cfg(feature = "tiered-storage")]
735    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
736        let mut versions = self.node_versions.write();
737        if let Some(index) = versions.get_mut(&id) {
738            // Check if visible at this epoch
739            if let Some(version_ref) = index.visible_at(epoch) {
740                if let Some(record) = self.read_node_record(&version_ref) {
741                    if record.is_deleted() {
742                        return false;
743                    }
744                } else {
745                    return false;
746                }
747            } else {
748                return false;
749            }
750
751            // Mark as deleted in version index
752            index.mark_deleted(epoch);
753
754            // Remove from label index using node_labels map
755            let mut label_index = self.label_index.write();
756            let mut node_labels = self.node_labels.write();
757            if let Some(label_ids) = node_labels.remove(&id) {
758                for label_id in label_ids {
759                    if let Some(set) = label_index.get_mut(label_id as usize) {
760                        set.remove(&id);
761                    }
762                }
763            }
764
765            // Remove properties
766            drop(versions);
767            drop(label_index);
768            drop(node_labels);
769            self.node_properties.remove_all(id);
770
771            true
772        } else {
773            false
774        }
775    }
776
777    /// Deletes all edges connected to a node (implements DETACH DELETE).
778    ///
779    /// Call this before `delete_node()` if you want to remove a node that
780    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
781    #[cfg(not(feature = "tiered-storage"))]
782    pub fn delete_node_edges(&self, node_id: NodeId) {
783        // Get outgoing edges
784        let outgoing: Vec<EdgeId> = self
785            .forward_adj
786            .edges_from(node_id)
787            .into_iter()
788            .map(|(_, edge_id)| edge_id)
789            .collect();
790
791        // Get incoming edges
792        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
793            backward
794                .edges_from(node_id)
795                .into_iter()
796                .map(|(_, edge_id)| edge_id)
797                .collect()
798        } else {
799            // No backward adjacency - scan all edges
800            let epoch = self.current_epoch();
801            self.edges
802                .read()
803                .iter()
804                .filter_map(|(id, chain)| {
805                    chain.visible_at(epoch).and_then(|r| {
806                        if !r.is_deleted() && r.dst == node_id {
807                            Some(*id)
808                        } else {
809                            None
810                        }
811                    })
812                })
813                .collect()
814        };
815
816        // Delete all edges
817        for edge_id in outgoing.into_iter().chain(incoming) {
818            self.delete_edge(edge_id);
819        }
820    }
821
822    /// Deletes all edges connected to a node (implements DETACH DELETE).
823    /// (Tiered storage version)
824    #[cfg(feature = "tiered-storage")]
825    pub fn delete_node_edges(&self, node_id: NodeId) {
826        // Get outgoing edges
827        let outgoing: Vec<EdgeId> = self
828            .forward_adj
829            .edges_from(node_id)
830            .into_iter()
831            .map(|(_, edge_id)| edge_id)
832            .collect();
833
834        // Get incoming edges
835        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
836            backward
837                .edges_from(node_id)
838                .into_iter()
839                .map(|(_, edge_id)| edge_id)
840                .collect()
841        } else {
842            // No backward adjacency - scan all edges
843            let epoch = self.current_epoch();
844            let versions = self.edge_versions.read();
845            versions
846                .iter()
847                .filter_map(|(id, index)| {
848                    index.visible_at(epoch).and_then(|vref| {
849                        self.read_edge_record(&vref).and_then(|r| {
850                            if !r.is_deleted() && r.dst == node_id {
851                                Some(*id)
852                            } else {
853                                None
854                            }
855                        })
856                    })
857                })
858                .collect()
859        };
860
861        // Delete all edges
862        for edge_id in outgoing.into_iter().chain(incoming) {
863            self.delete_edge(edge_id);
864        }
865    }
866
867    /// Sets a property on a node.
868    #[cfg(not(feature = "tiered-storage"))]
869    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
870        let prop_key: PropertyKey = key.into();
871
872        // Update property index before setting the property (needs to read old value)
873        self.update_property_index_on_set(id, &prop_key, &value);
874
875        self.node_properties.set(id, prop_key, value);
876
877        // Update props_count in record
878        let count = self.node_properties.get_all(id).len() as u16;
879        if let Some(chain) = self.nodes.write().get_mut(&id)
880            && let Some(record) = chain.latest_mut()
881        {
882            record.props_count = count;
883        }
884    }
885
886    /// Sets a property on a node.
887    /// (Tiered storage version: properties stored separately, record is immutable)
888    #[cfg(feature = "tiered-storage")]
889    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
890        let prop_key: PropertyKey = key.into();
891
892        // Update property index before setting the property (needs to read old value)
893        self.update_property_index_on_set(id, &prop_key, &value);
894
895        self.node_properties.set(id, prop_key, value);
896        // Note: props_count in record is not updated for tiered storage.
897        // The record is immutable once allocated in the arena.
898        // Property count can be derived from PropertyStorage if needed.
899    }
900
901    /// Sets a property on an edge.
902    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
903        self.edge_properties.set(id, key.into(), value);
904    }
905
906    /// Removes a property from a node.
907    ///
908    /// Returns the previous value if it existed, or None if the property didn't exist.
909    #[cfg(not(feature = "tiered-storage"))]
910    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
911        let prop_key: PropertyKey = key.into();
912
913        // Update property index before removing (needs to read old value)
914        self.update_property_index_on_remove(id, &prop_key);
915
916        let result = self.node_properties.remove(id, &prop_key);
917
918        // Update props_count in record
919        let count = self.node_properties.get_all(id).len() as u16;
920        if let Some(chain) = self.nodes.write().get_mut(&id)
921            && let Some(record) = chain.latest_mut()
922        {
923            record.props_count = count;
924        }
925
926        result
927    }
928
929    /// Removes a property from a node.
930    /// (Tiered storage version)
931    #[cfg(feature = "tiered-storage")]
932    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
933        let prop_key: PropertyKey = key.into();
934
935        // Update property index before removing (needs to read old value)
936        self.update_property_index_on_remove(id, &prop_key);
937
938        self.node_properties.remove(id, &prop_key)
939        // Note: props_count in record is not updated for tiered storage.
940    }
941
942    /// Removes a property from an edge.
943    ///
944    /// Returns the previous value if it existed, or None if the property didn't exist.
945    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
946        self.edge_properties.remove(id, &key.into())
947    }
948
949    /// Gets a single property from a node without loading all properties.
950    ///
951    /// This is O(1) vs O(properties) for `get_node().get_property()`.
952    /// Use this for filter predicates where you only need one property value.
953    ///
954    /// # Example
955    ///
956    /// ```ignore
957    /// // Fast: Direct single-property lookup
958    /// let age = store.get_node_property(node_id, "age");
959    ///
960    /// // Slow: Loads all properties, then extracts one
961    /// let age = store.get_node(node_id).and_then(|n| n.get_property("age").cloned());
962    /// ```
963    #[must_use]
964    pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
965        self.node_properties.get(id, key)
966    }
967
968    /// Gets a single property from an edge without loading all properties.
969    ///
970    /// This is O(1) vs O(properties) for `get_edge().get_property()`.
971    #[must_use]
972    pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
973        self.edge_properties.get(id, key)
974    }
975
976    // === Batch Property Operations ===
977
978    /// Gets a property for multiple nodes in a single batch operation.
979    ///
980    /// More efficient than calling [`Self::get_node_property`] in a loop because it
981    /// reduces lock overhead and enables better cache utilization.
982    ///
983    /// # Example
984    ///
985    /// ```
986    /// use grafeo_core::graph::lpg::LpgStore;
987    /// use grafeo_common::types::{NodeId, PropertyKey, Value};
988    ///
989    /// let store = LpgStore::new();
990    /// let n1 = store.create_node(&["Person"]);
991    /// let n2 = store.create_node(&["Person"]);
992    /// store.set_node_property(n1, "age", Value::from(25i64));
993    /// store.set_node_property(n2, "age", Value::from(30i64));
994    ///
995    /// let ages = store.get_node_property_batch(&[n1, n2], &PropertyKey::new("age"));
996    /// assert_eq!(ages, vec![Some(Value::from(25i64)), Some(Value::from(30i64))]);
997    /// ```
998    #[must_use]
999    pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
1000        self.node_properties.get_batch(ids, key)
1001    }
1002
1003    /// Gets all properties for multiple nodes in a single batch operation.
1004    ///
1005    /// Returns a vector of property maps, one per node ID (empty map if no properties).
1006    /// More efficient than calling [`Self::get_node`] in a loop.
1007    #[must_use]
1008    pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
1009        self.node_properties.get_all_batch(ids)
1010    }
1011
1012    /// Gets selected properties for multiple nodes (projection pushdown).
1013    ///
1014    /// This is more efficient than [`Self::get_nodes_properties_batch`] when you only
1015    /// need a subset of properties. It only iterates the requested columns instead of
1016    /// all columns.
1017    ///
1018    /// **Use this for**: Queries with explicit projections like `RETURN n.name, n.age`
1019    /// instead of `RETURN n` (which requires all properties).
1020    ///
1021    /// # Example
1022    ///
1023    /// ```
1024    /// use grafeo_core::graph::lpg::LpgStore;
1025    /// use grafeo_common::types::{PropertyKey, Value};
1026    ///
1027    /// let store = LpgStore::new();
1028    /// let n1 = store.create_node(&["Person"]);
1029    /// store.set_node_property(n1, "name", Value::from("Alice"));
1030    /// store.set_node_property(n1, "age", Value::from(30i64));
1031    /// store.set_node_property(n1, "email", Value::from("alice@example.com"));
1032    ///
1033    /// // Only fetch name and age (faster than get_nodes_properties_batch)
1034    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
1035    /// let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
1036    ///
1037    /// assert_eq!(props[0].len(), 2); // Only name and age, not email
1038    /// ```
1039    #[must_use]
1040    pub fn get_nodes_properties_selective_batch(
1041        &self,
1042        ids: &[NodeId],
1043        keys: &[PropertyKey],
1044    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1045        self.node_properties.get_selective_batch(ids, keys)
1046    }
1047
1048    /// Gets selected properties for multiple edges (projection pushdown).
1049    ///
1050    /// Edge-property version of [`Self::get_nodes_properties_selective_batch`].
1051    #[must_use]
1052    pub fn get_edges_properties_selective_batch(
1053        &self,
1054        ids: &[EdgeId],
1055        keys: &[PropertyKey],
1056    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1057        self.edge_properties.get_selective_batch(ids, keys)
1058    }
1059
1060    /// Finds nodes where a property value is in a range.
1061    ///
1062    /// This is useful for queries like `n.age > 30` or `n.price BETWEEN 10 AND 100`.
1063    /// Uses zone maps to skip scanning when the range definitely doesn't match.
1064    ///
1065    /// # Arguments
1066    ///
1067    /// * `property` - The property to check
1068    /// * `min` - Optional lower bound (None for unbounded)
1069    /// * `max` - Optional upper bound (None for unbounded)
1070    /// * `min_inclusive` - Whether lower bound is inclusive (>= vs >)
1071    /// * `max_inclusive` - Whether upper bound is inclusive (<= vs <)
1072    ///
1073    /// # Example
1074    ///
1075    /// ```
1076    /// use grafeo_core::graph::lpg::LpgStore;
1077    /// use grafeo_common::types::Value;
1078    ///
1079    /// let store = LpgStore::new();
1080    /// let n1 = store.create_node(&["Person"]);
1081    /// let n2 = store.create_node(&["Person"]);
1082    /// store.set_node_property(n1, "age", Value::from(25i64));
1083    /// store.set_node_property(n2, "age", Value::from(35i64));
1084    ///
1085    /// // Find nodes where age > 30
1086    /// let result = store.find_nodes_in_range(
1087    ///     "age",
1088    ///     Some(&Value::from(30i64)),
1089    ///     None,
1090    ///     false, // exclusive lower bound
1091    ///     true,  // inclusive upper bound (doesn't matter since None)
1092    /// );
1093    /// assert_eq!(result.len(), 1); // Only n2 matches
1094    /// ```
1095    #[must_use]
1096    pub fn find_nodes_in_range(
1097        &self,
1098        property: &str,
1099        min: Option<&Value>,
1100        max: Option<&Value>,
1101        min_inclusive: bool,
1102        max_inclusive: bool,
1103    ) -> Vec<NodeId> {
1104        let key = PropertyKey::new(property);
1105
1106        // Check zone map first - if no values could match, return empty
1107        if !self
1108            .node_properties
1109            .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1110        {
1111            return Vec::new();
1112        }
1113
1114        // Scan all nodes and filter by range
1115        self.node_ids()
1116            .into_iter()
1117            .filter(|&node_id| {
1118                self.node_properties
1119                    .get(node_id, &key)
1120                    .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1121            })
1122            .collect()
1123    }
1124
1125    /// Finds nodes matching multiple property equality conditions.
1126    ///
1127    /// This is more efficient than intersecting multiple single-property lookups
1128    /// because it can use indexes when available and short-circuits on the first
1129    /// miss.
1130    ///
1131    /// # Example
1132    ///
1133    /// ```
1134    /// use grafeo_core::graph::lpg::LpgStore;
1135    /// use grafeo_common::types::Value;
1136    ///
1137    /// let store = LpgStore::new();
1138    /// let alice = store.create_node(&["Person"]);
1139    /// store.set_node_property(alice, "name", Value::from("Alice"));
1140    /// store.set_node_property(alice, "city", Value::from("NYC"));
1141    ///
1142    /// // Find nodes where name = "Alice" AND city = "NYC"
1143    /// let matches = store.find_nodes_by_properties(&[
1144    ///     ("name", Value::from("Alice")),
1145    ///     ("city", Value::from("NYC")),
1146    /// ]);
1147    /// assert!(matches.contains(&alice));
1148    /// ```
1149    #[must_use]
1150    pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1151        if conditions.is_empty() {
1152            return self.node_ids();
1153        }
1154
1155        // Find the most selective condition (smallest result set) to start
1156        // If any condition has an index, use that first
1157        let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1158        let indexes = self.property_indexes.read();
1159
1160        for (i, (prop, value)) in conditions.iter().enumerate() {
1161            let key = PropertyKey::new(*prop);
1162            let hv = HashableValue::new(value.clone());
1163
1164            if let Some(index) = indexes.get(&key) {
1165                let matches: Vec<NodeId> = index
1166                    .get(&hv)
1167                    .map(|nodes| nodes.iter().copied().collect())
1168                    .unwrap_or_default();
1169
1170                // Short-circuit if any indexed condition has no matches
1171                if matches.is_empty() {
1172                    return Vec::new();
1173                }
1174
1175                // Use smallest indexed result as starting point
1176                if best_start
1177                    .as_ref()
1178                    .is_none_or(|(_, best)| matches.len() < best.len())
1179                {
1180                    best_start = Some((i, matches));
1181                }
1182            }
1183        }
1184        drop(indexes);
1185
1186        // Start from best indexed result or fall back to full node scan
1187        let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1188            // No indexes available, start with first condition via full scan
1189            let (prop, value) = &conditions[0];
1190            (0, self.find_nodes_by_property(prop, value))
1191        });
1192
1193        // Filter candidates through remaining conditions
1194        for (i, (prop, value)) in conditions.iter().enumerate() {
1195            if i == start_idx {
1196                continue;
1197            }
1198
1199            let key = PropertyKey::new(*prop);
1200            candidates.retain(|&node_id| {
1201                self.node_properties
1202                    .get(node_id, &key)
1203                    .is_some_and(|v| v == *value)
1204            });
1205
1206            // Short-circuit if no candidates remain
1207            if candidates.is_empty() {
1208                return Vec::new();
1209            }
1210        }
1211
1212        candidates
1213    }
1214
1215    // === Property Index Operations ===
1216
1217    /// Creates an index on a node property for O(1) lookups by value.
1218    ///
1219    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1220    /// O(1) instead of O(n) for this property. The index is automatically
1221    /// maintained when properties are set or removed.
1222    ///
1223    /// # Example
1224    ///
1225    /// ```
1226    /// use grafeo_core::graph::lpg::LpgStore;
1227    /// use grafeo_common::types::Value;
1228    ///
1229    /// let store = LpgStore::new();
1230    ///
1231    /// // Create nodes with an 'id' property
1232    /// let alice = store.create_node(&["Person"]);
1233    /// store.set_node_property(alice, "id", Value::from("alice_123"));
1234    ///
1235    /// // Create an index on the 'id' property
1236    /// store.create_property_index("id");
1237    ///
1238    /// // Now lookups by 'id' are O(1)
1239    /// let found = store.find_nodes_by_property("id", &Value::from("alice_123"));
1240    /// assert!(found.contains(&alice));
1241    /// ```
1242    pub fn create_property_index(&self, property: &str) {
1243        let key = PropertyKey::new(property);
1244
1245        let mut indexes = self.property_indexes.write();
1246        if indexes.contains_key(&key) {
1247            return; // Already indexed
1248        }
1249
1250        // Create the index and populate it with existing data
1251        let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1252
1253        // Scan all nodes to build the index
1254        for node_id in self.node_ids() {
1255            if let Some(value) = self.node_properties.get(node_id, &key) {
1256                let hv = HashableValue::new(value);
1257                index.entry(hv).or_default().insert(node_id);
1258            }
1259        }
1260
1261        indexes.insert(key, index);
1262    }
1263
1264    /// Drops an index on a node property.
1265    ///
1266    /// Returns `true` if the index existed and was removed.
1267    pub fn drop_property_index(&self, property: &str) -> bool {
1268        let key = PropertyKey::new(property);
1269        self.property_indexes.write().remove(&key).is_some()
1270    }
1271
1272    /// Returns `true` if the property has an index.
1273    #[must_use]
1274    pub fn has_property_index(&self, property: &str) -> bool {
1275        let key = PropertyKey::new(property);
1276        self.property_indexes.read().contains_key(&key)
1277    }
1278
1279    /// Stores a vector index for a label+property pair.
1280    #[cfg(feature = "vector-index")]
1281    pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1282        let key = format!("{label}:{property}");
1283        self.vector_indexes.write().insert(key, index);
1284    }
1285
1286    /// Retrieves the vector index for a label+property pair.
1287    #[cfg(feature = "vector-index")]
1288    #[must_use]
1289    pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1290        let key = format!("{label}:{property}");
1291        self.vector_indexes.read().get(&key).cloned()
1292    }
1293
1294    /// Removes a vector index for a label+property pair.
1295    ///
1296    /// Returns `true` if the index existed and was removed.
1297    #[cfg(feature = "vector-index")]
1298    pub fn remove_vector_index(&self, label: &str, property: &str) -> bool {
1299        let key = format!("{label}:{property}");
1300        self.vector_indexes.write().remove(&key).is_some()
1301    }
1302
1303    /// Returns all vector index entries as `(key, index)` pairs.
1304    ///
1305    /// Keys are in `"label:property"` format.
1306    #[cfg(feature = "vector-index")]
1307    #[must_use]
1308    pub fn vector_index_entries(&self) -> Vec<(String, Arc<HnswIndex>)> {
1309        self.vector_indexes
1310            .read()
1311            .iter()
1312            .map(|(k, v)| (k.clone(), v.clone()))
1313            .collect()
1314    }
1315
1316    /// Stores a text index for a label+property pair.
1317    #[cfg(feature = "text-index")]
1318    pub fn add_text_index(
1319        &self,
1320        label: &str,
1321        property: &str,
1322        index: Arc<RwLock<crate::index::text::InvertedIndex>>,
1323    ) {
1324        let key = format!("{label}:{property}");
1325        self.text_indexes.write().insert(key, index);
1326    }
1327
1328    /// Retrieves the text index for a label+property pair.
1329    #[cfg(feature = "text-index")]
1330    #[must_use]
1331    pub fn get_text_index(
1332        &self,
1333        label: &str,
1334        property: &str,
1335    ) -> Option<Arc<RwLock<crate::index::text::InvertedIndex>>> {
1336        let key = format!("{label}:{property}");
1337        self.text_indexes.read().get(&key).cloned()
1338    }
1339
1340    /// Removes a text index for a label+property pair.
1341    ///
1342    /// Returns `true` if the index existed and was removed.
1343    #[cfg(feature = "text-index")]
1344    pub fn remove_text_index(&self, label: &str, property: &str) -> bool {
1345        let key = format!("{label}:{property}");
1346        self.text_indexes.write().remove(&key).is_some()
1347    }
1348
1349    /// Returns all text index entries as `(key, index)` pairs.
1350    ///
1351    /// The key format is `"label:property"`.
1352    #[cfg(feature = "text-index")]
1353    pub fn text_index_entries(
1354        &self,
1355    ) -> Vec<(String, Arc<RwLock<crate::index::text::InvertedIndex>>)> {
1356        self.text_indexes
1357            .read()
1358            .iter()
1359            .map(|(k, v)| (k.clone(), v.clone()))
1360            .collect()
1361    }
1362
1363    /// Finds all nodes that have a specific property value.
1364    ///
1365    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1366    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1367    ///
1368    /// # Example
1369    ///
1370    /// ```
1371    /// use grafeo_core::graph::lpg::LpgStore;
1372    /// use grafeo_common::types::Value;
1373    ///
1374    /// let store = LpgStore::new();
1375    /// store.create_property_index("city"); // Optional but makes lookups fast
1376    ///
1377    /// let alice = store.create_node(&["Person"]);
1378    /// let bob = store.create_node(&["Person"]);
1379    /// store.set_node_property(alice, "city", Value::from("NYC"));
1380    /// store.set_node_property(bob, "city", Value::from("NYC"));
1381    ///
1382    /// let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
1383    /// assert_eq!(nyc_people.len(), 2);
1384    /// ```
1385    #[must_use]
1386    pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1387        let key = PropertyKey::new(property);
1388        let hv = HashableValue::new(value.clone());
1389
1390        // Try indexed lookup first
1391        let indexes = self.property_indexes.read();
1392        if let Some(index) = indexes.get(&key) {
1393            if let Some(nodes) = index.get(&hv) {
1394                return nodes.iter().copied().collect();
1395            }
1396            return Vec::new();
1397        }
1398        drop(indexes);
1399
1400        // Fall back to full scan
1401        self.node_ids()
1402            .into_iter()
1403            .filter(|&node_id| {
1404                self.node_properties
1405                    .get(node_id, &key)
1406                    .is_some_and(|v| v == *value)
1407            })
1408            .collect()
1409    }
1410
1411    /// Finds nodes whose property matches an operator filter.
1412    ///
1413    /// The `filter_value` is either a scalar (equality) or a `Value::Map` with
1414    /// `$`-prefixed operator keys like `$gt`, `$lt`, `$gte`, `$lte`, `$in`,
1415    /// `$nin`, `$ne`, `$contains`.
1416    pub fn find_nodes_matching_filter(&self, property: &str, filter_value: &Value) -> Vec<NodeId> {
1417        let key = PropertyKey::new(property);
1418        self.node_ids()
1419            .into_iter()
1420            .filter(|&node_id| {
1421                self.node_properties
1422                    .get(node_id, &key)
1423                    .is_some_and(|v| Self::matches_filter(&v, filter_value))
1424            })
1425            .collect()
1426    }
1427
1428    /// Checks if a node property value matches a filter value.
1429    ///
1430    /// - Scalar filter: equality check
1431    /// - Map filter with `$`-prefixed keys: operator evaluation
1432    fn matches_filter(node_value: &Value, filter_value: &Value) -> bool {
1433        match filter_value {
1434            Value::Map(ops) if ops.keys().any(|k| k.as_str().starts_with('$')) => {
1435                ops.iter().all(|(op_key, op_val)| {
1436                    match op_key.as_str() {
1437                        "$gt" => {
1438                            Self::compare_values(node_value, op_val)
1439                                == Some(std::cmp::Ordering::Greater)
1440                        }
1441                        "$gte" => matches!(
1442                            Self::compare_values(node_value, op_val),
1443                            Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
1444                        ),
1445                        "$lt" => {
1446                            Self::compare_values(node_value, op_val)
1447                                == Some(std::cmp::Ordering::Less)
1448                        }
1449                        "$lte" => matches!(
1450                            Self::compare_values(node_value, op_val),
1451                            Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
1452                        ),
1453                        "$ne" => node_value != op_val,
1454                        "$in" => match op_val {
1455                            Value::List(items) => items.iter().any(|v| v == node_value),
1456                            _ => false,
1457                        },
1458                        "$nin" => match op_val {
1459                            Value::List(items) => !items.iter().any(|v| v == node_value),
1460                            _ => true,
1461                        },
1462                        "$contains" => match (node_value, op_val) {
1463                            (Value::String(a), Value::String(b)) => a.contains(b.as_str()),
1464                            _ => false,
1465                        },
1466                        _ => false, // Unknown operator — no match
1467                    }
1468                })
1469            }
1470            _ => node_value == filter_value, // Equality (backward compatible)
1471        }
1472    }
1473
1474    /// Compares two values for ordering (cross-type numeric comparison supported).
1475    fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
1476        match (a, b) {
1477            (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1478            (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1479            (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1480            (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1481            (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1482            (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1483            _ => None,
1484        }
1485    }
1486
1487    /// Updates property indexes when a property is set.
1488    fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1489        let indexes = self.property_indexes.read();
1490        if let Some(index) = indexes.get(key) {
1491            // Get old value to remove from index
1492            if let Some(old_value) = self.node_properties.get(node_id, key) {
1493                let old_hv = HashableValue::new(old_value);
1494                if let Some(mut nodes) = index.get_mut(&old_hv) {
1495                    nodes.remove(&node_id);
1496                    if nodes.is_empty() {
1497                        drop(nodes);
1498                        index.remove(&old_hv);
1499                    }
1500                }
1501            }
1502
1503            // Add new value to index
1504            let new_hv = HashableValue::new(new_value.clone());
1505            index
1506                .entry(new_hv)
1507                .or_insert_with(FxHashSet::default)
1508                .insert(node_id);
1509        }
1510    }
1511
1512    /// Updates property indexes when a property is removed.
1513    fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1514        let indexes = self.property_indexes.read();
1515        if let Some(index) = indexes.get(key) {
1516            // Get old value to remove from index
1517            if let Some(old_value) = self.node_properties.get(node_id, key) {
1518                let old_hv = HashableValue::new(old_value);
1519                if let Some(mut nodes) = index.get_mut(&old_hv) {
1520                    nodes.remove(&node_id);
1521                    if nodes.is_empty() {
1522                        drop(nodes);
1523                        index.remove(&old_hv);
1524                    }
1525                }
1526            }
1527        }
1528    }
1529
1530    /// Adds a label to a node.
1531    ///
1532    /// Returns true if the label was added, false if the node doesn't exist
1533    /// or already has the label.
1534    #[cfg(not(feature = "tiered-storage"))]
1535    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1536        let epoch = self.current_epoch();
1537
1538        // Check if node exists
1539        let nodes = self.nodes.read();
1540        if let Some(chain) = nodes.get(&node_id) {
1541            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1542                return false;
1543            }
1544        } else {
1545            return false;
1546        }
1547        drop(nodes);
1548
1549        // Get or create label ID
1550        let label_id = self.get_or_create_label_id(label);
1551
1552        // Add to node_labels map
1553        let mut node_labels = self.node_labels.write();
1554        let label_set = node_labels.entry(node_id).or_default();
1555
1556        if label_set.contains(&label_id) {
1557            return false; // Already has this label
1558        }
1559
1560        label_set.insert(label_id);
1561        drop(node_labels);
1562
1563        // Add to label_index
1564        let mut index = self.label_index.write();
1565        if (label_id as usize) >= index.len() {
1566            index.resize(label_id as usize + 1, FxHashMap::default());
1567        }
1568        index[label_id as usize].insert(node_id, ());
1569
1570        // Update label count in node record
1571        if let Some(chain) = self.nodes.write().get_mut(&node_id)
1572            && let Some(record) = chain.latest_mut()
1573        {
1574            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1575            record.set_label_count(count as u16);
1576        }
1577
1578        true
1579    }
1580
1581    /// Adds a label to a node.
1582    /// (Tiered storage version)
1583    #[cfg(feature = "tiered-storage")]
1584    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1585        let epoch = self.current_epoch();
1586
1587        // Check if node exists
1588        let versions = self.node_versions.read();
1589        if let Some(index) = versions.get(&node_id) {
1590            if let Some(vref) = index.visible_at(epoch) {
1591                if let Some(record) = self.read_node_record(&vref) {
1592                    if record.is_deleted() {
1593                        return false;
1594                    }
1595                } else {
1596                    return false;
1597                }
1598            } else {
1599                return false;
1600            }
1601        } else {
1602            return false;
1603        }
1604        drop(versions);
1605
1606        // Get or create label ID
1607        let label_id = self.get_or_create_label_id(label);
1608
1609        // Add to node_labels map
1610        let mut node_labels = self.node_labels.write();
1611        let label_set = node_labels.entry(node_id).or_default();
1612
1613        if label_set.contains(&label_id) {
1614            return false; // Already has this label
1615        }
1616
1617        label_set.insert(label_id);
1618        drop(node_labels);
1619
1620        // Add to label_index
1621        let mut index = self.label_index.write();
1622        if (label_id as usize) >= index.len() {
1623            index.resize(label_id as usize + 1, FxHashMap::default());
1624        }
1625        index[label_id as usize].insert(node_id, ());
1626
1627        // Note: label_count in record is not updated for tiered storage.
1628        // The record is immutable once allocated in the arena.
1629
1630        true
1631    }
1632
1633    /// Removes a label from a node.
1634    ///
1635    /// Returns true if the label was removed, false if the node doesn't exist
1636    /// or doesn't have the label.
1637    #[cfg(not(feature = "tiered-storage"))]
1638    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1639        let epoch = self.current_epoch();
1640
1641        // Check if node exists
1642        let nodes = self.nodes.read();
1643        if let Some(chain) = nodes.get(&node_id) {
1644            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1645                return false;
1646            }
1647        } else {
1648            return false;
1649        }
1650        drop(nodes);
1651
1652        // Get label ID
1653        let label_id = {
1654            let label_ids = self.label_to_id.read();
1655            match label_ids.get(label) {
1656                Some(&id) => id,
1657                None => return false, // Label doesn't exist
1658            }
1659        };
1660
1661        // Remove from node_labels map
1662        let mut node_labels = self.node_labels.write();
1663        if let Some(label_set) = node_labels.get_mut(&node_id) {
1664            if !label_set.remove(&label_id) {
1665                return false; // Node doesn't have this label
1666            }
1667        } else {
1668            return false;
1669        }
1670        drop(node_labels);
1671
1672        // Remove from label_index
1673        let mut index = self.label_index.write();
1674        if (label_id as usize) < index.len() {
1675            index[label_id as usize].remove(&node_id);
1676        }
1677
1678        // Update label count in node record
1679        if let Some(chain) = self.nodes.write().get_mut(&node_id)
1680            && let Some(record) = chain.latest_mut()
1681        {
1682            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1683            record.set_label_count(count as u16);
1684        }
1685
1686        true
1687    }
1688
1689    /// Removes a label from a node.
1690    /// (Tiered storage version)
1691    #[cfg(feature = "tiered-storage")]
1692    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1693        let epoch = self.current_epoch();
1694
1695        // Check if node exists
1696        let versions = self.node_versions.read();
1697        if let Some(index) = versions.get(&node_id) {
1698            if let Some(vref) = index.visible_at(epoch) {
1699                if let Some(record) = self.read_node_record(&vref) {
1700                    if record.is_deleted() {
1701                        return false;
1702                    }
1703                } else {
1704                    return false;
1705                }
1706            } else {
1707                return false;
1708            }
1709        } else {
1710            return false;
1711        }
1712        drop(versions);
1713
1714        // Get label ID
1715        let label_id = {
1716            let label_ids = self.label_to_id.read();
1717            match label_ids.get(label) {
1718                Some(&id) => id,
1719                None => return false, // Label doesn't exist
1720            }
1721        };
1722
1723        // Remove from node_labels map
1724        let mut node_labels = self.node_labels.write();
1725        if let Some(label_set) = node_labels.get_mut(&node_id) {
1726            if !label_set.remove(&label_id) {
1727                return false; // Node doesn't have this label
1728            }
1729        } else {
1730            return false;
1731        }
1732        drop(node_labels);
1733
1734        // Remove from label_index
1735        let mut index = self.label_index.write();
1736        if (label_id as usize) < index.len() {
1737            index[label_id as usize].remove(&node_id);
1738        }
1739
1740        // Note: label_count in record is not updated for tiered storage.
1741
1742        true
1743    }
1744
1745    /// Returns the number of nodes (non-deleted at current epoch).
1746    #[must_use]
1747    #[cfg(not(feature = "tiered-storage"))]
1748    pub fn node_count(&self) -> usize {
1749        let epoch = self.current_epoch();
1750        self.nodes
1751            .read()
1752            .values()
1753            .filter_map(|chain| chain.visible_at(epoch))
1754            .filter(|r| !r.is_deleted())
1755            .count()
1756    }
1757
1758    /// Returns the number of nodes (non-deleted at current epoch).
1759    /// (Tiered storage version)
1760    #[must_use]
1761    #[cfg(feature = "tiered-storage")]
1762    pub fn node_count(&self) -> usize {
1763        let epoch = self.current_epoch();
1764        let versions = self.node_versions.read();
1765        versions
1766            .iter()
1767            .filter(|(_, index)| {
1768                index.visible_at(epoch).map_or(false, |vref| {
1769                    self.read_node_record(&vref)
1770                        .map_or(false, |r| !r.is_deleted())
1771                })
1772            })
1773            .count()
1774    }
1775
1776    /// Returns all node IDs in the store.
1777    ///
1778    /// This returns a snapshot of current node IDs. The returned vector
1779    /// excludes deleted nodes. Results are sorted by NodeId for deterministic
1780    /// iteration order.
1781    #[must_use]
1782    #[cfg(not(feature = "tiered-storage"))]
1783    pub fn node_ids(&self) -> Vec<NodeId> {
1784        let epoch = self.current_epoch();
1785        let mut ids: Vec<NodeId> = self
1786            .nodes
1787            .read()
1788            .iter()
1789            .filter_map(|(id, chain)| {
1790                chain
1791                    .visible_at(epoch)
1792                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1793            })
1794            .collect();
1795        ids.sort_unstable();
1796        ids
1797    }
1798
1799    /// Returns all node IDs in the store.
1800    /// (Tiered storage version)
1801    #[must_use]
1802    #[cfg(feature = "tiered-storage")]
1803    pub fn node_ids(&self) -> Vec<NodeId> {
1804        let epoch = self.current_epoch();
1805        let versions = self.node_versions.read();
1806        let mut ids: Vec<NodeId> = versions
1807            .iter()
1808            .filter_map(|(id, index)| {
1809                index.visible_at(epoch).and_then(|vref| {
1810                    self.read_node_record(&vref)
1811                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1812                })
1813            })
1814            .collect();
1815        ids.sort_unstable();
1816        ids
1817    }
1818
1819    // === Edge Operations ===
1820
1821    /// Creates a new edge.
1822    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1823        self.needs_stats_recompute.store(true, Ordering::Relaxed);
1824        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1825    }
1826
1827    /// Creates a new edge within a transaction context.
1828    #[cfg(not(feature = "tiered-storage"))]
1829    pub fn create_edge_versioned(
1830        &self,
1831        src: NodeId,
1832        dst: NodeId,
1833        edge_type: &str,
1834        epoch: EpochId,
1835        tx_id: TxId,
1836    ) -> EdgeId {
1837        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1838        let type_id = self.get_or_create_edge_type_id(edge_type);
1839
1840        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1841        let chain = VersionChain::with_initial(record, epoch, tx_id);
1842        self.edges.write().insert(id, chain);
1843
1844        // Update adjacency
1845        self.forward_adj.add_edge(src, dst, id);
1846        if let Some(ref backward) = self.backward_adj {
1847            backward.add_edge(dst, src, id);
1848        }
1849
1850        id
1851    }
1852
1853    /// Creates a new edge within a transaction context.
1854    /// (Tiered storage version)
1855    #[cfg(feature = "tiered-storage")]
1856    pub fn create_edge_versioned(
1857        &self,
1858        src: NodeId,
1859        dst: NodeId,
1860        edge_type: &str,
1861        epoch: EpochId,
1862        tx_id: TxId,
1863    ) -> EdgeId {
1864        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1865        let type_id = self.get_or_create_edge_type_id(edge_type);
1866
1867        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1868
1869        // Allocate record in arena and get offset (create epoch if needed)
1870        let arena = self.arena_allocator.arena_or_create(epoch);
1871        let (offset, _stored) = arena.alloc_value_with_offset(record);
1872
1873        // Create HotVersionRef pointing to arena data
1874        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1875
1876        // Create or update version index
1877        let mut versions = self.edge_versions.write();
1878        if let Some(index) = versions.get_mut(&id) {
1879            index.add_hot(hot_ref);
1880        } else {
1881            versions.insert(id, VersionIndex::with_initial(hot_ref));
1882        }
1883
1884        // Update adjacency
1885        self.forward_adj.add_edge(src, dst, id);
1886        if let Some(ref backward) = self.backward_adj {
1887            backward.add_edge(dst, src, id);
1888        }
1889
1890        id
1891    }
1892
1893    /// Creates a new edge with properties.
1894    pub fn create_edge_with_props(
1895        &self,
1896        src: NodeId,
1897        dst: NodeId,
1898        edge_type: &str,
1899        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1900    ) -> EdgeId {
1901        let id = self.create_edge(src, dst, edge_type);
1902
1903        for (key, value) in properties {
1904            self.edge_properties.set(id, key.into(), value.into());
1905        }
1906
1907        id
1908    }
1909
1910    /// Gets an edge by ID (latest visible version).
1911    #[must_use]
1912    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1913        self.get_edge_at_epoch(id, self.current_epoch())
1914    }
1915
1916    /// Gets an edge by ID at a specific epoch.
1917    #[must_use]
1918    #[cfg(not(feature = "tiered-storage"))]
1919    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1920        let edges = self.edges.read();
1921        let chain = edges.get(&id)?;
1922        let record = chain.visible_at(epoch)?;
1923
1924        if record.is_deleted() {
1925            return None;
1926        }
1927
1928        let edge_type = {
1929            let id_to_type = self.id_to_edge_type.read();
1930            id_to_type.get(record.type_id as usize)?.clone()
1931        };
1932
1933        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1934
1935        // Get properties
1936        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1937
1938        Some(edge)
1939    }
1940
1941    /// Gets an edge by ID at a specific epoch.
1942    /// (Tiered storage version)
1943    #[must_use]
1944    #[cfg(feature = "tiered-storage")]
1945    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1946        let versions = self.edge_versions.read();
1947        let index = versions.get(&id)?;
1948        let version_ref = index.visible_at(epoch)?;
1949
1950        let record = self.read_edge_record(&version_ref)?;
1951
1952        if record.is_deleted() {
1953            return None;
1954        }
1955
1956        let edge_type = {
1957            let id_to_type = self.id_to_edge_type.read();
1958            id_to_type.get(record.type_id as usize)?.clone()
1959        };
1960
1961        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1962
1963        // Get properties
1964        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1965
1966        Some(edge)
1967    }
1968
1969    /// Gets an edge visible to a specific transaction.
1970    #[must_use]
1971    #[cfg(not(feature = "tiered-storage"))]
1972    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1973        let edges = self.edges.read();
1974        let chain = edges.get(&id)?;
1975        let record = chain.visible_to(epoch, tx_id)?;
1976
1977        if record.is_deleted() {
1978            return None;
1979        }
1980
1981        let edge_type = {
1982            let id_to_type = self.id_to_edge_type.read();
1983            id_to_type.get(record.type_id as usize)?.clone()
1984        };
1985
1986        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1987
1988        // Get properties
1989        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1990
1991        Some(edge)
1992    }
1993
1994    /// Gets an edge visible to a specific transaction.
1995    /// (Tiered storage version)
1996    #[must_use]
1997    #[cfg(feature = "tiered-storage")]
1998    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1999        let versions = self.edge_versions.read();
2000        let index = versions.get(&id)?;
2001        let version_ref = index.visible_to(epoch, tx_id)?;
2002
2003        let record = self.read_edge_record(&version_ref)?;
2004
2005        if record.is_deleted() {
2006            return None;
2007        }
2008
2009        let edge_type = {
2010            let id_to_type = self.id_to_edge_type.read();
2011            id_to_type.get(record.type_id as usize)?.clone()
2012        };
2013
2014        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
2015
2016        // Get properties
2017        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
2018
2019        Some(edge)
2020    }
2021
2022    /// Reads an EdgeRecord from arena using a VersionRef.
2023    #[cfg(feature = "tiered-storage")]
2024    #[allow(unsafe_code)]
2025    fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
2026        match version_ref {
2027            VersionRef::Hot(hot_ref) => {
2028                let arena = self.arena_allocator.arena(hot_ref.epoch);
2029                // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
2030                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2031                Some(*record)
2032            }
2033            VersionRef::Cold(cold_ref) => {
2034                // Read from compressed epoch store
2035                self.epoch_store
2036                    .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
2037            }
2038        }
2039    }
2040
2041    /// Deletes an edge (using latest epoch).
2042    pub fn delete_edge(&self, id: EdgeId) -> bool {
2043        self.needs_stats_recompute.store(true, Ordering::Relaxed);
2044        self.delete_edge_at_epoch(id, self.current_epoch())
2045    }
2046
2047    /// Deletes an edge at a specific epoch.
2048    #[cfg(not(feature = "tiered-storage"))]
2049    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
2050        let mut edges = self.edges.write();
2051        if let Some(chain) = edges.get_mut(&id) {
2052            // Get the visible record to check if deleted and get src/dst
2053            let (src, dst) = {
2054                match chain.visible_at(epoch) {
2055                    Some(record) => {
2056                        if record.is_deleted() {
2057                            return false;
2058                        }
2059                        (record.src, record.dst)
2060                    }
2061                    None => return false, // Not visible at this epoch (already deleted)
2062                }
2063            };
2064
2065            // Mark the version chain as deleted
2066            chain.mark_deleted(epoch);
2067
2068            drop(edges); // Release lock
2069
2070            // Mark as deleted in adjacency (soft delete)
2071            self.forward_adj.mark_deleted(src, id);
2072            if let Some(ref backward) = self.backward_adj {
2073                backward.mark_deleted(dst, id);
2074            }
2075
2076            // Remove properties
2077            self.edge_properties.remove_all(id);
2078
2079            true
2080        } else {
2081            false
2082        }
2083    }
2084
2085    /// Deletes an edge at a specific epoch.
2086    /// (Tiered storage version)
2087    #[cfg(feature = "tiered-storage")]
2088    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
2089        let mut versions = self.edge_versions.write();
2090        if let Some(index) = versions.get_mut(&id) {
2091            // Get the visible record to check if deleted and get src/dst
2092            let (src, dst) = {
2093                match index.visible_at(epoch) {
2094                    Some(version_ref) => {
2095                        if let Some(record) = self.read_edge_record(&version_ref) {
2096                            if record.is_deleted() {
2097                                return false;
2098                            }
2099                            (record.src, record.dst)
2100                        } else {
2101                            return false;
2102                        }
2103                    }
2104                    None => return false,
2105                }
2106            };
2107
2108            // Mark as deleted in version index
2109            index.mark_deleted(epoch);
2110
2111            drop(versions); // Release lock
2112
2113            // Mark as deleted in adjacency (soft delete)
2114            self.forward_adj.mark_deleted(src, id);
2115            if let Some(ref backward) = self.backward_adj {
2116                backward.mark_deleted(dst, id);
2117            }
2118
2119            // Remove properties
2120            self.edge_properties.remove_all(id);
2121
2122            true
2123        } else {
2124            false
2125        }
2126    }
2127
2128    /// Returns the number of edges (non-deleted at current epoch).
2129    #[must_use]
2130    #[cfg(not(feature = "tiered-storage"))]
2131    pub fn edge_count(&self) -> usize {
2132        let epoch = self.current_epoch();
2133        self.edges
2134            .read()
2135            .values()
2136            .filter_map(|chain| chain.visible_at(epoch))
2137            .filter(|r| !r.is_deleted())
2138            .count()
2139    }
2140
2141    /// Returns the number of edges (non-deleted at current epoch).
2142    /// (Tiered storage version)
2143    #[must_use]
2144    #[cfg(feature = "tiered-storage")]
2145    pub fn edge_count(&self) -> usize {
2146        let epoch = self.current_epoch();
2147        let versions = self.edge_versions.read();
2148        versions
2149            .iter()
2150            .filter(|(_, index)| {
2151                index.visible_at(epoch).map_or(false, |vref| {
2152                    self.read_edge_record(&vref)
2153                        .map_or(false, |r| !r.is_deleted())
2154                })
2155            })
2156            .count()
2157    }
2158
2159    /// Discards all uncommitted versions created by a transaction.
2160    ///
2161    /// This is called during transaction rollback to clean up uncommitted changes.
2162    /// The method removes version chain entries created by the specified transaction.
2163    #[cfg(not(feature = "tiered-storage"))]
2164    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2165        // Remove uncommitted node versions
2166        {
2167            let mut nodes = self.nodes.write();
2168            for chain in nodes.values_mut() {
2169                chain.remove_versions_by(tx_id);
2170            }
2171            // Remove completely empty chains (no versions left)
2172            nodes.retain(|_, chain| !chain.is_empty());
2173        }
2174
2175        // Remove uncommitted edge versions
2176        {
2177            let mut edges = self.edges.write();
2178            for chain in edges.values_mut() {
2179                chain.remove_versions_by(tx_id);
2180            }
2181            // Remove completely empty chains (no versions left)
2182            edges.retain(|_, chain| !chain.is_empty());
2183        }
2184    }
2185
2186    /// Discards all uncommitted versions created by a transaction.
2187    /// (Tiered storage version)
2188    #[cfg(feature = "tiered-storage")]
2189    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2190        // Remove uncommitted node versions
2191        {
2192            let mut versions = self.node_versions.write();
2193            for index in versions.values_mut() {
2194                index.remove_versions_by(tx_id);
2195            }
2196            // Remove completely empty indexes (no versions left)
2197            versions.retain(|_, index| !index.is_empty());
2198        }
2199
2200        // Remove uncommitted edge versions
2201        {
2202            let mut versions = self.edge_versions.write();
2203            for index in versions.values_mut() {
2204                index.remove_versions_by(tx_id);
2205            }
2206            // Remove completely empty indexes (no versions left)
2207            versions.retain(|_, index| !index.is_empty());
2208        }
2209    }
2210
2211    /// Garbage collects old versions that are no longer visible to any transaction.
2212    ///
2213    /// Versions older than `min_epoch` are pruned from version chains, keeping
2214    /// at most one old version per entity as a baseline. Empty chains are removed.
2215    #[cfg(not(feature = "tiered-storage"))]
2216    pub fn gc_versions(&self, min_epoch: EpochId) {
2217        {
2218            let mut nodes = self.nodes.write();
2219            for chain in nodes.values_mut() {
2220                chain.gc(min_epoch);
2221            }
2222            nodes.retain(|_, chain| !chain.is_empty());
2223        }
2224        {
2225            let mut edges = self.edges.write();
2226            for chain in edges.values_mut() {
2227                chain.gc(min_epoch);
2228            }
2229            edges.retain(|_, chain| !chain.is_empty());
2230        }
2231    }
2232
2233    /// Garbage collects old versions (tiered storage variant).
2234    #[cfg(feature = "tiered-storage")]
2235    pub fn gc_versions(&self, min_epoch: EpochId) {
2236        {
2237            let mut versions = self.node_versions.write();
2238            for index in versions.values_mut() {
2239                index.gc(min_epoch);
2240            }
2241            versions.retain(|_, index| !index.is_empty());
2242        }
2243        {
2244            let mut versions = self.edge_versions.write();
2245            for index in versions.values_mut() {
2246                index.gc(min_epoch);
2247            }
2248            versions.retain(|_, index| !index.is_empty());
2249        }
2250    }
2251
    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
    ///
    /// This is called by the transaction manager when an epoch becomes eligible
    /// for freezing (no active transactions can see it). The freeze process:
    ///
    /// 1. Collects all hot version refs for the epoch
    /// 2. Reads the corresponding records from arena
    /// 3. Compresses them into a `CompressedEpochBlock`
    /// 4. Updates `VersionIndex` entries to point to cold storage
    /// 5. The arena can be deallocated after all epochs in it are frozen
    ///
    /// The version maps are read-locked while records are collected and
    /// write-locked again, separately, while the indexes are updated; no lock
    /// is held across the call into the epoch store.
    ///
    /// # Arguments
    ///
    /// * `epoch` - The epoch to freeze
    ///
    /// # Returns
    ///
    /// The number of records frozen (nodes + edges). Returns `0` without
    /// touching the epoch store when the epoch has no hot records.
    #[cfg(feature = "tiered-storage")]
    #[allow(unsafe_code)]
    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
        // Collect node records to freeze
        // `node_records` is the payload handed to the epoch store;
        // `node_hot_refs` remembers which index entries must be rewritten later.
        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();

        {
            let versions = self.node_versions.read();
            for (node_id, index) in versions.iter() {
                for hot_ref in index.hot_refs_for_epoch(epoch) {
                    let arena = self.arena_allocator.arena(hot_ref.epoch);
                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                    // Copy the record out so nothing borrows the arena afterwards.
                    node_records.push((node_id.as_u64(), *record));
                    node_hot_refs.push((*node_id, *hot_ref));
                }
            }
        }

        // Collect edge records to freeze
        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();

        {
            let versions = self.edge_versions.read();
            for (edge_id, index) in versions.iter() {
                for hot_ref in index.hot_refs_for_epoch(epoch) {
                    let arena = self.arena_allocator.arena(hot_ref.epoch);
                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                    // Copy the record out so nothing borrows the arena afterwards.
                    edge_records.push((edge_id.as_u64(), *record));
                    edge_hot_refs.push((*edge_id, *hot_ref));
                }
            }
        }

        // Counted before the vectors are moved into the epoch store below.
        let total_frozen = node_records.len() + edge_records.len();

        if total_frozen == 0 {
            return 0;
        }

        // Freeze to compressed storage
        let (node_entries, edge_entries) =
            self.epoch_store
                .freeze_epoch(epoch, node_records, edge_records);

        // Build lookup maps for index entries
        // The maps are keyed by entity id alone, so if one entity contributed
        // several entries the last one wins.
        // NOTE(review): confirm whether an entity can have more than one hot
        // ref in a single epoch.
        let node_entry_map: FxHashMap<u64, _> = node_entries
            .iter()
            .map(|e| (e.entity_id, (e.offset, e.length)))
            .collect();
        let edge_entry_map: FxHashMap<u64, _> = edge_entries
            .iter()
            .map(|e| (e.entity_id, (e.offset, e.length)))
            .collect();

        // Update version indexes to use cold refs
        // Entities that vanished from the version map since collection, or that
        // have no entry in the frozen block, are silently skipped.
        {
            let mut versions = self.node_versions.write();
            for (node_id, hot_ref) in &node_hot_refs {
                if let Some(index) = versions.get_mut(node_id)
                    && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
                {
                    // Carry the creator and deletion epoch over from the hot ref.
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }

        {
            let mut versions = self.edge_versions.write();
            for (edge_id, hot_ref) in &edge_hot_refs {
                if let Some(index) = versions.get_mut(edge_id)
                    && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
                {
                    // Carry the creator and deletion epoch over from the hot ref.
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }

        total_frozen
    }
2367
2368    /// Returns the epoch store for cold storage statistics.
2369    #[cfg(feature = "tiered-storage")]
2370    #[must_use]
2371    pub fn epoch_store(&self) -> &EpochStore {
2372        &self.epoch_store
2373    }
2374
2375    /// Returns the number of distinct labels in the store.
2376    #[must_use]
2377    pub fn label_count(&self) -> usize {
2378        self.id_to_label.read().len()
2379    }
2380
2381    /// Returns the number of distinct property keys in the store.
2382    ///
2383    /// This counts unique property keys across both nodes and edges.
2384    #[must_use]
2385    pub fn property_key_count(&self) -> usize {
2386        let node_keys = self.node_properties.column_count();
2387        let edge_keys = self.edge_properties.column_count();
2388        // Note: This may count some keys twice if the same key is used
2389        // for both nodes and edges. A more precise count would require
2390        // tracking unique keys across both storages.
2391        node_keys + edge_keys
2392    }
2393
2394    /// Returns the number of distinct edge types in the store.
2395    #[must_use]
2396    pub fn edge_type_count(&self) -> usize {
2397        self.id_to_edge_type.read().len()
2398    }
2399
2400    // === Traversal ===
2401
2402    /// Iterates over neighbors of a node in the specified direction.
2403    ///
2404    /// This is the fast path for graph traversal - goes straight to the
2405    /// adjacency index without loading full node data.
2406    pub fn neighbors(
2407        &self,
2408        node: NodeId,
2409        direction: Direction,
2410    ) -> impl Iterator<Item = NodeId> + '_ {
2411        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2412            Direction::Outgoing | Direction::Both => {
2413                Box::new(self.forward_adj.neighbors(node).into_iter())
2414            }
2415            Direction::Incoming => Box::new(std::iter::empty()),
2416        };
2417
2418        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2419            Direction::Incoming | Direction::Both => {
2420                if let Some(ref adj) = self.backward_adj {
2421                    Box::new(adj.neighbors(node).into_iter())
2422                } else {
2423                    Box::new(std::iter::empty())
2424                }
2425            }
2426            Direction::Outgoing => Box::new(std::iter::empty()),
2427        };
2428
2429        forward.chain(backward)
2430    }
2431
2432    /// Returns edges from a node with their targets.
2433    ///
2434    /// Returns an iterator of (target_node, edge_id) pairs.
2435    pub fn edges_from(
2436        &self,
2437        node: NodeId,
2438        direction: Direction,
2439    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2440        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2441            Direction::Outgoing | Direction::Both => {
2442                Box::new(self.forward_adj.edges_from(node).into_iter())
2443            }
2444            Direction::Incoming => Box::new(std::iter::empty()),
2445        };
2446
2447        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2448            Direction::Incoming | Direction::Both => {
2449                if let Some(ref adj) = self.backward_adj {
2450                    Box::new(adj.edges_from(node).into_iter())
2451                } else {
2452                    Box::new(std::iter::empty())
2453                }
2454            }
2455            Direction::Outgoing => Box::new(std::iter::empty()),
2456        };
2457
2458        forward.chain(backward)
2459    }
2460
2461    /// Returns edges to a node (where the node is the destination).
2462    ///
2463    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
2464    /// Uses the backward adjacency index for O(degree) lookup.
2465    ///
2466    /// # Example
2467    ///
2468    /// ```ignore
2469    /// // For edges: A->B, C->B
2470    /// let incoming = store.edges_to(B);
2471    /// // Returns: [(A, edge1), (C, edge2)]
2472    /// ```
2473    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2474        if let Some(ref backward) = self.backward_adj {
2475            backward.edges_from(node)
2476        } else {
2477            // Fallback: scan all edges (slow but correct)
2478            self.all_edges()
2479                .filter_map(|edge| {
2480                    if edge.dst == node {
2481                        Some((edge.src, edge.id))
2482                    } else {
2483                        None
2484                    }
2485                })
2486                .collect()
2487        }
2488    }
2489
2490    /// Returns the out-degree of a node (number of outgoing edges).
2491    ///
2492    /// Uses the forward adjacency index for O(1) lookup.
2493    #[must_use]
2494    pub fn out_degree(&self, node: NodeId) -> usize {
2495        self.forward_adj.out_degree(node)
2496    }
2497
2498    /// Returns the in-degree of a node (number of incoming edges).
2499    ///
2500    /// Uses the backward adjacency index for O(1) lookup if available,
2501    /// otherwise falls back to scanning edges.
2502    #[must_use]
2503    pub fn in_degree(&self, node: NodeId) -> usize {
2504        if let Some(ref backward) = self.backward_adj {
2505            backward.in_degree(node)
2506        } else {
2507            // Fallback: count edges (slow)
2508            self.all_edges().filter(|edge| edge.dst == node).count()
2509        }
2510    }
2511
2512    /// Gets the type of an edge by ID.
2513    #[must_use]
2514    #[cfg(not(feature = "tiered-storage"))]
2515    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2516        let edges = self.edges.read();
2517        let chain = edges.get(&id)?;
2518        let epoch = self.current_epoch();
2519        let record = chain.visible_at(epoch)?;
2520        let id_to_type = self.id_to_edge_type.read();
2521        id_to_type.get(record.type_id as usize).cloned()
2522    }
2523
2524    /// Gets the type of an edge by ID.
2525    /// (Tiered storage version)
2526    #[must_use]
2527    #[cfg(feature = "tiered-storage")]
2528    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2529        let versions = self.edge_versions.read();
2530        let index = versions.get(&id)?;
2531        let epoch = self.current_epoch();
2532        let vref = index.visible_at(epoch)?;
2533        let record = self.read_edge_record(&vref)?;
2534        let id_to_type = self.id_to_edge_type.read();
2535        id_to_type.get(record.type_id as usize).cloned()
2536    }
2537
2538    /// Returns all nodes with a specific label.
2539    ///
2540    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
2541    /// concurrent modifications won't affect the returned vector. Results are
2542    /// sorted by NodeId for deterministic iteration order.
2543    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2544        let label_to_id = self.label_to_id.read();
2545        if let Some(&label_id) = label_to_id.get(label) {
2546            let index = self.label_index.read();
2547            if let Some(set) = index.get(label_id as usize) {
2548                let mut ids: Vec<NodeId> = set.keys().copied().collect();
2549                ids.sort_unstable();
2550                return ids;
2551            }
2552        }
2553        Vec::new()
2554    }
2555
2556    // === Admin API: Iteration ===
2557
2558    /// Returns an iterator over all nodes in the database.
2559    ///
2560    /// This creates a snapshot of all visible nodes at the current epoch.
2561    /// Useful for dump/export operations.
2562    #[cfg(not(feature = "tiered-storage"))]
2563    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2564        let epoch = self.current_epoch();
2565        let node_ids: Vec<NodeId> = self
2566            .nodes
2567            .read()
2568            .iter()
2569            .filter_map(|(id, chain)| {
2570                chain
2571                    .visible_at(epoch)
2572                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2573            })
2574            .collect();
2575
2576        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2577    }
2578
2579    /// Returns an iterator over all nodes in the database.
2580    /// (Tiered storage version)
2581    #[cfg(feature = "tiered-storage")]
2582    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2583        let node_ids = self.node_ids();
2584        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2585    }
2586
2587    /// Returns an iterator over all edges in the database.
2588    ///
2589    /// This creates a snapshot of all visible edges at the current epoch.
2590    /// Useful for dump/export operations.
2591    #[cfg(not(feature = "tiered-storage"))]
2592    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2593        let epoch = self.current_epoch();
2594        let edge_ids: Vec<EdgeId> = self
2595            .edges
2596            .read()
2597            .iter()
2598            .filter_map(|(id, chain)| {
2599                chain
2600                    .visible_at(epoch)
2601                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2602            })
2603            .collect();
2604
2605        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2606    }
2607
2608    /// Returns an iterator over all edges in the database.
2609    /// (Tiered storage version)
2610    #[cfg(feature = "tiered-storage")]
2611    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2612        let epoch = self.current_epoch();
2613        let versions = self.edge_versions.read();
2614        let edge_ids: Vec<EdgeId> = versions
2615            .iter()
2616            .filter_map(|(id, index)| {
2617                index.visible_at(epoch).and_then(|vref| {
2618                    self.read_edge_record(&vref)
2619                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2620                })
2621            })
2622            .collect();
2623
2624        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2625    }
2626
2627    /// Returns all label names in the database.
2628    pub fn all_labels(&self) -> Vec<String> {
2629        self.id_to_label
2630            .read()
2631            .iter()
2632            .map(|s| s.to_string())
2633            .collect()
2634    }
2635
2636    /// Returns all edge type names in the database.
2637    pub fn all_edge_types(&self) -> Vec<String> {
2638        self.id_to_edge_type
2639            .read()
2640            .iter()
2641            .map(|s| s.to_string())
2642            .collect()
2643    }
2644
2645    /// Returns all property keys used in the database.
2646    pub fn all_property_keys(&self) -> Vec<String> {
2647        let mut keys = std::collections::HashSet::new();
2648        for key in self.node_properties.keys() {
2649            keys.insert(key.to_string());
2650        }
2651        for key in self.edge_properties.keys() {
2652            keys.insert(key.to_string());
2653        }
2654        keys.into_iter().collect()
2655    }
2656
2657    /// Returns an iterator over nodes with a specific label.
2658    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2659        let node_ids = self.nodes_by_label(label);
2660        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2661    }
2662
2663    /// Returns an iterator over edges with a specific type.
2664    #[cfg(not(feature = "tiered-storage"))]
2665    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2666        let epoch = self.current_epoch();
2667        let type_to_id = self.edge_type_to_id.read();
2668
2669        if let Some(&type_id) = type_to_id.get(edge_type) {
2670            let edge_ids: Vec<EdgeId> = self
2671                .edges
2672                .read()
2673                .iter()
2674                .filter_map(|(id, chain)| {
2675                    chain.visible_at(epoch).and_then(|r| {
2676                        if !r.is_deleted() && r.type_id == type_id {
2677                            Some(*id)
2678                        } else {
2679                            None
2680                        }
2681                    })
2682                })
2683                .collect();
2684
2685            // Return a boxed iterator for the found edges
2686            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2687                as Box<dyn Iterator<Item = Edge> + 'a>
2688        } else {
2689            // Return empty iterator
2690            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2691        }
2692    }
2693
2694    /// Returns an iterator over edges with a specific type.
2695    /// (Tiered storage version)
2696    #[cfg(feature = "tiered-storage")]
2697    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2698        let epoch = self.current_epoch();
2699        let type_to_id = self.edge_type_to_id.read();
2700
2701        if let Some(&type_id) = type_to_id.get(edge_type) {
2702            let versions = self.edge_versions.read();
2703            let edge_ids: Vec<EdgeId> = versions
2704                .iter()
2705                .filter_map(|(id, index)| {
2706                    index.visible_at(epoch).and_then(|vref| {
2707                        self.read_edge_record(&vref).and_then(|r| {
2708                            if !r.is_deleted() && r.type_id == type_id {
2709                                Some(*id)
2710                            } else {
2711                                None
2712                            }
2713                        })
2714                    })
2715                })
2716                .collect();
2717
2718            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2719                as Box<dyn Iterator<Item = Edge> + 'a>
2720        } else {
2721            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2722        }
2723    }
2724
2725    // === Zone Map Support ===
2726
2727    /// Checks if a node property predicate might match any nodes.
2728    ///
2729    /// Uses zone maps for early filtering. Returns `true` if there might be
2730    /// matching nodes, `false` if there definitely aren't.
2731    #[must_use]
2732    pub fn node_property_might_match(
2733        &self,
2734        property: &PropertyKey,
2735        op: CompareOp,
2736        value: &Value,
2737    ) -> bool {
2738        self.node_properties.might_match(property, op, value)
2739    }
2740
2741    /// Checks if an edge property predicate might match any edges.
2742    #[must_use]
2743    pub fn edge_property_might_match(
2744        &self,
2745        property: &PropertyKey,
2746        op: CompareOp,
2747        value: &Value,
2748    ) -> bool {
2749        self.edge_properties.might_match(property, op, value)
2750    }
2751
2752    /// Gets the zone map for a node property.
2753    #[must_use]
2754    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2755        self.node_properties.zone_map(property)
2756    }
2757
2758    /// Gets the zone map for an edge property.
2759    #[must_use]
2760    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2761        self.edge_properties.zone_map(property)
2762    }
2763
2764    /// Rebuilds zone maps for all properties.
2765    pub fn rebuild_zone_maps(&self) {
2766        self.node_properties.rebuild_zone_maps();
2767        self.edge_properties.rebuild_zone_maps();
2768    }
2769
2770    // === Statistics ===
2771
2772    /// Returns the current statistics.
2773    #[must_use]
2774    pub fn statistics(&self) -> Statistics {
2775        self.statistics.read().clone()
2776    }
2777
2778    /// Recomputes statistics if they are stale (i.e., after mutations).
2779    ///
2780    /// Call this before reading statistics for query optimization.
2781    /// Avoids redundant recomputation if no mutations occurred.
2782    pub fn ensure_statistics_fresh(&self) {
2783        if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2784            self.compute_statistics();
2785        }
2786    }
2787
2788    /// Recomputes statistics from current data.
2789    ///
2790    /// Scans all labels and edge types to build cardinality estimates for the
2791    /// query optimizer. Call this periodically or after bulk data loads.
2792    #[cfg(not(feature = "tiered-storage"))]
2793    pub fn compute_statistics(&self) {
2794        let mut stats = Statistics::new();
2795
2796        // Compute total counts
2797        stats.total_nodes = self.node_count() as u64;
2798        stats.total_edges = self.edge_count() as u64;
2799
2800        // Compute per-label statistics
2801        let id_to_label = self.id_to_label.read();
2802        let label_index = self.label_index.read();
2803
2804        for (label_id, label_name) in id_to_label.iter().enumerate() {
2805            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2806
2807            if node_count > 0 {
2808                // Estimate average degree
2809                let avg_out_degree = if stats.total_nodes > 0 {
2810                    stats.total_edges as f64 / stats.total_nodes as f64
2811                } else {
2812                    0.0
2813                };
2814
2815                let label_stats =
2816                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2817
2818                stats.update_label(label_name.as_ref(), label_stats);
2819            }
2820        }
2821
2822        // Compute per-edge-type statistics
2823        let id_to_edge_type = self.id_to_edge_type.read();
2824        let edges = self.edges.read();
2825        let epoch = self.current_epoch();
2826
2827        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2828        for chain in edges.values() {
2829            if let Some(record) = chain.visible_at(epoch)
2830                && !record.is_deleted()
2831            {
2832                *edge_type_counts.entry(record.type_id).or_default() += 1;
2833            }
2834        }
2835
2836        for (type_id, count) in edge_type_counts {
2837            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2838                let avg_degree = if stats.total_nodes > 0 {
2839                    count as f64 / stats.total_nodes as f64
2840                } else {
2841                    0.0
2842                };
2843
2844                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2845                stats.update_edge_type(type_name.as_ref(), edge_stats);
2846            }
2847        }
2848
2849        *self.statistics.write() = stats;
2850    }
2851
2852    /// Recomputes statistics from current data.
2853    /// (Tiered storage version)
2854    #[cfg(feature = "tiered-storage")]
2855    pub fn compute_statistics(&self) {
2856        let mut stats = Statistics::new();
2857
2858        // Compute total counts
2859        stats.total_nodes = self.node_count() as u64;
2860        stats.total_edges = self.edge_count() as u64;
2861
2862        // Compute per-label statistics
2863        let id_to_label = self.id_to_label.read();
2864        let label_index = self.label_index.read();
2865
2866        for (label_id, label_name) in id_to_label.iter().enumerate() {
2867            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2868
2869            if node_count > 0 {
2870                let avg_out_degree = if stats.total_nodes > 0 {
2871                    stats.total_edges as f64 / stats.total_nodes as f64
2872                } else {
2873                    0.0
2874                };
2875
2876                let label_stats =
2877                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2878
2879                stats.update_label(label_name.as_ref(), label_stats);
2880            }
2881        }
2882
2883        // Compute per-edge-type statistics
2884        let id_to_edge_type = self.id_to_edge_type.read();
2885        let versions = self.edge_versions.read();
2886        let epoch = self.current_epoch();
2887
2888        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2889        for index in versions.values() {
2890            if let Some(vref) = index.visible_at(epoch)
2891                && let Some(record) = self.read_edge_record(&vref)
2892                && !record.is_deleted()
2893            {
2894                *edge_type_counts.entry(record.type_id).or_default() += 1;
2895            }
2896        }
2897
2898        for (type_id, count) in edge_type_counts {
2899            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2900                let avg_degree = if stats.total_nodes > 0 {
2901                    count as f64 / stats.total_nodes as f64
2902                } else {
2903                    0.0
2904                };
2905
2906                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2907                stats.update_edge_type(type_name.as_ref(), edge_stats);
2908            }
2909        }
2910
2911        *self.statistics.write() = stats;
2912    }
2913
2914    /// Estimates cardinality for a label scan.
2915    #[must_use]
2916    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2917        self.statistics.read().estimate_label_cardinality(label)
2918    }
2919
2920    /// Estimates average degree for an edge type.
2921    #[must_use]
2922    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2923        self.statistics
2924            .read()
2925            .estimate_avg_degree(edge_type, outgoing)
2926    }
2927
2928    // === Internal Helpers ===
2929
2930    fn get_or_create_label_id(&self, label: &str) -> u32 {
2931        {
2932            let label_to_id = self.label_to_id.read();
2933            if let Some(&id) = label_to_id.get(label) {
2934                return id;
2935            }
2936        }
2937
2938        let mut label_to_id = self.label_to_id.write();
2939        let mut id_to_label = self.id_to_label.write();
2940
2941        // Double-check after acquiring write lock
2942        if let Some(&id) = label_to_id.get(label) {
2943            return id;
2944        }
2945
2946        let id = id_to_label.len() as u32;
2947
2948        let label: ArcStr = label.into();
2949        label_to_id.insert(label.clone(), id);
2950        id_to_label.push(label);
2951
2952        id
2953    }
2954
2955    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2956        {
2957            let type_to_id = self.edge_type_to_id.read();
2958            if let Some(&id) = type_to_id.get(edge_type) {
2959                return id;
2960            }
2961        }
2962
2963        let mut type_to_id = self.edge_type_to_id.write();
2964        let mut id_to_type = self.id_to_edge_type.write();
2965
2966        // Double-check
2967        if let Some(&id) = type_to_id.get(edge_type) {
2968            return id;
2969        }
2970
2971        let id = id_to_type.len() as u32;
2972        let edge_type: ArcStr = edge_type.into();
2973        type_to_id.insert(edge_type.clone(), id);
2974        id_to_type.push(edge_type);
2975
2976        id
2977    }
2978
2979    // === Recovery Support ===
2980
2981    /// Creates a node with a specific ID during recovery.
2982    ///
2983    /// This is used for WAL recovery to restore nodes with their original IDs.
2984    /// The caller must ensure IDs don't conflict with existing nodes.
2985    #[cfg(not(feature = "tiered-storage"))]
2986    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2987        let epoch = self.current_epoch();
2988        let mut record = NodeRecord::new(id, epoch);
2989        record.set_label_count(labels.len() as u16);
2990
2991        // Store labels in node_labels map and label_index
2992        let mut node_label_set = FxHashSet::default();
2993        for label in labels {
2994            let label_id = self.get_or_create_label_id(*label);
2995            node_label_set.insert(label_id);
2996
2997            // Update label index
2998            let mut index = self.label_index.write();
2999            while index.len() <= label_id as usize {
3000                index.push(FxHashMap::default());
3001            }
3002            index[label_id as usize].insert(id, ());
3003        }
3004
3005        // Store node's labels
3006        self.node_labels.write().insert(id, node_label_set);
3007
3008        // Create version chain with initial version (using SYSTEM tx for recovery)
3009        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
3010        self.nodes.write().insert(id, chain);
3011
3012        // Update next_node_id if necessary to avoid future collisions
3013        let id_val = id.as_u64();
3014        let _ = self
3015            .next_node_id
3016            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3017                if id_val >= current {
3018                    Some(id_val + 1)
3019                } else {
3020                    None
3021                }
3022            });
3023    }
3024
3025    /// Creates a node with a specific ID during recovery.
3026    /// (Tiered storage version)
3027    #[cfg(feature = "tiered-storage")]
3028    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
3029        let epoch = self.current_epoch();
3030        let mut record = NodeRecord::new(id, epoch);
3031        record.set_label_count(labels.len() as u16);
3032
3033        // Store labels in node_labels map and label_index
3034        let mut node_label_set = FxHashSet::default();
3035        for label in labels {
3036            let label_id = self.get_or_create_label_id(*label);
3037            node_label_set.insert(label_id);
3038
3039            // Update label index
3040            let mut index = self.label_index.write();
3041            while index.len() <= label_id as usize {
3042                index.push(FxHashMap::default());
3043            }
3044            index[label_id as usize].insert(id, ());
3045        }
3046
3047        // Store node's labels
3048        self.node_labels.write().insert(id, node_label_set);
3049
3050        // Allocate record in arena and get offset (create epoch if needed)
3051        let arena = self.arena_allocator.arena_or_create(epoch);
3052        let (offset, _stored) = arena.alloc_value_with_offset(record);
3053
3054        // Create HotVersionRef (using SYSTEM tx for recovery)
3055        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
3056        let mut versions = self.node_versions.write();
3057        versions.insert(id, VersionIndex::with_initial(hot_ref));
3058
3059        // Update next_node_id if necessary to avoid future collisions
3060        let id_val = id.as_u64();
3061        let _ = self
3062            .next_node_id
3063            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3064                if id_val >= current {
3065                    Some(id_val + 1)
3066                } else {
3067                    None
3068                }
3069            });
3070    }
3071
3072    /// Creates an edge with a specific ID during recovery.
3073    ///
3074    /// This is used for WAL recovery to restore edges with their original IDs.
3075    #[cfg(not(feature = "tiered-storage"))]
3076    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
3077        let epoch = self.current_epoch();
3078        let type_id = self.get_or_create_edge_type_id(edge_type);
3079
3080        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
3081        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
3082        self.edges.write().insert(id, chain);
3083
3084        // Update adjacency
3085        self.forward_adj.add_edge(src, dst, id);
3086        if let Some(ref backward) = self.backward_adj {
3087            backward.add_edge(dst, src, id);
3088        }
3089
3090        // Update next_edge_id if necessary
3091        let id_val = id.as_u64();
3092        let _ = self
3093            .next_edge_id
3094            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3095                if id_val >= current {
3096                    Some(id_val + 1)
3097                } else {
3098                    None
3099                }
3100            });
3101    }
3102
3103    /// Creates an edge with a specific ID during recovery.
3104    /// (Tiered storage version)
3105    #[cfg(feature = "tiered-storage")]
3106    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
3107        let epoch = self.current_epoch();
3108        let type_id = self.get_or_create_edge_type_id(edge_type);
3109
3110        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
3111
3112        // Allocate record in arena and get offset (create epoch if needed)
3113        let arena = self.arena_allocator.arena_or_create(epoch);
3114        let (offset, _stored) = arena.alloc_value_with_offset(record);
3115
3116        // Create HotVersionRef (using SYSTEM tx for recovery)
3117        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
3118        let mut versions = self.edge_versions.write();
3119        versions.insert(id, VersionIndex::with_initial(hot_ref));
3120
3121        // Update adjacency
3122        self.forward_adj.add_edge(src, dst, id);
3123        if let Some(ref backward) = self.backward_adj {
3124            backward.add_edge(dst, src, id);
3125        }
3126
3127        // Update next_edge_id if necessary
3128        let id_val = id.as_u64();
3129        let _ = self
3130            .next_edge_id
3131            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3132                if id_val >= current {
3133                    Some(id_val + 1)
3134                } else {
3135                    None
3136                }
3137            });
3138    }
3139
3140    /// Sets the current epoch during recovery.
3141    pub fn set_epoch(&self, epoch: EpochId) {
3142        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
3143    }
3144}
3145
3146impl Default for LpgStore {
3147    fn default() -> Self {
3148        Self::new()
3149    }
3150}
3151
3152#[cfg(test)]
3153mod tests {
3154    use super::*;
3155
3156    #[test]
3157    fn test_create_node() {
3158        let store = LpgStore::new();
3159
3160        let id = store.create_node(&["Person"]);
3161        assert!(id.is_valid());
3162
3163        let node = store.get_node(id).unwrap();
3164        assert!(node.has_label("Person"));
3165        assert!(!node.has_label("Animal"));
3166    }
3167
3168    #[test]
3169    fn test_create_node_with_props() {
3170        let store = LpgStore::new();
3171
3172        let id = store.create_node_with_props(
3173            &["Person"],
3174            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
3175        );
3176
3177        let node = store.get_node(id).unwrap();
3178        assert_eq!(
3179            node.get_property("name").and_then(|v| v.as_str()),
3180            Some("Alice")
3181        );
3182        assert_eq!(
3183            node.get_property("age").and_then(|v| v.as_int64()),
3184            Some(30)
3185        );
3186    }
3187
3188    #[test]
3189    fn test_delete_node() {
3190        let store = LpgStore::new();
3191
3192        let id = store.create_node(&["Person"]);
3193        assert_eq!(store.node_count(), 1);
3194
3195        assert!(store.delete_node(id));
3196        assert_eq!(store.node_count(), 0);
3197        assert!(store.get_node(id).is_none());
3198
3199        // Double delete should return false
3200        assert!(!store.delete_node(id));
3201    }
3202
3203    #[test]
3204    fn test_create_edge() {
3205        let store = LpgStore::new();
3206
3207        let alice = store.create_node(&["Person"]);
3208        let bob = store.create_node(&["Person"]);
3209
3210        let edge_id = store.create_edge(alice, bob, "KNOWS");
3211        assert!(edge_id.is_valid());
3212
3213        let edge = store.get_edge(edge_id).unwrap();
3214        assert_eq!(edge.src, alice);
3215        assert_eq!(edge.dst, bob);
3216        assert_eq!(edge.edge_type.as_str(), "KNOWS");
3217    }
3218
3219    #[test]
3220    fn test_neighbors() {
3221        let store = LpgStore::new();
3222
3223        let a = store.create_node(&["Person"]);
3224        let b = store.create_node(&["Person"]);
3225        let c = store.create_node(&["Person"]);
3226
3227        store.create_edge(a, b, "KNOWS");
3228        store.create_edge(a, c, "KNOWS");
3229
3230        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3231        assert_eq!(outgoing.len(), 2);
3232        assert!(outgoing.contains(&b));
3233        assert!(outgoing.contains(&c));
3234
3235        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3236        assert_eq!(incoming.len(), 1);
3237        assert!(incoming.contains(&a));
3238    }
3239
3240    #[test]
3241    fn test_nodes_by_label() {
3242        let store = LpgStore::new();
3243
3244        let p1 = store.create_node(&["Person"]);
3245        let p2 = store.create_node(&["Person"]);
3246        let _a = store.create_node(&["Animal"]);
3247
3248        let persons = store.nodes_by_label("Person");
3249        assert_eq!(persons.len(), 2);
3250        assert!(persons.contains(&p1));
3251        assert!(persons.contains(&p2));
3252
3253        let animals = store.nodes_by_label("Animal");
3254        assert_eq!(animals.len(), 1);
3255    }
3256
3257    #[test]
3258    fn test_delete_edge() {
3259        let store = LpgStore::new();
3260
3261        let a = store.create_node(&["Person"]);
3262        let b = store.create_node(&["Person"]);
3263        let edge_id = store.create_edge(a, b, "KNOWS");
3264
3265        assert_eq!(store.edge_count(), 1);
3266
3267        assert!(store.delete_edge(edge_id));
3268        assert_eq!(store.edge_count(), 0);
3269        assert!(store.get_edge(edge_id).is_none());
3270    }
3271
3272    // === New tests for improved coverage ===
3273
3274    #[test]
3275    fn test_lpg_store_config() {
3276        // Test with_config
3277        let config = LpgStoreConfig {
3278            backward_edges: false,
3279            initial_node_capacity: 100,
3280            initial_edge_capacity: 200,
3281        };
3282        let store = LpgStore::with_config(config);
3283
3284        // Store should work but without backward adjacency
3285        let a = store.create_node(&["Person"]);
3286        let b = store.create_node(&["Person"]);
3287        store.create_edge(a, b, "KNOWS");
3288
3289        // Outgoing should work
3290        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3291        assert_eq!(outgoing.len(), 1);
3292
3293        // Incoming should be empty (no backward adjacency)
3294        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3295        assert_eq!(incoming.len(), 0);
3296    }
3297
3298    #[test]
3299    fn test_epoch_management() {
3300        let store = LpgStore::new();
3301
3302        let epoch0 = store.current_epoch();
3303        assert_eq!(epoch0.as_u64(), 0);
3304
3305        let epoch1 = store.new_epoch();
3306        assert_eq!(epoch1.as_u64(), 1);
3307
3308        let current = store.current_epoch();
3309        assert_eq!(current.as_u64(), 1);
3310    }
3311
3312    #[test]
3313    fn test_node_properties() {
3314        let store = LpgStore::new();
3315        let id = store.create_node(&["Person"]);
3316
3317        // Set and get property
3318        store.set_node_property(id, "name", Value::from("Alice"));
3319        let name = store.get_node_property(id, &"name".into());
3320        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3321
3322        // Update property
3323        store.set_node_property(id, "name", Value::from("Bob"));
3324        let name = store.get_node_property(id, &"name".into());
3325        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3326
3327        // Remove property
3328        let old = store.remove_node_property(id, "name");
3329        assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3330
3331        // Property should be gone
3332        let name = store.get_node_property(id, &"name".into());
3333        assert!(name.is_none());
3334
3335        // Remove non-existent property
3336        let none = store.remove_node_property(id, "nonexistent");
3337        assert!(none.is_none());
3338    }
3339
3340    #[test]
3341    fn test_edge_properties() {
3342        let store = LpgStore::new();
3343        let a = store.create_node(&["Person"]);
3344        let b = store.create_node(&["Person"]);
3345        let edge_id = store.create_edge(a, b, "KNOWS");
3346
3347        // Set and get property
3348        store.set_edge_property(edge_id, "since", Value::from(2020i64));
3349        let since = store.get_edge_property(edge_id, &"since".into());
3350        assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3351
3352        // Remove property
3353        let old = store.remove_edge_property(edge_id, "since");
3354        assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3355
3356        let since = store.get_edge_property(edge_id, &"since".into());
3357        assert!(since.is_none());
3358    }
3359
3360    #[test]
3361    fn test_add_remove_label() {
3362        let store = LpgStore::new();
3363        let id = store.create_node(&["Person"]);
3364
3365        // Add new label
3366        assert!(store.add_label(id, "Employee"));
3367
3368        let node = store.get_node(id).unwrap();
3369        assert!(node.has_label("Person"));
3370        assert!(node.has_label("Employee"));
3371
3372        // Adding same label again should fail
3373        assert!(!store.add_label(id, "Employee"));
3374
3375        // Remove label
3376        assert!(store.remove_label(id, "Employee"));
3377
3378        let node = store.get_node(id).unwrap();
3379        assert!(node.has_label("Person"));
3380        assert!(!node.has_label("Employee"));
3381
3382        // Removing non-existent label should fail
3383        assert!(!store.remove_label(id, "Employee"));
3384        assert!(!store.remove_label(id, "NonExistent"));
3385    }
3386
3387    #[test]
3388    fn test_add_label_to_nonexistent_node() {
3389        let store = LpgStore::new();
3390        let fake_id = NodeId::new(999);
3391        assert!(!store.add_label(fake_id, "Label"));
3392    }
3393
3394    #[test]
3395    fn test_remove_label_from_nonexistent_node() {
3396        let store = LpgStore::new();
3397        let fake_id = NodeId::new(999);
3398        assert!(!store.remove_label(fake_id, "Label"));
3399    }
3400
3401    #[test]
3402    fn test_node_ids() {
3403        let store = LpgStore::new();
3404
3405        let n1 = store.create_node(&["Person"]);
3406        let n2 = store.create_node(&["Person"]);
3407        let n3 = store.create_node(&["Person"]);
3408
3409        let ids = store.node_ids();
3410        assert_eq!(ids.len(), 3);
3411        assert!(ids.contains(&n1));
3412        assert!(ids.contains(&n2));
3413        assert!(ids.contains(&n3));
3414
3415        // Delete one
3416        store.delete_node(n2);
3417        let ids = store.node_ids();
3418        assert_eq!(ids.len(), 2);
3419        assert!(!ids.contains(&n2));
3420    }
3421
3422    #[test]
3423    fn test_delete_node_nonexistent() {
3424        let store = LpgStore::new();
3425        let fake_id = NodeId::new(999);
3426        assert!(!store.delete_node(fake_id));
3427    }
3428
3429    #[test]
3430    fn test_delete_edge_nonexistent() {
3431        let store = LpgStore::new();
3432        let fake_id = EdgeId::new(999);
3433        assert!(!store.delete_edge(fake_id));
3434    }
3435
3436    #[test]
3437    fn test_delete_edge_double() {
3438        let store = LpgStore::new();
3439        let a = store.create_node(&["Person"]);
3440        let b = store.create_node(&["Person"]);
3441        let edge_id = store.create_edge(a, b, "KNOWS");
3442
3443        assert!(store.delete_edge(edge_id));
3444        assert!(!store.delete_edge(edge_id)); // Double delete
3445    }
3446
3447    #[test]
3448    fn test_create_edge_with_props() {
3449        let store = LpgStore::new();
3450        let a = store.create_node(&["Person"]);
3451        let b = store.create_node(&["Person"]);
3452
3453        let edge_id = store.create_edge_with_props(
3454            a,
3455            b,
3456            "KNOWS",
3457            [
3458                ("since", Value::from(2020i64)),
3459                ("weight", Value::from(1.0)),
3460            ],
3461        );
3462
3463        let edge = store.get_edge(edge_id).unwrap();
3464        assert_eq!(
3465            edge.get_property("since").and_then(|v| v.as_int64()),
3466            Some(2020)
3467        );
3468        assert_eq!(
3469            edge.get_property("weight").and_then(|v| v.as_float64()),
3470            Some(1.0)
3471        );
3472    }
3473
3474    #[test]
3475    fn test_delete_node_edges() {
3476        let store = LpgStore::new();
3477
3478        let a = store.create_node(&["Person"]);
3479        let b = store.create_node(&["Person"]);
3480        let c = store.create_node(&["Person"]);
3481
3482        store.create_edge(a, b, "KNOWS"); // a -> b
3483        store.create_edge(c, a, "KNOWS"); // c -> a
3484
3485        assert_eq!(store.edge_count(), 2);
3486
3487        // Delete all edges connected to a
3488        store.delete_node_edges(a);
3489
3490        assert_eq!(store.edge_count(), 0);
3491    }
3492
3493    #[test]
3494    fn test_neighbors_both_directions() {
3495        let store = LpgStore::new();
3496
3497        let a = store.create_node(&["Person"]);
3498        let b = store.create_node(&["Person"]);
3499        let c = store.create_node(&["Person"]);
3500
3501        store.create_edge(a, b, "KNOWS"); // a -> b
3502        store.create_edge(c, a, "KNOWS"); // c -> a
3503
3504        // Direction::Both for node a
3505        let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3506        assert_eq!(neighbors.len(), 2);
3507        assert!(neighbors.contains(&b)); // outgoing
3508        assert!(neighbors.contains(&c)); // incoming
3509    }
3510
3511    #[test]
3512    fn test_edges_from() {
3513        let store = LpgStore::new();
3514
3515        let a = store.create_node(&["Person"]);
3516        let b = store.create_node(&["Person"]);
3517        let c = store.create_node(&["Person"]);
3518
3519        let e1 = store.create_edge(a, b, "KNOWS");
3520        let e2 = store.create_edge(a, c, "KNOWS");
3521
3522        let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3523        assert_eq!(edges.len(), 2);
3524        assert!(edges.iter().any(|(_, e)| *e == e1));
3525        assert!(edges.iter().any(|(_, e)| *e == e2));
3526
3527        // Incoming edges to b
3528        let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3529        assert_eq!(incoming.len(), 1);
3530        assert_eq!(incoming[0].1, e1);
3531    }
3532
3533    #[test]
3534    fn test_edges_to() {
3535        let store = LpgStore::new();
3536
3537        let a = store.create_node(&["Person"]);
3538        let b = store.create_node(&["Person"]);
3539        let c = store.create_node(&["Person"]);
3540
3541        let e1 = store.create_edge(a, b, "KNOWS");
3542        let e2 = store.create_edge(c, b, "KNOWS");
3543
3544        // Edges pointing TO b
3545        let to_b = store.edges_to(b);
3546        assert_eq!(to_b.len(), 2);
3547        assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3548        assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3549    }
3550
3551    #[test]
3552    fn test_out_degree_in_degree() {
3553        let store = LpgStore::new();
3554
3555        let a = store.create_node(&["Person"]);
3556        let b = store.create_node(&["Person"]);
3557        let c = store.create_node(&["Person"]);
3558
3559        store.create_edge(a, b, "KNOWS");
3560        store.create_edge(a, c, "KNOWS");
3561        store.create_edge(c, b, "KNOWS");
3562
3563        assert_eq!(store.out_degree(a), 2);
3564        assert_eq!(store.out_degree(b), 0);
3565        assert_eq!(store.out_degree(c), 1);
3566
3567        assert_eq!(store.in_degree(a), 0);
3568        assert_eq!(store.in_degree(b), 2);
3569        assert_eq!(store.in_degree(c), 1);
3570    }
3571
3572    #[test]
3573    fn test_edge_type() {
3574        let store = LpgStore::new();
3575
3576        let a = store.create_node(&["Person"]);
3577        let b = store.create_node(&["Person"]);
3578        let edge_id = store.create_edge(a, b, "KNOWS");
3579
3580        let edge_type = store.edge_type(edge_id);
3581        assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3582
3583        // Non-existent edge
3584        let fake_id = EdgeId::new(999);
3585        assert!(store.edge_type(fake_id).is_none());
3586    }
3587
3588    #[test]
3589    fn test_count_methods() {
3590        let store = LpgStore::new();
3591
3592        assert_eq!(store.label_count(), 0);
3593        assert_eq!(store.edge_type_count(), 0);
3594        assert_eq!(store.property_key_count(), 0);
3595
3596        let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3597        let b = store.create_node(&["Company"]);
3598        store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3599
3600        assert_eq!(store.label_count(), 2); // Person, Company
3601        assert_eq!(store.edge_type_count(), 1); // WORKS_AT
3602        assert_eq!(store.property_key_count(), 2); // age, since
3603    }
3604
3605    #[test]
3606    fn test_all_nodes_and_edges() {
3607        let store = LpgStore::new();
3608
3609        let a = store.create_node(&["Person"]);
3610        let b = store.create_node(&["Person"]);
3611        store.create_edge(a, b, "KNOWS");
3612
3613        let nodes: Vec<_> = store.all_nodes().collect();
3614        assert_eq!(nodes.len(), 2);
3615
3616        let edges: Vec<_> = store.all_edges().collect();
3617        assert_eq!(edges.len(), 1);
3618    }
3619
3620    #[test]
3621    fn test_all_labels_and_edge_types() {
3622        let store = LpgStore::new();
3623
3624        store.create_node(&["Person"]);
3625        store.create_node(&["Company"]);
3626        let a = store.create_node(&["Animal"]);
3627        let b = store.create_node(&["Animal"]);
3628        store.create_edge(a, b, "EATS");
3629
3630        let labels = store.all_labels();
3631        assert_eq!(labels.len(), 3);
3632        assert!(labels.contains(&"Person".to_string()));
3633        assert!(labels.contains(&"Company".to_string()));
3634        assert!(labels.contains(&"Animal".to_string()));
3635
3636        let edge_types = store.all_edge_types();
3637        assert_eq!(edge_types.len(), 1);
3638        assert!(edge_types.contains(&"EATS".to_string()));
3639    }
3640
3641    #[test]
3642    fn test_all_property_keys() {
3643        let store = LpgStore::new();
3644
3645        let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3646        let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3647        store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3648
3649        let keys = store.all_property_keys();
3650        assert!(keys.contains(&"name".to_string()));
3651        assert!(keys.contains(&"age".to_string()));
3652        assert!(keys.contains(&"since".to_string()));
3653    }
3654
3655    #[test]
3656    fn test_nodes_with_label() {
3657        let store = LpgStore::new();
3658
3659        store.create_node(&["Person"]);
3660        store.create_node(&["Person"]);
3661        store.create_node(&["Company"]);
3662
3663        let persons: Vec<_> = store.nodes_with_label("Person").collect();
3664        assert_eq!(persons.len(), 2);
3665
3666        let companies: Vec<_> = store.nodes_with_label("Company").collect();
3667        assert_eq!(companies.len(), 1);
3668
3669        let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3670        assert_eq!(none.len(), 0);
3671    }
3672
3673    #[test]
3674    fn test_edges_with_type() {
3675        let store = LpgStore::new();
3676
3677        let a = store.create_node(&["Person"]);
3678        let b = store.create_node(&["Person"]);
3679        let c = store.create_node(&["Company"]);
3680
3681        store.create_edge(a, b, "KNOWS");
3682        store.create_edge(a, c, "WORKS_AT");
3683
3684        let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3685        assert_eq!(knows.len(), 1);
3686
3687        let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3688        assert_eq!(works_at.len(), 1);
3689
3690        let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3691        assert_eq!(none.len(), 0);
3692    }
3693
3694    #[test]
3695    fn test_nodes_by_label_nonexistent() {
3696        let store = LpgStore::new();
3697        store.create_node(&["Person"]);
3698
3699        let empty = store.nodes_by_label("NonExistent");
3700        assert!(empty.is_empty());
3701    }
3702
3703    #[test]
3704    fn test_statistics() {
3705        let store = LpgStore::new();
3706
3707        let a = store.create_node(&["Person"]);
3708        let b = store.create_node(&["Person"]);
3709        let c = store.create_node(&["Company"]);
3710
3711        store.create_edge(a, b, "KNOWS");
3712        store.create_edge(a, c, "WORKS_AT");
3713
3714        store.compute_statistics();
3715        let stats = store.statistics();
3716
3717        assert_eq!(stats.total_nodes, 3);
3718        assert_eq!(stats.total_edges, 2);
3719
3720        // Estimates
3721        let person_card = store.estimate_label_cardinality("Person");
3722        assert!(person_card > 0.0);
3723
3724        let avg_degree = store.estimate_avg_degree("KNOWS", true);
3725        assert!(avg_degree >= 0.0);
3726    }
3727
3728    #[test]
3729    fn test_zone_maps() {
3730        let store = LpgStore::new();
3731
3732        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3733        store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3734
3735        // Zone map should indicate possible matches (30 is within [25, 35] range)
3736        let might_match =
3737            store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3738        // Zone maps return true conservatively when value is within min/max range
3739        assert!(might_match);
3740
3741        let zone = store.node_property_zone_map(&"age".into());
3742        assert!(zone.is_some());
3743
3744        // Non-existent property
3745        let no_zone = store.node_property_zone_map(&"nonexistent".into());
3746        assert!(no_zone.is_none());
3747
3748        // Edge zone maps
3749        let a = store.create_node(&["A"]);
3750        let b = store.create_node(&["B"]);
3751        store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3752
3753        let edge_zone = store.edge_property_zone_map(&"weight".into());
3754        assert!(edge_zone.is_some());
3755    }
3756
3757    #[test]
3758    fn test_rebuild_zone_maps() {
3759        let store = LpgStore::new();
3760        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3761
3762        // Should not panic
3763        store.rebuild_zone_maps();
3764    }
3765
3766    #[test]
3767    fn test_create_node_with_id() {
3768        let store = LpgStore::new();
3769
3770        let specific_id = NodeId::new(100);
3771        store.create_node_with_id(specific_id, &["Person", "Employee"]);
3772
3773        let node = store.get_node(specific_id).unwrap();
3774        assert!(node.has_label("Person"));
3775        assert!(node.has_label("Employee"));
3776
3777        // Next auto-generated ID should be > 100
3778        let next = store.create_node(&["Other"]);
3779        assert!(next.as_u64() > 100);
3780    }
3781
3782    #[test]
3783    fn test_create_edge_with_id() {
3784        let store = LpgStore::new();
3785
3786        let a = store.create_node(&["A"]);
3787        let b = store.create_node(&["B"]);
3788
3789        let specific_id = EdgeId::new(500);
3790        store.create_edge_with_id(specific_id, a, b, "REL");
3791
3792        let edge = store.get_edge(specific_id).unwrap();
3793        assert_eq!(edge.src, a);
3794        assert_eq!(edge.dst, b);
3795        assert_eq!(edge.edge_type.as_str(), "REL");
3796
3797        // Next auto-generated ID should be > 500
3798        let next = store.create_edge(a, b, "OTHER");
3799        assert!(next.as_u64() > 500);
3800    }
3801
3802    #[test]
3803    fn test_set_epoch() {
3804        let store = LpgStore::new();
3805
3806        assert_eq!(store.current_epoch().as_u64(), 0);
3807
3808        store.set_epoch(EpochId::new(42));
3809        assert_eq!(store.current_epoch().as_u64(), 42);
3810    }
3811
3812    #[test]
3813    fn test_get_node_nonexistent() {
3814        let store = LpgStore::new();
3815        let fake_id = NodeId::new(999);
3816        assert!(store.get_node(fake_id).is_none());
3817    }
3818
3819    #[test]
3820    fn test_get_edge_nonexistent() {
3821        let store = LpgStore::new();
3822        let fake_id = EdgeId::new(999);
3823        assert!(store.get_edge(fake_id).is_none());
3824    }
3825
3826    #[test]
3827    fn test_multiple_labels() {
3828        let store = LpgStore::new();
3829
3830        let id = store.create_node(&["Person", "Employee", "Manager"]);
3831        let node = store.get_node(id).unwrap();
3832
3833        assert!(node.has_label("Person"));
3834        assert!(node.has_label("Employee"));
3835        assert!(node.has_label("Manager"));
3836        assert!(!node.has_label("Other"));
3837    }
3838
3839    #[test]
3840    fn test_default_impl() {
3841        let store: LpgStore = Default::default();
3842        assert_eq!(store.node_count(), 0);
3843        assert_eq!(store.edge_count(), 0);
3844    }
3845
3846    #[test]
3847    fn test_edges_from_both_directions() {
3848        let store = LpgStore::new();
3849
3850        let a = store.create_node(&["A"]);
3851        let b = store.create_node(&["B"]);
3852        let c = store.create_node(&["C"]);
3853
3854        let e1 = store.create_edge(a, b, "R1"); // a -> b
3855        let e2 = store.create_edge(c, a, "R2"); // c -> a
3856
3857        // Both directions from a
3858        let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3859        assert_eq!(edges.len(), 2);
3860        assert!(edges.iter().any(|(_, e)| *e == e1)); // outgoing
3861        assert!(edges.iter().any(|(_, e)| *e == e2)); // incoming
3862    }
3863
3864    #[test]
3865    fn test_no_backward_adj_in_degree() {
3866        let config = LpgStoreConfig {
3867            backward_edges: false,
3868            initial_node_capacity: 10,
3869            initial_edge_capacity: 10,
3870        };
3871        let store = LpgStore::with_config(config);
3872
3873        let a = store.create_node(&["A"]);
3874        let b = store.create_node(&["B"]);
3875        store.create_edge(a, b, "R");
3876
3877        // in_degree should still work (falls back to scanning)
3878        let degree = store.in_degree(b);
3879        assert_eq!(degree, 1);
3880    }
3881
3882    #[test]
3883    fn test_no_backward_adj_edges_to() {
3884        let config = LpgStoreConfig {
3885            backward_edges: false,
3886            initial_node_capacity: 10,
3887            initial_edge_capacity: 10,
3888        };
3889        let store = LpgStore::with_config(config);
3890
3891        let a = store.create_node(&["A"]);
3892        let b = store.create_node(&["B"]);
3893        let e = store.create_edge(a, b, "R");
3894
3895        // edges_to should still work (falls back to scanning)
3896        let edges = store.edges_to(b);
3897        assert_eq!(edges.len(), 1);
3898        assert_eq!(edges[0].1, e);
3899    }
3900
3901    #[test]
3902    fn test_node_versioned_creation() {
3903        let store = LpgStore::new();
3904
3905        let epoch = store.new_epoch();
3906        let tx_id = TxId::new(1);
3907
3908        let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3909        assert!(store.get_node(id).is_some());
3910    }
3911
3912    #[test]
3913    fn test_edge_versioned_creation() {
3914        let store = LpgStore::new();
3915
3916        let a = store.create_node(&["A"]);
3917        let b = store.create_node(&["B"]);
3918
3919        let epoch = store.new_epoch();
3920        let tx_id = TxId::new(1);
3921
3922        let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3923        assert!(store.get_edge(edge_id).is_some());
3924    }
3925
3926    #[test]
3927    fn test_node_with_props_versioned() {
3928        let store = LpgStore::new();
3929
3930        let epoch = store.new_epoch();
3931        let tx_id = TxId::new(1);
3932
3933        let id = store.create_node_with_props_versioned(
3934            &["Person"],
3935            [("name", Value::from("Alice"))],
3936            epoch,
3937            tx_id,
3938        );
3939
3940        let node = store.get_node(id).unwrap();
3941        assert_eq!(
3942            node.get_property("name").and_then(|v| v.as_str()),
3943            Some("Alice")
3944        );
3945    }
3946
3947    #[test]
3948    fn test_discard_uncommitted_versions() {
3949        let store = LpgStore::new();
3950
3951        let epoch = store.new_epoch();
3952        let tx_id = TxId::new(42);
3953
3954        // Create node with specific tx
3955        let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3956        assert!(store.get_node(node_id).is_some());
3957
3958        // Discard uncommitted versions for this tx
3959        store.discard_uncommitted_versions(tx_id);
3960
3961        // Node should be gone (version chain was removed)
3962        assert!(store.get_node(node_id).is_none());
3963    }
3964
3965    // === Property Index Tests ===
3966
3967    #[test]
3968    fn test_property_index_create_and_lookup() {
3969        let store = LpgStore::new();
3970
3971        // Create nodes with properties
3972        let alice = store.create_node(&["Person"]);
3973        let bob = store.create_node(&["Person"]);
3974        let charlie = store.create_node(&["Person"]);
3975
3976        store.set_node_property(alice, "city", Value::from("NYC"));
3977        store.set_node_property(bob, "city", Value::from("NYC"));
3978        store.set_node_property(charlie, "city", Value::from("LA"));
3979
3980        // Before indexing, lookup still works (via scan)
3981        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3982        assert_eq!(nyc_people.len(), 2);
3983
3984        // Create index
3985        store.create_property_index("city");
3986        assert!(store.has_property_index("city"));
3987
3988        // Indexed lookup should return same results
3989        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3990        assert_eq!(nyc_people.len(), 2);
3991        assert!(nyc_people.contains(&alice));
3992        assert!(nyc_people.contains(&bob));
3993
3994        let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3995        assert_eq!(la_people.len(), 1);
3996        assert!(la_people.contains(&charlie));
3997    }
3998
3999    #[test]
4000    fn test_property_index_maintained_on_update() {
4001        let store = LpgStore::new();
4002
4003        // Create index first
4004        store.create_property_index("status");
4005
4006        let node = store.create_node(&["Task"]);
4007        store.set_node_property(node, "status", Value::from("pending"));
4008
4009        // Should find by initial value
4010        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
4011        assert_eq!(pending.len(), 1);
4012        assert!(pending.contains(&node));
4013
4014        // Update the property
4015        store.set_node_property(node, "status", Value::from("done"));
4016
4017        // Old value should not find it
4018        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
4019        assert!(pending.is_empty());
4020
4021        // New value should find it
4022        let done = store.find_nodes_by_property("status", &Value::from("done"));
4023        assert_eq!(done.len(), 1);
4024        assert!(done.contains(&node));
4025    }
4026
4027    #[test]
4028    fn test_property_index_maintained_on_remove() {
4029        let store = LpgStore::new();
4030
4031        store.create_property_index("tag");
4032
4033        let node = store.create_node(&["Item"]);
4034        store.set_node_property(node, "tag", Value::from("important"));
4035
4036        // Should find it
4037        let found = store.find_nodes_by_property("tag", &Value::from("important"));
4038        assert_eq!(found.len(), 1);
4039
4040        // Remove the property
4041        store.remove_node_property(node, "tag");
4042
4043        // Should no longer find it
4044        let found = store.find_nodes_by_property("tag", &Value::from("important"));
4045        assert!(found.is_empty());
4046    }
4047
4048    #[test]
4049    fn test_property_index_drop() {
4050        let store = LpgStore::new();
4051
4052        store.create_property_index("key");
4053        assert!(store.has_property_index("key"));
4054
4055        assert!(store.drop_property_index("key"));
4056        assert!(!store.has_property_index("key"));
4057
4058        // Dropping non-existent index returns false
4059        assert!(!store.drop_property_index("key"));
4060    }
4061
4062    #[test]
4063    fn test_property_index_multiple_values() {
4064        let store = LpgStore::new();
4065
4066        store.create_property_index("age");
4067
4068        // Create multiple nodes with same and different ages
4069        let n1 = store.create_node(&["Person"]);
4070        let n2 = store.create_node(&["Person"]);
4071        let n3 = store.create_node(&["Person"]);
4072        let n4 = store.create_node(&["Person"]);
4073
4074        store.set_node_property(n1, "age", Value::from(25i64));
4075        store.set_node_property(n2, "age", Value::from(25i64));
4076        store.set_node_property(n3, "age", Value::from(30i64));
4077        store.set_node_property(n4, "age", Value::from(25i64));
4078
4079        let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
4080        assert_eq!(age_25.len(), 3);
4081
4082        let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
4083        assert_eq!(age_30.len(), 1);
4084
4085        let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
4086        assert!(age_40.is_empty());
4087    }
4088
4089    #[test]
4090    fn test_property_index_builds_from_existing_data() {
4091        let store = LpgStore::new();
4092
4093        // Create nodes first
4094        let n1 = store.create_node(&["Person"]);
4095        let n2 = store.create_node(&["Person"]);
4096        store.set_node_property(n1, "email", Value::from("alice@example.com"));
4097        store.set_node_property(n2, "email", Value::from("bob@example.com"));
4098
4099        // Create index after data exists
4100        store.create_property_index("email");
4101
4102        // Index should include existing data
4103        let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
4104        assert_eq!(alice.len(), 1);
4105        assert!(alice.contains(&n1));
4106
4107        let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
4108        assert_eq!(bob.len(), 1);
4109        assert!(bob.contains(&n2));
4110    }
4111
4112    #[test]
4113    fn test_get_node_property_batch() {
4114        let store = LpgStore::new();
4115
4116        let n1 = store.create_node(&["Person"]);
4117        let n2 = store.create_node(&["Person"]);
4118        let n3 = store.create_node(&["Person"]);
4119
4120        store.set_node_property(n1, "age", Value::from(25i64));
4121        store.set_node_property(n2, "age", Value::from(30i64));
4122        // n3 has no age property
4123
4124        let age_key = PropertyKey::new("age");
4125        let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
4126
4127        assert_eq!(values.len(), 3);
4128        assert_eq!(values[0], Some(Value::from(25i64)));
4129        assert_eq!(values[1], Some(Value::from(30i64)));
4130        assert_eq!(values[2], None);
4131    }
4132
4133    #[test]
4134    fn test_get_node_property_batch_empty() {
4135        let store = LpgStore::new();
4136        let key = PropertyKey::new("any");
4137
4138        let values = store.get_node_property_batch(&[], &key);
4139        assert!(values.is_empty());
4140    }
4141
4142    #[test]
4143    fn test_get_nodes_properties_batch() {
4144        let store = LpgStore::new();
4145
4146        let n1 = store.create_node(&["Person"]);
4147        let n2 = store.create_node(&["Person"]);
4148        let n3 = store.create_node(&["Person"]);
4149
4150        store.set_node_property(n1, "name", Value::from("Alice"));
4151        store.set_node_property(n1, "age", Value::from(25i64));
4152        store.set_node_property(n2, "name", Value::from("Bob"));
4153        // n3 has no properties
4154
4155        let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
4156
4157        assert_eq!(all_props.len(), 3);
4158        assert_eq!(all_props[0].len(), 2); // name and age
4159        assert_eq!(all_props[1].len(), 1); // name only
4160        assert_eq!(all_props[2].len(), 0); // no properties
4161
4162        assert_eq!(
4163            all_props[0].get(&PropertyKey::new("name")),
4164            Some(&Value::from("Alice"))
4165        );
4166        assert_eq!(
4167            all_props[1].get(&PropertyKey::new("name")),
4168            Some(&Value::from("Bob"))
4169        );
4170    }
4171
4172    #[test]
4173    fn test_get_nodes_properties_batch_empty() {
4174        let store = LpgStore::new();
4175
4176        let all_props = store.get_nodes_properties_batch(&[]);
4177        assert!(all_props.is_empty());
4178    }
4179
4180    #[test]
4181    fn test_get_nodes_properties_selective_batch() {
4182        let store = LpgStore::new();
4183
4184        let n1 = store.create_node(&["Person"]);
4185        let n2 = store.create_node(&["Person"]);
4186
4187        // Set multiple properties
4188        store.set_node_property(n1, "name", Value::from("Alice"));
4189        store.set_node_property(n1, "age", Value::from(25i64));
4190        store.set_node_property(n1, "email", Value::from("alice@example.com"));
4191        store.set_node_property(n2, "name", Value::from("Bob"));
4192        store.set_node_property(n2, "age", Value::from(30i64));
4193        store.set_node_property(n2, "city", Value::from("NYC"));
4194
4195        // Request only name and age (not email or city)
4196        let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
4197        let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
4198
4199        assert_eq!(props.len(), 2);
4200
4201        // n1: should have name and age, but NOT email
4202        assert_eq!(props[0].len(), 2);
4203        assert_eq!(
4204            props[0].get(&PropertyKey::new("name")),
4205            Some(&Value::from("Alice"))
4206        );
4207        assert_eq!(
4208            props[0].get(&PropertyKey::new("age")),
4209            Some(&Value::from(25i64))
4210        );
4211        assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4212
4213        // n2: should have name and age, but NOT city
4214        assert_eq!(props[1].len(), 2);
4215        assert_eq!(
4216            props[1].get(&PropertyKey::new("name")),
4217            Some(&Value::from("Bob"))
4218        );
4219        assert_eq!(
4220            props[1].get(&PropertyKey::new("age")),
4221            Some(&Value::from(30i64))
4222        );
4223        assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4224    }
4225
4226    #[test]
4227    fn test_get_nodes_properties_selective_batch_empty_keys() {
4228        let store = LpgStore::new();
4229
4230        let n1 = store.create_node(&["Person"]);
4231        store.set_node_property(n1, "name", Value::from("Alice"));
4232
4233        // Request no properties
4234        let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4235
4236        assert_eq!(props.len(), 1);
4237        assert!(props[0].is_empty()); // Empty map when no keys requested
4238    }
4239
4240    #[test]
4241    fn test_get_nodes_properties_selective_batch_missing_keys() {
4242        let store = LpgStore::new();
4243
4244        let n1 = store.create_node(&["Person"]);
4245        store.set_node_property(n1, "name", Value::from("Alice"));
4246
4247        // Request a property that doesn't exist
4248        let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4249        let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4250
4251        assert_eq!(props.len(), 1);
4252        assert_eq!(props[0].len(), 1); // Only name exists
4253        assert_eq!(
4254            props[0].get(&PropertyKey::new("name")),
4255            Some(&Value::from("Alice"))
4256        );
4257    }
4258
4259    // === Range Query Tests ===
4260
4261    #[test]
4262    fn test_find_nodes_in_range_inclusive() {
4263        let store = LpgStore::new();
4264
4265        let n1 = store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4266        let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4267        let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4268        let _n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4269
4270        // age >= 20 AND age <= 40 (inclusive both sides)
4271        let result = store.find_nodes_in_range(
4272            "age",
4273            Some(&Value::from(20i64)),
4274            Some(&Value::from(40i64)),
4275            true,
4276            true,
4277        );
4278        assert_eq!(result.len(), 3);
4279        assert!(result.contains(&n1));
4280        assert!(result.contains(&n2));
4281        assert!(result.contains(&n3));
4282    }
4283
4284    #[test]
4285    fn test_find_nodes_in_range_exclusive() {
4286        let store = LpgStore::new();
4287
4288        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4289        let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4290        store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4291
4292        // age > 20 AND age < 40 (exclusive both sides)
4293        let result = store.find_nodes_in_range(
4294            "age",
4295            Some(&Value::from(20i64)),
4296            Some(&Value::from(40i64)),
4297            false,
4298            false,
4299        );
4300        assert_eq!(result.len(), 1);
4301        assert!(result.contains(&n2));
4302    }
4303
4304    #[test]
4305    fn test_find_nodes_in_range_open_ended() {
4306        let store = LpgStore::new();
4307
4308        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4309        store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4310        let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4311        let n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4312
4313        // age >= 35 (no upper bound)
4314        let result = store.find_nodes_in_range("age", Some(&Value::from(35i64)), None, true, true);
4315        assert_eq!(result.len(), 2);
4316        assert!(result.contains(&n3));
4317        assert!(result.contains(&n4));
4318
4319        // age <= 25 (no lower bound)
4320        let result = store.find_nodes_in_range("age", None, Some(&Value::from(25i64)), true, true);
4321        assert_eq!(result.len(), 1);
4322    }
4323
4324    #[test]
4325    fn test_find_nodes_in_range_empty_result() {
4326        let store = LpgStore::new();
4327
4328        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4329
4330        // Range that doesn't match anything
4331        let result = store.find_nodes_in_range(
4332            "age",
4333            Some(&Value::from(100i64)),
4334            Some(&Value::from(200i64)),
4335            true,
4336            true,
4337        );
4338        assert!(result.is_empty());
4339    }
4340
4341    #[test]
4342    fn test_find_nodes_in_range_nonexistent_property() {
4343        let store = LpgStore::new();
4344
4345        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4346
4347        let result = store.find_nodes_in_range(
4348            "weight",
4349            Some(&Value::from(50i64)),
4350            Some(&Value::from(100i64)),
4351            true,
4352            true,
4353        );
4354        assert!(result.is_empty());
4355    }
4356
4357    // === Multi-Property Query Tests ===
4358
4359    #[test]
4360    fn test_find_nodes_by_properties_multiple_conditions() {
4361        let store = LpgStore::new();
4362
4363        let alice = store.create_node_with_props(
4364            &["Person"],
4365            [("name", Value::from("Alice")), ("city", Value::from("NYC"))],
4366        );
4367        store.create_node_with_props(
4368            &["Person"],
4369            [("name", Value::from("Bob")), ("city", Value::from("NYC"))],
4370        );
4371        store.create_node_with_props(
4372            &["Person"],
4373            [("name", Value::from("Alice")), ("city", Value::from("LA"))],
4374        );
4375
4376        // Match name="Alice" AND city="NYC"
4377        let result = store.find_nodes_by_properties(&[
4378            ("name", Value::from("Alice")),
4379            ("city", Value::from("NYC")),
4380        ]);
4381        assert_eq!(result.len(), 1);
4382        assert!(result.contains(&alice));
4383    }
4384
4385    #[test]
4386    fn test_find_nodes_by_properties_empty_conditions() {
4387        let store = LpgStore::new();
4388
4389        store.create_node(&["Person"]);
4390        store.create_node(&["Person"]);
4391
4392        // Empty conditions should return all nodes
4393        let result = store.find_nodes_by_properties(&[]);
4394        assert_eq!(result.len(), 2);
4395    }
4396
4397    #[test]
4398    fn test_find_nodes_by_properties_no_match() {
4399        let store = LpgStore::new();
4400
4401        store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
4402
4403        let result = store.find_nodes_by_properties(&[("name", Value::from("Nobody"))]);
4404        assert!(result.is_empty());
4405    }
4406
4407    #[test]
4408    fn test_find_nodes_by_properties_with_index() {
4409        let store = LpgStore::new();
4410
4411        // Create index on name
4412        store.create_property_index("name");
4413
4414        let alice = store.create_node_with_props(
4415            &["Person"],
4416            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
4417        );
4418        store.create_node_with_props(
4419            &["Person"],
4420            [("name", Value::from("Bob")), ("age", Value::from(30i64))],
4421        );
4422
4423        // Index should accelerate the lookup
4424        let result = store.find_nodes_by_properties(&[
4425            ("name", Value::from("Alice")),
4426            ("age", Value::from(30i64)),
4427        ]);
4428        assert_eq!(result.len(), 1);
4429        assert!(result.contains(&alice));
4430    }
4431
4432    // === Cardinality Estimation Tests ===
4433
4434    #[test]
4435    fn test_estimate_label_cardinality() {
4436        let store = LpgStore::new();
4437
4438        store.create_node(&["Person"]);
4439        store.create_node(&["Person"]);
4440        store.create_node(&["Animal"]);
4441
4442        store.ensure_statistics_fresh();
4443
4444        let person_est = store.estimate_label_cardinality("Person");
4445        let animal_est = store.estimate_label_cardinality("Animal");
4446        let unknown_est = store.estimate_label_cardinality("Unknown");
4447
4448        assert!(
4449            person_est >= 1.0,
4450            "Person should have cardinality >= 1, got {person_est}"
4451        );
4452        assert!(
4453            animal_est >= 1.0,
4454            "Animal should have cardinality >= 1, got {animal_est}"
4455        );
4456        // Unknown label should return some default (not panic)
4457        assert!(unknown_est >= 0.0);
4458    }
4459
4460    #[test]
4461    fn test_estimate_avg_degree() {
4462        let store = LpgStore::new();
4463
4464        let a = store.create_node(&["Person"]);
4465        let b = store.create_node(&["Person"]);
4466        let c = store.create_node(&["Person"]);
4467
4468        store.create_edge(a, b, "KNOWS");
4469        store.create_edge(a, c, "KNOWS");
4470        store.create_edge(b, c, "KNOWS");
4471
4472        store.ensure_statistics_fresh();
4473
4474        let outgoing = store.estimate_avg_degree("KNOWS", true);
4475        let incoming = store.estimate_avg_degree("KNOWS", false);
4476
4477        assert!(
4478            outgoing > 0.0,
4479            "Outgoing degree should be > 0, got {outgoing}"
4480        );
4481        assert!(
4482            incoming > 0.0,
4483            "Incoming degree should be > 0, got {incoming}"
4484        );
4485    }
4486
4487    // === Delete operations ===
4488
4489    #[test]
4490    fn test_delete_node_does_not_cascade() {
4491        let store = LpgStore::new();
4492
4493        let a = store.create_node(&["A"]);
4494        let b = store.create_node(&["B"]);
4495        let e = store.create_edge(a, b, "KNOWS");
4496
4497        assert!(store.delete_node(a));
4498        assert!(store.get_node(a).is_none());
4499
4500        // Edges are NOT automatically deleted (non-detach delete)
4501        assert!(
4502            store.get_edge(e).is_some(),
4503            "Edge should survive non-detach node delete"
4504        );
4505    }
4506
4507    #[test]
4508    fn test_delete_already_deleted_node() {
4509        let store = LpgStore::new();
4510        let a = store.create_node(&["A"]);
4511
4512        assert!(store.delete_node(a));
4513        // Second delete should return false (already deleted)
4514        assert!(!store.delete_node(a));
4515    }
4516
4517    #[test]
4518    fn test_delete_nonexistent_node() {
4519        let store = LpgStore::new();
4520        assert!(!store.delete_node(NodeId::new(999)));
4521    }
4522}