Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(
27    feature = "tiered-storage",
28    feature = "vector-index",
29    feature = "text-index"
30))]
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33
34#[cfg(feature = "vector-index")]
35use crate::index::vector::HnswIndex;
36
37/// Compares two values for ordering (used for range checks).
38fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
39    match (a, b) {
40        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
41        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
42        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
43        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
44        _ => None,
45    }
46}
47
48/// Checks if a value is within a range.
49fn value_in_range(
50    value: &Value,
51    min: Option<&Value>,
52    max: Option<&Value>,
53    min_inclusive: bool,
54    max_inclusive: bool,
55) -> bool {
56    // Check lower bound
57    if let Some(min_val) = min {
58        match compare_values_for_range(value, min_val) {
59            Some(CmpOrdering::Less) => return false,
60            Some(CmpOrdering::Equal) if !min_inclusive => return false,
61            None => return false, // Can't compare
62            _ => {}
63        }
64    }
65
66    // Check upper bound
67    if let Some(max_val) = max {
68        match compare_values_for_range(value, max_val) {
69            Some(CmpOrdering::Greater) => return false,
70            Some(CmpOrdering::Equal) if !max_inclusive => return false,
71            None => return false,
72            _ => {}
73        }
74    }
75
76    true
77}
78
79// Tiered storage imports
80#[cfg(feature = "tiered-storage")]
81use crate::storage::EpochStore;
82#[cfg(feature = "tiered-storage")]
83use grafeo_common::memory::arena::ArenaAllocator;
84#[cfg(feature = "tiered-storage")]
85use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
86
/// Tuning knobs for an LPG store.
///
/// The defaults are a reasonable starting point. Disable `backward_edges`
/// when traversal only ever follows outgoing edges, and raise the capacity
/// hints when the graph size is known ahead of time.
#[derive(Clone, Debug)]
pub struct LpgStoreConfig {
    /// Whether a backward adjacency index is kept for incoming-edge queries.
    /// Switching it off skips that index entirely, which saves its memory
    /// when only outgoing edges are traversed.
    pub backward_edges: bool,
    /// Capacity hint for the number of nodes (avoids early reallocations).
    pub initial_node_capacity: usize,
    /// Capacity hint for the number of edges (avoids early reallocations).
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward adjacency enabled, with room hinted for 1024 nodes and
    /// 4096 edges.
    fn default() -> Self {
        const DEFAULT_NODE_CAPACITY: usize = 1024;
        const DEFAULT_EDGE_CAPACITY: usize = 4096;

        LpgStoreConfig {
            backward_edges: true,
            initial_node_capacity: DEFAULT_NODE_CAPACITY,
            initial_edge_capacity: DEFAULT_EDGE_CAPACITY,
        }
    }
}
112
/// The core in-memory graph storage.
///
/// Everything lives here: nodes, edges, properties, adjacency indexes, and
/// version chains for MVCC. Concurrent reads never block each other.
///
/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
/// adds transaction management and query execution. Use `LpgStore` directly
/// when you need raw performance for algorithm implementations.
///
/// # Example
///
/// ```
/// use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_core::graph::Direction;
///
/// let store = LpgStore::new();
///
/// // Create a small social network
/// let alice = store.create_node(&["Person"]);
/// let bob = store.create_node(&["Person"]);
/// store.create_edge(alice, bob, "KNOWS");
///
/// // Traverse outgoing edges
/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
///     println!("Alice knows node {:?}", neighbor);
/// }
/// ```
///
/// # Lock Ordering
///
/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
/// consistent order to prevent deadlocks. Always acquire locks in this order:
///
/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
/// 1. `nodes` / `node_versions`
/// 2. `edges` / `edge_versions`
///
/// ## Level 2 - Catalogs (acquire as pairs when writing)
/// 3. `label_to_id` + `id_to_label`
/// 4. `edge_type_to_id` + `id_to_edge_type`
///
/// ## Level 3 - Indexes
/// 5. `label_index`
/// 6. `node_labels`
/// 7. `property_indexes`
///
/// ## Level 4 - Statistics
/// 8. `statistics`
///
/// ## Level 5 - Nested Locks (internal to other structs)
/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
///
/// ## Rules
/// - Catalog pairs must be acquired together when writing.
/// - Never hold entity locks while acquiring catalog locks in a different scope.
/// - Statistics lock is always last.
/// - Read locks are generally safe, but avoid read-to-write upgrades.
pub struct LpgStore {
    /// Configuration the store was constructed with.
    ///
    /// `with_config` takes what it needs from the config before storing it
    /// here; the `dead_code` allowance suggests the stored copy is not read
    /// back elsewhere. NOTE(review): confirm against the rest of the file.
    #[allow(dead_code)]
    config: LpgStoreConfig,

    /// Node records indexed by NodeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    /// Lock order: 1
    #[cfg(not(feature = "tiered-storage"))]
    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Edge records indexed by EdgeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    /// Lock order: 2
    #[cfg(not(feature = "tiered-storage"))]
    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    // === Tiered Storage Fields (feature-gated) ===
    //
    // Lock ordering for arena access:
    //   version_lock (read/write) → arena read lock (via arena_allocator.arena())
    //
    // Rules:
    // - Acquire arena read lock *after* version locks, never before.
    // - Multiple threads may call arena.read_at() concurrently (shared refs only).
    // - Never acquire arena write lock (alloc_new_chunk) while holding version locks.
    // - freeze_epoch order: node_versions.read() → arena.read_at(),
    //   then edge_versions.read() → arena.read_at().
    /// Arena allocator for hot data storage.
    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
    #[cfg(feature = "tiered-storage")]
    arena_allocator: Arc<ArenaAllocator>,

    /// Node version indexes - store metadata and arena offsets.
    /// The actual NodeRecord data is stored in the arena.
    /// Lock order: 1
    #[cfg(feature = "tiered-storage")]
    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Edge version indexes - store metadata and arena offsets.
    /// The actual EdgeRecord data is stored in the arena.
    /// Lock order: 2
    #[cfg(feature = "tiered-storage")]
    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Cold storage for frozen epochs.
    /// Contains compressed epoch blocks for historical data.
    #[cfg(feature = "tiered-storage")]
    epoch_store: Arc<EpochStore>,

    /// Property storage for nodes.
    /// Has its own internal lock (lock order: 9, see "Nested Locks" above).
    node_properties: PropertyStorage<NodeId>,

    /// Property storage for edges.
    /// Has its own internal lock (lock order: 9, see "Nested Locks" above).
    edge_properties: PropertyStorage<EdgeId>,

    /// Label name to ID mapping.
    /// Lock order: 3 (acquire with id_to_label)
    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Label ID to name mapping; the label ID is the position in the vector.
    /// Lock order: 3 (acquire with label_to_id)
    id_to_label: RwLock<Vec<ArcStr>>,

    /// Edge type name to ID mapping.
    /// Lock order: 4 (acquire with id_to_edge_type)
    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Edge type ID to name mapping.
    /// Lock order: 4 (acquire with edge_type_to_id)
    id_to_edge_type: RwLock<Vec<ArcStr>>,

    /// Forward adjacency lists (outgoing edges).
    /// Has its own internal lock (lock order: 10, see "Nested Locks" above).
    forward_adj: ChunkedAdjacency,

    /// Backward adjacency lists (incoming edges).
    /// Only populated if config.backward_edges is true; `None` otherwise.
    backward_adj: Option<ChunkedAdjacency>,

    /// Label index: label_id -> set of node IDs.
    ///
    /// The vector is indexed by label ID; each entry is a map with unit
    /// values that serves as the set of nodes carrying that label.
    /// Lock order: 5
    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node labels: node_id -> set of label IDs.
    /// Reverse mapping to efficiently get labels for a node.
    /// Lock order: 6
    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Property indexes: property_key -> (value -> set of node IDs).
    ///
    /// When a property is indexed, lookups by value are O(1) instead of O(n).
    /// Use [`create_property_index`] to enable indexing for a property.
    /// Lock order: 7
    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,

    /// Vector indexes: "label:property" -> HNSW index.
    ///
    /// Created via [`GrafeoDB::create_vector_index`](grafeo_engine::GrafeoDB::create_vector_index).
    /// Lock order: 7 (same level as property_indexes, disjoint keys)
    #[cfg(feature = "vector-index")]
    vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,

    /// Text indexes: "label:property" -> inverted index with BM25 scoring.
    ///
    /// Created via [`GrafeoDB::create_text_index`](grafeo_engine::GrafeoDB::create_text_index).
    /// Lock order: 7 (same level as property_indexes, disjoint keys)
    #[cfg(feature = "text-index")]
    text_indexes: RwLock<FxHashMap<String, Arc<RwLock<crate::index::text::InvertedIndex>>>>,

    /// Next node ID to hand out; node creation claims IDs with `fetch_add`.
    next_node_id: AtomicU64,

    /// Next edge ID.
    next_edge_id: AtomicU64,

    /// Current epoch; read by `current_epoch()` and advanced by `new_epoch()`.
    current_epoch: AtomicU64,

    /// Statistics for cost-based optimization.
    /// Lock order: 8 (always last)
    statistics: RwLock<Statistics>,

    /// Whether statistics need recomputation after mutations.
    /// Starts out `true` and is set again by `create_node` and `delete_node`.
    needs_stats_recompute: AtomicBool,
}
296
297impl LpgStore {
298    /// Creates a new LPG store with default configuration.
299    #[must_use]
300    pub fn new() -> Self {
301        Self::with_config(LpgStoreConfig::default())
302    }
303
304    /// Creates a new LPG store with custom configuration.
305    #[must_use]
306    pub fn with_config(config: LpgStoreConfig) -> Self {
307        let backward_adj = if config.backward_edges {
308            Some(ChunkedAdjacency::new())
309        } else {
310            None
311        };
312
313        Self {
314            #[cfg(not(feature = "tiered-storage"))]
315            nodes: RwLock::new(FxHashMap::default()),
316            #[cfg(not(feature = "tiered-storage"))]
317            edges: RwLock::new(FxHashMap::default()),
318            #[cfg(feature = "tiered-storage")]
319            arena_allocator: Arc::new(ArenaAllocator::new()),
320            #[cfg(feature = "tiered-storage")]
321            node_versions: RwLock::new(FxHashMap::default()),
322            #[cfg(feature = "tiered-storage")]
323            edge_versions: RwLock::new(FxHashMap::default()),
324            #[cfg(feature = "tiered-storage")]
325            epoch_store: Arc::new(EpochStore::new()),
326            node_properties: PropertyStorage::new(),
327            edge_properties: PropertyStorage::new(),
328            label_to_id: RwLock::new(FxHashMap::default()),
329            id_to_label: RwLock::new(Vec::new()),
330            edge_type_to_id: RwLock::new(FxHashMap::default()),
331            id_to_edge_type: RwLock::new(Vec::new()),
332            forward_adj: ChunkedAdjacency::new(),
333            backward_adj,
334            label_index: RwLock::new(Vec::new()),
335            node_labels: RwLock::new(FxHashMap::default()),
336            property_indexes: RwLock::new(FxHashMap::default()),
337            #[cfg(feature = "vector-index")]
338            vector_indexes: RwLock::new(FxHashMap::default()),
339            #[cfg(feature = "text-index")]
340            text_indexes: RwLock::new(FxHashMap::default()),
341            next_node_id: AtomicU64::new(0),
342            next_edge_id: AtomicU64::new(0),
343            current_epoch: AtomicU64::new(0),
344            statistics: RwLock::new(Statistics::new()),
345            needs_stats_recompute: AtomicBool::new(true),
346            config,
347        }
348    }
349
350    /// Returns the current epoch.
351    #[must_use]
352    pub fn current_epoch(&self) -> EpochId {
353        EpochId::new(self.current_epoch.load(Ordering::Acquire))
354    }
355
356    /// Creates a new epoch.
357    pub fn new_epoch(&self) -> EpochId {
358        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
359        EpochId::new(id)
360    }
361
362    // === Node Operations ===
363
364    /// Creates a new node with the given labels.
365    ///
366    /// Uses the system transaction for non-transactional operations.
367    pub fn create_node(&self, labels: &[&str]) -> NodeId {
368        self.needs_stats_recompute.store(true, Ordering::Relaxed);
369        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
370    }
371
372    /// Creates a new node with the given labels within a transaction context.
373    #[cfg(not(feature = "tiered-storage"))]
374    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
375        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
376
377        let mut record = NodeRecord::new(id, epoch);
378        record.set_label_count(labels.len() as u16);
379
380        // Store labels in node_labels map and label_index
381        let mut node_label_set = FxHashSet::default();
382        for label in labels {
383            let label_id = self.get_or_create_label_id(*label);
384            node_label_set.insert(label_id);
385
386            // Update label index
387            let mut index = self.label_index.write();
388            while index.len() <= label_id as usize {
389                index.push(FxHashMap::default());
390            }
391            index[label_id as usize].insert(id, ());
392        }
393
394        // Store node's labels
395        self.node_labels.write().insert(id, node_label_set);
396
397        // Create version chain with initial version
398        let chain = VersionChain::with_initial(record, epoch, tx_id);
399        self.nodes.write().insert(id, chain);
400        id
401    }
402
403    /// Creates a new node with the given labels within a transaction context.
404    /// (Tiered storage version: stores data in arena, metadata in VersionIndex)
405    #[cfg(feature = "tiered-storage")]
406    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
407        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
408
409        let mut record = NodeRecord::new(id, epoch);
410        record.set_label_count(labels.len() as u16);
411
412        // Store labels in node_labels map and label_index
413        let mut node_label_set = FxHashSet::default();
414        for label in labels {
415            let label_id = self.get_or_create_label_id(*label);
416            node_label_set.insert(label_id);
417
418            // Update label index
419            let mut index = self.label_index.write();
420            while index.len() <= label_id as usize {
421                index.push(FxHashMap::default());
422            }
423            index[label_id as usize].insert(id, ());
424        }
425
426        // Store node's labels
427        self.node_labels.write().insert(id, node_label_set);
428
429        // Allocate record in arena and get offset (create epoch if needed)
430        let arena = self.arena_allocator.arena_or_create(epoch);
431        let (offset, _stored) = arena.alloc_value_with_offset(record);
432
433        // Create HotVersionRef pointing to arena data
434        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
435
436        // Create or update version index
437        let mut versions = self.node_versions.write();
438        if let Some(index) = versions.get_mut(&id) {
439            index.add_hot(hot_ref);
440        } else {
441            versions.insert(id, VersionIndex::with_initial(hot_ref));
442        }
443
444        id
445    }
446
447    /// Creates a new node with labels and properties.
448    pub fn create_node_with_props(
449        &self,
450        labels: &[&str],
451        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
452    ) -> NodeId {
453        self.create_node_with_props_versioned(
454            labels,
455            properties,
456            self.current_epoch(),
457            TxId::SYSTEM,
458        )
459    }
460
461    /// Creates a new node with labels and properties within a transaction context.
462    #[cfg(not(feature = "tiered-storage"))]
463    pub fn create_node_with_props_versioned(
464        &self,
465        labels: &[&str],
466        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
467        epoch: EpochId,
468        tx_id: TxId,
469    ) -> NodeId {
470        let id = self.create_node_versioned(labels, epoch, tx_id);
471
472        for (key, value) in properties {
473            let prop_key: PropertyKey = key.into();
474            let prop_value: Value = value.into();
475            // Update property index before setting the property
476            self.update_property_index_on_set(id, &prop_key, &prop_value);
477            self.node_properties.set(id, prop_key, prop_value);
478        }
479
480        // Update props_count in record
481        let count = self.node_properties.get_all(id).len() as u16;
482        if let Some(chain) = self.nodes.write().get_mut(&id)
483            && let Some(record) = chain.latest_mut()
484        {
485            record.props_count = count;
486        }
487
488        id
489    }
490
491    /// Creates a new node with labels and properties within a transaction context.
492    /// (Tiered storage version)
493    #[cfg(feature = "tiered-storage")]
494    pub fn create_node_with_props_versioned(
495        &self,
496        labels: &[&str],
497        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
498        epoch: EpochId,
499        tx_id: TxId,
500    ) -> NodeId {
501        let id = self.create_node_versioned(labels, epoch, tx_id);
502
503        for (key, value) in properties {
504            let prop_key: PropertyKey = key.into();
505            let prop_value: Value = value.into();
506            // Update property index before setting the property
507            self.update_property_index_on_set(id, &prop_key, &prop_value);
508            self.node_properties.set(id, prop_key, prop_value);
509        }
510
511        // Note: props_count in record is not updated for tiered storage.
512        // The record is immutable once allocated in the arena.
513
514        id
515    }
516
517    /// Gets a node by ID (latest visible version).
518    #[must_use]
519    pub fn get_node(&self, id: NodeId) -> Option<Node> {
520        self.get_node_at_epoch(id, self.current_epoch())
521    }
522
523    /// Gets a node by ID at a specific epoch.
524    #[must_use]
525    #[cfg(not(feature = "tiered-storage"))]
526    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
527        let nodes = self.nodes.read();
528        let chain = nodes.get(&id)?;
529        let record = chain.visible_at(epoch)?;
530
531        if record.is_deleted() {
532            return None;
533        }
534
535        let mut node = Node::new(id);
536
537        // Get labels from node_labels map
538        let id_to_label = self.id_to_label.read();
539        let node_labels = self.node_labels.read();
540        if let Some(label_ids) = node_labels.get(&id) {
541            for &label_id in label_ids {
542                if let Some(label) = id_to_label.get(label_id as usize) {
543                    node.labels.push(label.clone());
544                }
545            }
546        }
547
548        // Get properties
549        node.properties = self.node_properties.get_all(id).into_iter().collect();
550
551        Some(node)
552    }
553
554    /// Gets a node by ID at a specific epoch.
555    /// (Tiered storage version: reads from arena via VersionIndex)
556    #[must_use]
557    #[cfg(feature = "tiered-storage")]
558    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
559        let versions = self.node_versions.read();
560        let index = versions.get(&id)?;
561        let version_ref = index.visible_at(epoch)?;
562
563        // Read the record from arena
564        let record = self.read_node_record(&version_ref)?;
565
566        if record.is_deleted() {
567            return None;
568        }
569
570        let mut node = Node::new(id);
571
572        // Get labels from node_labels map
573        let id_to_label = self.id_to_label.read();
574        let node_labels = self.node_labels.read();
575        if let Some(label_ids) = node_labels.get(&id) {
576            for &label_id in label_ids {
577                if let Some(label) = id_to_label.get(label_id as usize) {
578                    node.labels.push(label.clone());
579                }
580            }
581        }
582
583        // Get properties
584        node.properties = self.node_properties.get_all(id).into_iter().collect();
585
586        Some(node)
587    }
588
589    /// Gets a node visible to a specific transaction.
590    #[must_use]
591    #[cfg(not(feature = "tiered-storage"))]
592    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
593        let nodes = self.nodes.read();
594        let chain = nodes.get(&id)?;
595        let record = chain.visible_to(epoch, tx_id)?;
596
597        if record.is_deleted() {
598            return None;
599        }
600
601        let mut node = Node::new(id);
602
603        // Get labels from node_labels map
604        let id_to_label = self.id_to_label.read();
605        let node_labels = self.node_labels.read();
606        if let Some(label_ids) = node_labels.get(&id) {
607            for &label_id in label_ids {
608                if let Some(label) = id_to_label.get(label_id as usize) {
609                    node.labels.push(label.clone());
610                }
611            }
612        }
613
614        // Get properties
615        node.properties = self.node_properties.get_all(id).into_iter().collect();
616
617        Some(node)
618    }
619
620    /// Gets a node visible to a specific transaction.
621    /// (Tiered storage version: reads from arena via VersionIndex)
622    #[must_use]
623    #[cfg(feature = "tiered-storage")]
624    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
625        let versions = self.node_versions.read();
626        let index = versions.get(&id)?;
627        let version_ref = index.visible_to(epoch, tx_id)?;
628
629        // Read the record from arena
630        let record = self.read_node_record(&version_ref)?;
631
632        if record.is_deleted() {
633            return None;
634        }
635
636        let mut node = Node::new(id);
637
638        // Get labels from node_labels map
639        let id_to_label = self.id_to_label.read();
640        let node_labels = self.node_labels.read();
641        if let Some(label_ids) = node_labels.get(&id) {
642            for &label_id in label_ids {
643                if let Some(label) = id_to_label.get(label_id as usize) {
644                    node.labels.push(label.clone());
645                }
646            }
647        }
648
649        // Get properties
650        node.properties = self.node_properties.get_all(id).into_iter().collect();
651
652        Some(node)
653    }
654
655    /// Reads a NodeRecord from arena (hot) or epoch store (cold) using a VersionRef.
656    #[cfg(feature = "tiered-storage")]
657    #[allow(unsafe_code)]
658    fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
659        match version_ref {
660            VersionRef::Hot(hot_ref) => {
661                let arena = self.arena_allocator.arena(hot_ref.epoch);
662                // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
663                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
664                Some(*record)
665            }
666            VersionRef::Cold(cold_ref) => {
667                // Read from compressed epoch store
668                self.epoch_store
669                    .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
670            }
671        }
672    }
673
674    /// Deletes a node and all its edges (using latest epoch).
675    pub fn delete_node(&self, id: NodeId) -> bool {
676        self.needs_stats_recompute.store(true, Ordering::Relaxed);
677        self.delete_node_at_epoch(id, self.current_epoch())
678    }
679
680    /// Deletes a node at a specific epoch.
681    #[cfg(not(feature = "tiered-storage"))]
682    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
683        let mut nodes = self.nodes.write();
684        if let Some(chain) = nodes.get_mut(&id) {
685            // Check if visible at this epoch (not already deleted)
686            if let Some(record) = chain.visible_at(epoch) {
687                if record.is_deleted() {
688                    return false;
689                }
690            } else {
691                // Not visible at this epoch (already deleted or doesn't exist)
692                return false;
693            }
694
695            // Mark the version chain as deleted at this epoch
696            chain.mark_deleted(epoch);
697
698            // Remove from label index using node_labels map
699            let mut index = self.label_index.write();
700            let mut node_labels = self.node_labels.write();
701            if let Some(label_ids) = node_labels.remove(&id) {
702                for label_id in label_ids {
703                    if let Some(set) = index.get_mut(label_id as usize) {
704                        set.remove(&id);
705                    }
706                }
707            }
708
709            // Remove properties
710            drop(nodes); // Release lock before removing properties
711            drop(index);
712            drop(node_labels);
713            self.node_properties.remove_all(id);
714
715            // Note: Caller should use delete_node_edges() first if detach is needed
716
717            true
718        } else {
719            false
720        }
721    }
722
723    /// Deletes a node at a specific epoch.
724    /// (Tiered storage version)
725    #[cfg(feature = "tiered-storage")]
726    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
727        let mut versions = self.node_versions.write();
728        if let Some(index) = versions.get_mut(&id) {
729            // Check if visible at this epoch
730            if let Some(version_ref) = index.visible_at(epoch) {
731                if let Some(record) = self.read_node_record(&version_ref) {
732                    if record.is_deleted() {
733                        return false;
734                    }
735                } else {
736                    return false;
737                }
738            } else {
739                return false;
740            }
741
742            // Mark as deleted in version index
743            index.mark_deleted(epoch);
744
745            // Remove from label index using node_labels map
746            let mut label_index = self.label_index.write();
747            let mut node_labels = self.node_labels.write();
748            if let Some(label_ids) = node_labels.remove(&id) {
749                for label_id in label_ids {
750                    if let Some(set) = label_index.get_mut(label_id as usize) {
751                        set.remove(&id);
752                    }
753                }
754            }
755
756            // Remove properties
757            drop(versions);
758            drop(label_index);
759            drop(node_labels);
760            self.node_properties.remove_all(id);
761
762            true
763        } else {
764            false
765        }
766    }
767
768    /// Deletes all edges connected to a node (implements DETACH DELETE).
769    ///
770    /// Call this before `delete_node()` if you want to remove a node that
771    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
772    #[cfg(not(feature = "tiered-storage"))]
773    pub fn delete_node_edges(&self, node_id: NodeId) {
774        // Get outgoing edges
775        let outgoing: Vec<EdgeId> = self
776            .forward_adj
777            .edges_from(node_id)
778            .into_iter()
779            .map(|(_, edge_id)| edge_id)
780            .collect();
781
782        // Get incoming edges
783        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
784            backward
785                .edges_from(node_id)
786                .into_iter()
787                .map(|(_, edge_id)| edge_id)
788                .collect()
789        } else {
790            // No backward adjacency - scan all edges
791            let epoch = self.current_epoch();
792            self.edges
793                .read()
794                .iter()
795                .filter_map(|(id, chain)| {
796                    chain.visible_at(epoch).and_then(|r| {
797                        if !r.is_deleted() && r.dst == node_id {
798                            Some(*id)
799                        } else {
800                            None
801                        }
802                    })
803                })
804                .collect()
805        };
806
807        // Delete all edges
808        for edge_id in outgoing.into_iter().chain(incoming) {
809            self.delete_edge(edge_id);
810        }
811    }
812
813    /// Deletes all edges connected to a node (implements DETACH DELETE).
814    /// (Tiered storage version)
815    #[cfg(feature = "tiered-storage")]
816    pub fn delete_node_edges(&self, node_id: NodeId) {
817        // Get outgoing edges
818        let outgoing: Vec<EdgeId> = self
819            .forward_adj
820            .edges_from(node_id)
821            .into_iter()
822            .map(|(_, edge_id)| edge_id)
823            .collect();
824
825        // Get incoming edges
826        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
827            backward
828                .edges_from(node_id)
829                .into_iter()
830                .map(|(_, edge_id)| edge_id)
831                .collect()
832        } else {
833            // No backward adjacency - scan all edges
834            let epoch = self.current_epoch();
835            let versions = self.edge_versions.read();
836            versions
837                .iter()
838                .filter_map(|(id, index)| {
839                    index.visible_at(epoch).and_then(|vref| {
840                        self.read_edge_record(&vref).and_then(|r| {
841                            if !r.is_deleted() && r.dst == node_id {
842                                Some(*id)
843                            } else {
844                                None
845                            }
846                        })
847                    })
848                })
849                .collect()
850        };
851
852        // Delete all edges
853        for edge_id in outgoing.into_iter().chain(incoming) {
854            self.delete_edge(edge_id);
855        }
856    }
857
858    /// Sets a property on a node.
859    #[cfg(not(feature = "tiered-storage"))]
860    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
861        let prop_key: PropertyKey = key.into();
862
863        // Update property index before setting the property (needs to read old value)
864        self.update_property_index_on_set(id, &prop_key, &value);
865
866        self.node_properties.set(id, prop_key, value);
867
868        // Update props_count in record
869        let count = self.node_properties.get_all(id).len() as u16;
870        if let Some(chain) = self.nodes.write().get_mut(&id)
871            && let Some(record) = chain.latest_mut()
872        {
873            record.props_count = count;
874        }
875    }
876
877    /// Sets a property on a node.
878    /// (Tiered storage version: properties stored separately, record is immutable)
879    #[cfg(feature = "tiered-storage")]
880    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
881        let prop_key: PropertyKey = key.into();
882
883        // Update property index before setting the property (needs to read old value)
884        self.update_property_index_on_set(id, &prop_key, &value);
885
886        self.node_properties.set(id, prop_key, value);
887        // Note: props_count in record is not updated for tiered storage.
888        // The record is immutable once allocated in the arena.
889        // Property count can be derived from PropertyStorage if needed.
890    }
891
892    /// Sets a property on an edge.
893    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
894        self.edge_properties.set(id, key.into(), value);
895    }
896
897    /// Removes a property from a node.
898    ///
899    /// Returns the previous value if it existed, or None if the property didn't exist.
900    #[cfg(not(feature = "tiered-storage"))]
901    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
902        let prop_key: PropertyKey = key.into();
903
904        // Update property index before removing (needs to read old value)
905        self.update_property_index_on_remove(id, &prop_key);
906
907        let result = self.node_properties.remove(id, &prop_key);
908
909        // Update props_count in record
910        let count = self.node_properties.get_all(id).len() as u16;
911        if let Some(chain) = self.nodes.write().get_mut(&id)
912            && let Some(record) = chain.latest_mut()
913        {
914            record.props_count = count;
915        }
916
917        result
918    }
919
920    /// Removes a property from a node.
921    /// (Tiered storage version)
922    #[cfg(feature = "tiered-storage")]
923    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
924        let prop_key: PropertyKey = key.into();
925
926        // Update property index before removing (needs to read old value)
927        self.update_property_index_on_remove(id, &prop_key);
928
929        self.node_properties.remove(id, &prop_key)
930        // Note: props_count in record is not updated for tiered storage.
931    }
932
933    /// Removes a property from an edge.
934    ///
935    /// Returns the previous value if it existed, or None if the property didn't exist.
936    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
937        self.edge_properties.remove(id, &key.into())
938    }
939
940    /// Gets a single property from a node without loading all properties.
941    ///
942    /// This is O(1) vs O(properties) for `get_node().get_property()`.
943    /// Use this for filter predicates where you only need one property value.
944    ///
945    /// # Example
946    ///
947    /// ```ignore
948    /// // Fast: Direct single-property lookup
949    /// let age = store.get_node_property(node_id, "age");
950    ///
951    /// // Slow: Loads all properties, then extracts one
952    /// let age = store.get_node(node_id).and_then(|n| n.get_property("age").cloned());
953    /// ```
954    #[must_use]
955    pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
956        self.node_properties.get(id, key)
957    }
958
959    /// Gets a single property from an edge without loading all properties.
960    ///
961    /// This is O(1) vs O(properties) for `get_edge().get_property()`.
962    #[must_use]
963    pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
964        self.edge_properties.get(id, key)
965    }
966
967    // === Batch Property Operations ===
968
969    /// Gets a property for multiple nodes in a single batch operation.
970    ///
971    /// More efficient than calling [`Self::get_node_property`] in a loop because it
972    /// reduces lock overhead and enables better cache utilization.
973    ///
974    /// # Example
975    ///
976    /// ```
977    /// use grafeo_core::graph::lpg::LpgStore;
978    /// use grafeo_common::types::{NodeId, PropertyKey, Value};
979    ///
980    /// let store = LpgStore::new();
981    /// let n1 = store.create_node(&["Person"]);
982    /// let n2 = store.create_node(&["Person"]);
983    /// store.set_node_property(n1, "age", Value::from(25i64));
984    /// store.set_node_property(n2, "age", Value::from(30i64));
985    ///
986    /// let ages = store.get_node_property_batch(&[n1, n2], &PropertyKey::new("age"));
987    /// assert_eq!(ages, vec![Some(Value::from(25i64)), Some(Value::from(30i64))]);
988    /// ```
989    #[must_use]
990    pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
991        self.node_properties.get_batch(ids, key)
992    }
993
994    /// Gets all properties for multiple nodes in a single batch operation.
995    ///
996    /// Returns a vector of property maps, one per node ID (empty map if no properties).
997    /// More efficient than calling [`Self::get_node`] in a loop.
998    #[must_use]
999    pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
1000        self.node_properties.get_all_batch(ids)
1001    }
1002
1003    /// Gets selected properties for multiple nodes (projection pushdown).
1004    ///
1005    /// This is more efficient than [`Self::get_nodes_properties_batch`] when you only
1006    /// need a subset of properties. It only iterates the requested columns instead of
1007    /// all columns.
1008    ///
1009    /// **Use this for**: Queries with explicit projections like `RETURN n.name, n.age`
1010    /// instead of `RETURN n` (which requires all properties).
1011    ///
1012    /// # Example
1013    ///
1014    /// ```
1015    /// use grafeo_core::graph::lpg::LpgStore;
1016    /// use grafeo_common::types::{PropertyKey, Value};
1017    ///
1018    /// let store = LpgStore::new();
1019    /// let n1 = store.create_node(&["Person"]);
1020    /// store.set_node_property(n1, "name", Value::from("Alice"));
1021    /// store.set_node_property(n1, "age", Value::from(30i64));
1022    /// store.set_node_property(n1, "email", Value::from("alice@example.com"));
1023    ///
1024    /// // Only fetch name and age (faster than get_nodes_properties_batch)
1025    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
1026    /// let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
1027    ///
1028    /// assert_eq!(props[0].len(), 2); // Only name and age, not email
1029    /// ```
1030    #[must_use]
1031    pub fn get_nodes_properties_selective_batch(
1032        &self,
1033        ids: &[NodeId],
1034        keys: &[PropertyKey],
1035    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1036        self.node_properties.get_selective_batch(ids, keys)
1037    }
1038
1039    /// Gets selected properties for multiple edges (projection pushdown).
1040    ///
1041    /// Edge-property version of [`Self::get_nodes_properties_selective_batch`].
1042    #[must_use]
1043    pub fn get_edges_properties_selective_batch(
1044        &self,
1045        ids: &[EdgeId],
1046        keys: &[PropertyKey],
1047    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1048        self.edge_properties.get_selective_batch(ids, keys)
1049    }
1050
1051    /// Finds nodes where a property value is in a range.
1052    ///
1053    /// This is useful for queries like `n.age > 30` or `n.price BETWEEN 10 AND 100`.
1054    /// Uses zone maps to skip scanning when the range definitely doesn't match.
1055    ///
1056    /// # Arguments
1057    ///
1058    /// * `property` - The property to check
1059    /// * `min` - Optional lower bound (None for unbounded)
1060    /// * `max` - Optional upper bound (None for unbounded)
1061    /// * `min_inclusive` - Whether lower bound is inclusive (>= vs >)
1062    /// * `max_inclusive` - Whether upper bound is inclusive (<= vs <)
1063    ///
1064    /// # Example
1065    ///
1066    /// ```
1067    /// use grafeo_core::graph::lpg::LpgStore;
1068    /// use grafeo_common::types::Value;
1069    ///
1070    /// let store = LpgStore::new();
1071    /// let n1 = store.create_node(&["Person"]);
1072    /// let n2 = store.create_node(&["Person"]);
1073    /// store.set_node_property(n1, "age", Value::from(25i64));
1074    /// store.set_node_property(n2, "age", Value::from(35i64));
1075    ///
1076    /// // Find nodes where age > 30
1077    /// let result = store.find_nodes_in_range(
1078    ///     "age",
1079    ///     Some(&Value::from(30i64)),
1080    ///     None,
1081    ///     false, // exclusive lower bound
1082    ///     true,  // inclusive upper bound (doesn't matter since None)
1083    /// );
1084    /// assert_eq!(result.len(), 1); // Only n2 matches
1085    /// ```
1086    #[must_use]
1087    pub fn find_nodes_in_range(
1088        &self,
1089        property: &str,
1090        min: Option<&Value>,
1091        max: Option<&Value>,
1092        min_inclusive: bool,
1093        max_inclusive: bool,
1094    ) -> Vec<NodeId> {
1095        let key = PropertyKey::new(property);
1096
1097        // Check zone map first - if no values could match, return empty
1098        if !self
1099            .node_properties
1100            .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1101        {
1102            return Vec::new();
1103        }
1104
1105        // Scan all nodes and filter by range
1106        self.node_ids()
1107            .into_iter()
1108            .filter(|&node_id| {
1109                self.node_properties
1110                    .get(node_id, &key)
1111                    .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1112            })
1113            .collect()
1114    }
1115
1116    /// Finds nodes matching multiple property equality conditions.
1117    ///
1118    /// This is more efficient than intersecting multiple single-property lookups
1119    /// because it can use indexes when available and short-circuits on the first
1120    /// miss.
1121    ///
1122    /// # Example
1123    ///
1124    /// ```
1125    /// use grafeo_core::graph::lpg::LpgStore;
1126    /// use grafeo_common::types::Value;
1127    ///
1128    /// let store = LpgStore::new();
1129    /// let alice = store.create_node(&["Person"]);
1130    /// store.set_node_property(alice, "name", Value::from("Alice"));
1131    /// store.set_node_property(alice, "city", Value::from("NYC"));
1132    ///
1133    /// // Find nodes where name = "Alice" AND city = "NYC"
1134    /// let matches = store.find_nodes_by_properties(&[
1135    ///     ("name", Value::from("Alice")),
1136    ///     ("city", Value::from("NYC")),
1137    /// ]);
1138    /// assert!(matches.contains(&alice));
1139    /// ```
1140    #[must_use]
1141    pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1142        if conditions.is_empty() {
1143            return self.node_ids();
1144        }
1145
1146        // Find the most selective condition (smallest result set) to start
1147        // If any condition has an index, use that first
1148        let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1149        let indexes = self.property_indexes.read();
1150
1151        for (i, (prop, value)) in conditions.iter().enumerate() {
1152            let key = PropertyKey::new(*prop);
1153            let hv = HashableValue::new(value.clone());
1154
1155            if let Some(index) = indexes.get(&key) {
1156                let matches: Vec<NodeId> = index
1157                    .get(&hv)
1158                    .map(|nodes| nodes.iter().copied().collect())
1159                    .unwrap_or_default();
1160
1161                // Short-circuit if any indexed condition has no matches
1162                if matches.is_empty() {
1163                    return Vec::new();
1164                }
1165
1166                // Use smallest indexed result as starting point
1167                if best_start
1168                    .as_ref()
1169                    .is_none_or(|(_, best)| matches.len() < best.len())
1170                {
1171                    best_start = Some((i, matches));
1172                }
1173            }
1174        }
1175        drop(indexes);
1176
1177        // Start from best indexed result or fall back to full node scan
1178        let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1179            // No indexes available, start with first condition via full scan
1180            let (prop, value) = &conditions[0];
1181            (0, self.find_nodes_by_property(prop, value))
1182        });
1183
1184        // Filter candidates through remaining conditions
1185        for (i, (prop, value)) in conditions.iter().enumerate() {
1186            if i == start_idx {
1187                continue;
1188            }
1189
1190            let key = PropertyKey::new(*prop);
1191            candidates.retain(|&node_id| {
1192                self.node_properties
1193                    .get(node_id, &key)
1194                    .is_some_and(|v| v == *value)
1195            });
1196
1197            // Short-circuit if no candidates remain
1198            if candidates.is_empty() {
1199                return Vec::new();
1200            }
1201        }
1202
1203        candidates
1204    }
1205
1206    // === Property Index Operations ===
1207
1208    /// Creates an index on a node property for O(1) lookups by value.
1209    ///
1210    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1211    /// O(1) instead of O(n) for this property. The index is automatically
1212    /// maintained when properties are set or removed.
1213    ///
1214    /// # Example
1215    ///
1216    /// ```
1217    /// use grafeo_core::graph::lpg::LpgStore;
1218    /// use grafeo_common::types::Value;
1219    ///
1220    /// let store = LpgStore::new();
1221    ///
1222    /// // Create nodes with an 'id' property
1223    /// let alice = store.create_node(&["Person"]);
1224    /// store.set_node_property(alice, "id", Value::from("alice_123"));
1225    ///
1226    /// // Create an index on the 'id' property
1227    /// store.create_property_index("id");
1228    ///
1229    /// // Now lookups by 'id' are O(1)
1230    /// let found = store.find_nodes_by_property("id", &Value::from("alice_123"));
1231    /// assert!(found.contains(&alice));
1232    /// ```
1233    pub fn create_property_index(&self, property: &str) {
1234        let key = PropertyKey::new(property);
1235
1236        let mut indexes = self.property_indexes.write();
1237        if indexes.contains_key(&key) {
1238            return; // Already indexed
1239        }
1240
1241        // Create the index and populate it with existing data
1242        let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1243
1244        // Scan all nodes to build the index
1245        for node_id in self.node_ids() {
1246            if let Some(value) = self.node_properties.get(node_id, &key) {
1247                let hv = HashableValue::new(value);
1248                index.entry(hv).or_default().insert(node_id);
1249            }
1250        }
1251
1252        indexes.insert(key, index);
1253    }
1254
1255    /// Drops an index on a node property.
1256    ///
1257    /// Returns `true` if the index existed and was removed.
1258    pub fn drop_property_index(&self, property: &str) -> bool {
1259        let key = PropertyKey::new(property);
1260        self.property_indexes.write().remove(&key).is_some()
1261    }
1262
1263    /// Returns `true` if the property has an index.
1264    #[must_use]
1265    pub fn has_property_index(&self, property: &str) -> bool {
1266        let key = PropertyKey::new(property);
1267        self.property_indexes.read().contains_key(&key)
1268    }
1269
1270    /// Stores a vector index for a label+property pair.
1271    #[cfg(feature = "vector-index")]
1272    pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1273        let key = format!("{label}:{property}");
1274        self.vector_indexes.write().insert(key, index);
1275    }
1276
1277    /// Retrieves the vector index for a label+property pair.
1278    #[cfg(feature = "vector-index")]
1279    #[must_use]
1280    pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1281        let key = format!("{label}:{property}");
1282        self.vector_indexes.read().get(&key).cloned()
1283    }
1284
1285    /// Removes a vector index for a label+property pair.
1286    ///
1287    /// Returns `true` if the index existed and was removed.
1288    #[cfg(feature = "vector-index")]
1289    pub fn remove_vector_index(&self, label: &str, property: &str) -> bool {
1290        let key = format!("{label}:{property}");
1291        self.vector_indexes.write().remove(&key).is_some()
1292    }
1293
1294    /// Returns all vector index entries as `(key, index)` pairs.
1295    ///
1296    /// Keys are in `"label:property"` format.
1297    #[cfg(feature = "vector-index")]
1298    #[must_use]
1299    pub fn vector_index_entries(&self) -> Vec<(String, Arc<HnswIndex>)> {
1300        self.vector_indexes
1301            .read()
1302            .iter()
1303            .map(|(k, v)| (k.clone(), v.clone()))
1304            .collect()
1305    }
1306
1307    /// Stores a text index for a label+property pair.
1308    #[cfg(feature = "text-index")]
1309    pub fn add_text_index(
1310        &self,
1311        label: &str,
1312        property: &str,
1313        index: Arc<RwLock<crate::index::text::InvertedIndex>>,
1314    ) {
1315        let key = format!("{label}:{property}");
1316        self.text_indexes.write().insert(key, index);
1317    }
1318
1319    /// Retrieves the text index for a label+property pair.
1320    #[cfg(feature = "text-index")]
1321    #[must_use]
1322    pub fn get_text_index(
1323        &self,
1324        label: &str,
1325        property: &str,
1326    ) -> Option<Arc<RwLock<crate::index::text::InvertedIndex>>> {
1327        let key = format!("{label}:{property}");
1328        self.text_indexes.read().get(&key).cloned()
1329    }
1330
1331    /// Removes a text index for a label+property pair.
1332    ///
1333    /// Returns `true` if the index existed and was removed.
1334    #[cfg(feature = "text-index")]
1335    pub fn remove_text_index(&self, label: &str, property: &str) -> bool {
1336        let key = format!("{label}:{property}");
1337        self.text_indexes.write().remove(&key).is_some()
1338    }
1339
1340    /// Returns all text index entries as `(key, index)` pairs.
1341    ///
1342    /// The key format is `"label:property"`.
1343    #[cfg(feature = "text-index")]
1344    pub fn text_index_entries(
1345        &self,
1346    ) -> Vec<(String, Arc<RwLock<crate::index::text::InvertedIndex>>)> {
1347        self.text_indexes
1348            .read()
1349            .iter()
1350            .map(|(k, v)| (k.clone(), v.clone()))
1351            .collect()
1352    }
1353
1354    /// Finds all nodes that have a specific property value.
1355    ///
1356    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1357    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1358    ///
1359    /// # Example
1360    ///
1361    /// ```
1362    /// use grafeo_core::graph::lpg::LpgStore;
1363    /// use grafeo_common::types::Value;
1364    ///
1365    /// let store = LpgStore::new();
1366    /// store.create_property_index("city"); // Optional but makes lookups fast
1367    ///
1368    /// let alice = store.create_node(&["Person"]);
1369    /// let bob = store.create_node(&["Person"]);
1370    /// store.set_node_property(alice, "city", Value::from("NYC"));
1371    /// store.set_node_property(bob, "city", Value::from("NYC"));
1372    ///
1373    /// let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
1374    /// assert_eq!(nyc_people.len(), 2);
1375    /// ```
1376    #[must_use]
1377    pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1378        let key = PropertyKey::new(property);
1379        let hv = HashableValue::new(value.clone());
1380
1381        // Try indexed lookup first
1382        let indexes = self.property_indexes.read();
1383        if let Some(index) = indexes.get(&key) {
1384            if let Some(nodes) = index.get(&hv) {
1385                return nodes.iter().copied().collect();
1386            }
1387            return Vec::new();
1388        }
1389        drop(indexes);
1390
1391        // Fall back to full scan
1392        self.node_ids()
1393            .into_iter()
1394            .filter(|&node_id| {
1395                self.node_properties
1396                    .get(node_id, &key)
1397                    .is_some_and(|v| v == *value)
1398            })
1399            .collect()
1400    }
1401
1402    /// Finds nodes whose property matches an operator filter.
1403    ///
1404    /// The `filter_value` is either a scalar (equality) or a `Value::Map` with
1405    /// `$`-prefixed operator keys like `$gt`, `$lt`, `$gte`, `$lte`, `$in`,
1406    /// `$nin`, `$ne`, `$contains`.
1407    pub fn find_nodes_matching_filter(&self, property: &str, filter_value: &Value) -> Vec<NodeId> {
1408        let key = PropertyKey::new(property);
1409        self.node_ids()
1410            .into_iter()
1411            .filter(|&node_id| {
1412                self.node_properties
1413                    .get(node_id, &key)
1414                    .is_some_and(|v| Self::matches_filter(&v, filter_value))
1415            })
1416            .collect()
1417    }
1418
1419    /// Checks if a node property value matches a filter value.
1420    ///
1421    /// - Scalar filter: equality check
1422    /// - Map filter with `$`-prefixed keys: operator evaluation
1423    fn matches_filter(node_value: &Value, filter_value: &Value) -> bool {
1424        match filter_value {
1425            Value::Map(ops) if ops.keys().any(|k| k.as_str().starts_with('$')) => {
1426                ops.iter().all(|(op_key, op_val)| {
1427                    match op_key.as_str() {
1428                        "$gt" => {
1429                            Self::compare_values(node_value, op_val)
1430                                == Some(std::cmp::Ordering::Greater)
1431                        }
1432                        "$gte" => matches!(
1433                            Self::compare_values(node_value, op_val),
1434                            Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
1435                        ),
1436                        "$lt" => {
1437                            Self::compare_values(node_value, op_val)
1438                                == Some(std::cmp::Ordering::Less)
1439                        }
1440                        "$lte" => matches!(
1441                            Self::compare_values(node_value, op_val),
1442                            Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
1443                        ),
1444                        "$ne" => node_value != op_val,
1445                        "$in" => match op_val {
1446                            Value::List(items) => items.iter().any(|v| v == node_value),
1447                            _ => false,
1448                        },
1449                        "$nin" => match op_val {
1450                            Value::List(items) => !items.iter().any(|v| v == node_value),
1451                            _ => true,
1452                        },
1453                        "$contains" => match (node_value, op_val) {
1454                            (Value::String(a), Value::String(b)) => a.contains(b.as_str()),
1455                            _ => false,
1456                        },
1457                        _ => false, // Unknown operator — no match
1458                    }
1459                })
1460            }
1461            _ => node_value == filter_value, // Equality (backward compatible)
1462        }
1463    }
1464
1465    /// Compares two values for ordering (cross-type numeric comparison supported).
1466    fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
1467        match (a, b) {
1468            (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1469            (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1470            (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1471            (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1472            (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1473            (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1474            _ => None,
1475        }
1476    }
1477
1478    /// Updates property indexes when a property is set.
1479    fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1480        let indexes = self.property_indexes.read();
1481        if let Some(index) = indexes.get(key) {
1482            // Get old value to remove from index
1483            if let Some(old_value) = self.node_properties.get(node_id, key) {
1484                let old_hv = HashableValue::new(old_value);
1485                if let Some(mut nodes) = index.get_mut(&old_hv) {
1486                    nodes.remove(&node_id);
1487                    if nodes.is_empty() {
1488                        drop(nodes);
1489                        index.remove(&old_hv);
1490                    }
1491                }
1492            }
1493
1494            // Add new value to index
1495            let new_hv = HashableValue::new(new_value.clone());
1496            index
1497                .entry(new_hv)
1498                .or_insert_with(FxHashSet::default)
1499                .insert(node_id);
1500        }
1501    }
1502
1503    /// Updates property indexes when a property is removed.
1504    fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1505        let indexes = self.property_indexes.read();
1506        if let Some(index) = indexes.get(key) {
1507            // Get old value to remove from index
1508            if let Some(old_value) = self.node_properties.get(node_id, key) {
1509                let old_hv = HashableValue::new(old_value);
1510                if let Some(mut nodes) = index.get_mut(&old_hv) {
1511                    nodes.remove(&node_id);
1512                    if nodes.is_empty() {
1513                        drop(nodes);
1514                        index.remove(&old_hv);
1515                    }
1516                }
1517            }
1518        }
1519    }
1520
1521    /// Adds a label to a node.
1522    ///
1523    /// Returns true if the label was added, false if the node doesn't exist
1524    /// or already has the label.
1525    #[cfg(not(feature = "tiered-storage"))]
1526    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1527        let epoch = self.current_epoch();
1528
1529        // Check if node exists
1530        let nodes = self.nodes.read();
1531        if let Some(chain) = nodes.get(&node_id) {
1532            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1533                return false;
1534            }
1535        } else {
1536            return false;
1537        }
1538        drop(nodes);
1539
1540        // Get or create label ID
1541        let label_id = self.get_or_create_label_id(label);
1542
1543        // Add to node_labels map
1544        let mut node_labels = self.node_labels.write();
1545        let label_set = node_labels.entry(node_id).or_default();
1546
1547        if label_set.contains(&label_id) {
1548            return false; // Already has this label
1549        }
1550
1551        label_set.insert(label_id);
1552        drop(node_labels);
1553
1554        // Add to label_index
1555        let mut index = self.label_index.write();
1556        if (label_id as usize) >= index.len() {
1557            index.resize(label_id as usize + 1, FxHashMap::default());
1558        }
1559        index[label_id as usize].insert(node_id, ());
1560
1561        // Update label count in node record
1562        if let Some(chain) = self.nodes.write().get_mut(&node_id)
1563            && let Some(record) = chain.latest_mut()
1564        {
1565            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1566            record.set_label_count(count as u16);
1567        }
1568
1569        true
1570    }
1571
1572    /// Adds a label to a node.
1573    /// (Tiered storage version)
1574    #[cfg(feature = "tiered-storage")]
1575    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1576        let epoch = self.current_epoch();
1577
1578        // Check if node exists
1579        let versions = self.node_versions.read();
1580        if let Some(index) = versions.get(&node_id) {
1581            if let Some(vref) = index.visible_at(epoch) {
1582                if let Some(record) = self.read_node_record(&vref) {
1583                    if record.is_deleted() {
1584                        return false;
1585                    }
1586                } else {
1587                    return false;
1588                }
1589            } else {
1590                return false;
1591            }
1592        } else {
1593            return false;
1594        }
1595        drop(versions);
1596
1597        // Get or create label ID
1598        let label_id = self.get_or_create_label_id(label);
1599
1600        // Add to node_labels map
1601        let mut node_labels = self.node_labels.write();
1602        let label_set = node_labels.entry(node_id).or_default();
1603
1604        if label_set.contains(&label_id) {
1605            return false; // Already has this label
1606        }
1607
1608        label_set.insert(label_id);
1609        drop(node_labels);
1610
1611        // Add to label_index
1612        let mut index = self.label_index.write();
1613        if (label_id as usize) >= index.len() {
1614            index.resize(label_id as usize + 1, FxHashMap::default());
1615        }
1616        index[label_id as usize].insert(node_id, ());
1617
1618        // Note: label_count in record is not updated for tiered storage.
1619        // The record is immutable once allocated in the arena.
1620
1621        true
1622    }
1623
1624    /// Removes a label from a node.
1625    ///
1626    /// Returns true if the label was removed, false if the node doesn't exist
1627    /// or doesn't have the label.
1628    #[cfg(not(feature = "tiered-storage"))]
1629    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1630        let epoch = self.current_epoch();
1631
1632        // Check if node exists
1633        let nodes = self.nodes.read();
1634        if let Some(chain) = nodes.get(&node_id) {
1635            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1636                return false;
1637            }
1638        } else {
1639            return false;
1640        }
1641        drop(nodes);
1642
1643        // Get label ID
1644        let label_id = {
1645            let label_ids = self.label_to_id.read();
1646            match label_ids.get(label) {
1647                Some(&id) => id,
1648                None => return false, // Label doesn't exist
1649            }
1650        };
1651
1652        // Remove from node_labels map
1653        let mut node_labels = self.node_labels.write();
1654        if let Some(label_set) = node_labels.get_mut(&node_id) {
1655            if !label_set.remove(&label_id) {
1656                return false; // Node doesn't have this label
1657            }
1658        } else {
1659            return false;
1660        }
1661        drop(node_labels);
1662
1663        // Remove from label_index
1664        let mut index = self.label_index.write();
1665        if (label_id as usize) < index.len() {
1666            index[label_id as usize].remove(&node_id);
1667        }
1668
1669        // Update label count in node record
1670        if let Some(chain) = self.nodes.write().get_mut(&node_id)
1671            && let Some(record) = chain.latest_mut()
1672        {
1673            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1674            record.set_label_count(count as u16);
1675        }
1676
1677        true
1678    }
1679
1680    /// Removes a label from a node.
1681    /// (Tiered storage version)
1682    #[cfg(feature = "tiered-storage")]
1683    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1684        let epoch = self.current_epoch();
1685
1686        // Check if node exists
1687        let versions = self.node_versions.read();
1688        if let Some(index) = versions.get(&node_id) {
1689            if let Some(vref) = index.visible_at(epoch) {
1690                if let Some(record) = self.read_node_record(&vref) {
1691                    if record.is_deleted() {
1692                        return false;
1693                    }
1694                } else {
1695                    return false;
1696                }
1697            } else {
1698                return false;
1699            }
1700        } else {
1701            return false;
1702        }
1703        drop(versions);
1704
1705        // Get label ID
1706        let label_id = {
1707            let label_ids = self.label_to_id.read();
1708            match label_ids.get(label) {
1709                Some(&id) => id,
1710                None => return false, // Label doesn't exist
1711            }
1712        };
1713
1714        // Remove from node_labels map
1715        let mut node_labels = self.node_labels.write();
1716        if let Some(label_set) = node_labels.get_mut(&node_id) {
1717            if !label_set.remove(&label_id) {
1718                return false; // Node doesn't have this label
1719            }
1720        } else {
1721            return false;
1722        }
1723        drop(node_labels);
1724
1725        // Remove from label_index
1726        let mut index = self.label_index.write();
1727        if (label_id as usize) < index.len() {
1728            index[label_id as usize].remove(&node_id);
1729        }
1730
1731        // Note: label_count in record is not updated for tiered storage.
1732
1733        true
1734    }
1735
1736    /// Returns the number of nodes (non-deleted at current epoch).
1737    #[must_use]
1738    #[cfg(not(feature = "tiered-storage"))]
1739    pub fn node_count(&self) -> usize {
1740        let epoch = self.current_epoch();
1741        self.nodes
1742            .read()
1743            .values()
1744            .filter_map(|chain| chain.visible_at(epoch))
1745            .filter(|r| !r.is_deleted())
1746            .count()
1747    }
1748
1749    /// Returns the number of nodes (non-deleted at current epoch).
1750    /// (Tiered storage version)
1751    #[must_use]
1752    #[cfg(feature = "tiered-storage")]
1753    pub fn node_count(&self) -> usize {
1754        let epoch = self.current_epoch();
1755        let versions = self.node_versions.read();
1756        versions
1757            .iter()
1758            .filter(|(_, index)| {
1759                index.visible_at(epoch).map_or(false, |vref| {
1760                    self.read_node_record(&vref)
1761                        .map_or(false, |r| !r.is_deleted())
1762                })
1763            })
1764            .count()
1765    }
1766
1767    /// Returns all node IDs in the store.
1768    ///
1769    /// This returns a snapshot of current node IDs. The returned vector
1770    /// excludes deleted nodes. Results are sorted by NodeId for deterministic
1771    /// iteration order.
1772    #[must_use]
1773    #[cfg(not(feature = "tiered-storage"))]
1774    pub fn node_ids(&self) -> Vec<NodeId> {
1775        let epoch = self.current_epoch();
1776        let mut ids: Vec<NodeId> = self
1777            .nodes
1778            .read()
1779            .iter()
1780            .filter_map(|(id, chain)| {
1781                chain
1782                    .visible_at(epoch)
1783                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1784            })
1785            .collect();
1786        ids.sort_unstable();
1787        ids
1788    }
1789
1790    /// Returns all node IDs in the store.
1791    /// (Tiered storage version)
1792    #[must_use]
1793    #[cfg(feature = "tiered-storage")]
1794    pub fn node_ids(&self) -> Vec<NodeId> {
1795        let epoch = self.current_epoch();
1796        let versions = self.node_versions.read();
1797        let mut ids: Vec<NodeId> = versions
1798            .iter()
1799            .filter_map(|(id, index)| {
1800                index.visible_at(epoch).and_then(|vref| {
1801                    self.read_node_record(&vref)
1802                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1803                })
1804            })
1805            .collect();
1806        ids.sort_unstable();
1807        ids
1808    }
1809
1810    // === Edge Operations ===
1811
1812    /// Creates a new edge.
1813    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1814        self.needs_stats_recompute.store(true, Ordering::Relaxed);
1815        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1816    }
1817
1818    /// Creates a new edge within a transaction context.
1819    #[cfg(not(feature = "tiered-storage"))]
1820    pub fn create_edge_versioned(
1821        &self,
1822        src: NodeId,
1823        dst: NodeId,
1824        edge_type: &str,
1825        epoch: EpochId,
1826        tx_id: TxId,
1827    ) -> EdgeId {
1828        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1829        let type_id = self.get_or_create_edge_type_id(edge_type);
1830
1831        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1832        let chain = VersionChain::with_initial(record, epoch, tx_id);
1833        self.edges.write().insert(id, chain);
1834
1835        // Update adjacency
1836        self.forward_adj.add_edge(src, dst, id);
1837        if let Some(ref backward) = self.backward_adj {
1838            backward.add_edge(dst, src, id);
1839        }
1840
1841        id
1842    }
1843
1844    /// Creates a new edge within a transaction context.
1845    /// (Tiered storage version)
1846    #[cfg(feature = "tiered-storage")]
1847    pub fn create_edge_versioned(
1848        &self,
1849        src: NodeId,
1850        dst: NodeId,
1851        edge_type: &str,
1852        epoch: EpochId,
1853        tx_id: TxId,
1854    ) -> EdgeId {
1855        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1856        let type_id = self.get_or_create_edge_type_id(edge_type);
1857
1858        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1859
1860        // Allocate record in arena and get offset (create epoch if needed)
1861        let arena = self.arena_allocator.arena_or_create(epoch);
1862        let (offset, _stored) = arena.alloc_value_with_offset(record);
1863
1864        // Create HotVersionRef pointing to arena data
1865        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1866
1867        // Create or update version index
1868        let mut versions = self.edge_versions.write();
1869        if let Some(index) = versions.get_mut(&id) {
1870            index.add_hot(hot_ref);
1871        } else {
1872            versions.insert(id, VersionIndex::with_initial(hot_ref));
1873        }
1874
1875        // Update adjacency
1876        self.forward_adj.add_edge(src, dst, id);
1877        if let Some(ref backward) = self.backward_adj {
1878            backward.add_edge(dst, src, id);
1879        }
1880
1881        id
1882    }
1883
1884    /// Creates a new edge with properties.
1885    pub fn create_edge_with_props(
1886        &self,
1887        src: NodeId,
1888        dst: NodeId,
1889        edge_type: &str,
1890        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1891    ) -> EdgeId {
1892        let id = self.create_edge(src, dst, edge_type);
1893
1894        for (key, value) in properties {
1895            self.edge_properties.set(id, key.into(), value.into());
1896        }
1897
1898        id
1899    }
1900
1901    /// Gets an edge by ID (latest visible version).
1902    #[must_use]
1903    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1904        self.get_edge_at_epoch(id, self.current_epoch())
1905    }
1906
1907    /// Gets an edge by ID at a specific epoch.
1908    #[must_use]
1909    #[cfg(not(feature = "tiered-storage"))]
1910    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1911        let edges = self.edges.read();
1912        let chain = edges.get(&id)?;
1913        let record = chain.visible_at(epoch)?;
1914
1915        if record.is_deleted() {
1916            return None;
1917        }
1918
1919        let edge_type = {
1920            let id_to_type = self.id_to_edge_type.read();
1921            id_to_type.get(record.type_id as usize)?.clone()
1922        };
1923
1924        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1925
1926        // Get properties
1927        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1928
1929        Some(edge)
1930    }
1931
1932    /// Gets an edge by ID at a specific epoch.
1933    /// (Tiered storage version)
1934    #[must_use]
1935    #[cfg(feature = "tiered-storage")]
1936    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1937        let versions = self.edge_versions.read();
1938        let index = versions.get(&id)?;
1939        let version_ref = index.visible_at(epoch)?;
1940
1941        let record = self.read_edge_record(&version_ref)?;
1942
1943        if record.is_deleted() {
1944            return None;
1945        }
1946
1947        let edge_type = {
1948            let id_to_type = self.id_to_edge_type.read();
1949            id_to_type.get(record.type_id as usize)?.clone()
1950        };
1951
1952        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1953
1954        // Get properties
1955        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1956
1957        Some(edge)
1958    }
1959
1960    /// Gets an edge visible to a specific transaction.
1961    #[must_use]
1962    #[cfg(not(feature = "tiered-storage"))]
1963    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1964        let edges = self.edges.read();
1965        let chain = edges.get(&id)?;
1966        let record = chain.visible_to(epoch, tx_id)?;
1967
1968        if record.is_deleted() {
1969            return None;
1970        }
1971
1972        let edge_type = {
1973            let id_to_type = self.id_to_edge_type.read();
1974            id_to_type.get(record.type_id as usize)?.clone()
1975        };
1976
1977        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1978
1979        // Get properties
1980        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1981
1982        Some(edge)
1983    }
1984
1985    /// Gets an edge visible to a specific transaction.
1986    /// (Tiered storage version)
1987    #[must_use]
1988    #[cfg(feature = "tiered-storage")]
1989    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1990        let versions = self.edge_versions.read();
1991        let index = versions.get(&id)?;
1992        let version_ref = index.visible_to(epoch, tx_id)?;
1993
1994        let record = self.read_edge_record(&version_ref)?;
1995
1996        if record.is_deleted() {
1997            return None;
1998        }
1999
2000        let edge_type = {
2001            let id_to_type = self.id_to_edge_type.read();
2002            id_to_type.get(record.type_id as usize)?.clone()
2003        };
2004
2005        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
2006
2007        // Get properties
2008        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
2009
2010        Some(edge)
2011    }
2012
    /// Reads an EdgeRecord from arena using a VersionRef.
    ///
    /// Hot references are resolved by copying the record out of the arena for
    /// the reference's epoch; cold references are delegated to the epoch
    /// store. The hot path always yields `Some`; the cold path returns
    /// whatever `epoch_store.get_edge` returns.
    #[cfg(feature = "tiered-storage")]
    #[allow(unsafe_code)]
    fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
        match version_ref {
            VersionRef::Hot(hot_ref) => {
                // NOTE(review): assumes the arena for `hot_ref.epoch` still exists;
                // confirm how `arena()` behaves for a missing epoch.
                let arena = self.arena_allocator.arena(hot_ref.epoch);
                // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                // Copy the record out so the caller does not borrow from the arena.
                Some(*record)
            }
            VersionRef::Cold(cold_ref) => {
                // Read from compressed epoch store
                self.epoch_store
                    .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
            }
        }
    }
2031
2032    /// Deletes an edge (using latest epoch).
2033    pub fn delete_edge(&self, id: EdgeId) -> bool {
2034        self.needs_stats_recompute.store(true, Ordering::Relaxed);
2035        self.delete_edge_at_epoch(id, self.current_epoch())
2036    }
2037
2038    /// Deletes an edge at a specific epoch.
2039    #[cfg(not(feature = "tiered-storage"))]
2040    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
2041        let mut edges = self.edges.write();
2042        if let Some(chain) = edges.get_mut(&id) {
2043            // Get the visible record to check if deleted and get src/dst
2044            let (src, dst) = {
2045                match chain.visible_at(epoch) {
2046                    Some(record) => {
2047                        if record.is_deleted() {
2048                            return false;
2049                        }
2050                        (record.src, record.dst)
2051                    }
2052                    None => return false, // Not visible at this epoch (already deleted)
2053                }
2054            };
2055
2056            // Mark the version chain as deleted
2057            chain.mark_deleted(epoch);
2058
2059            drop(edges); // Release lock
2060
2061            // Mark as deleted in adjacency (soft delete)
2062            self.forward_adj.mark_deleted(src, id);
2063            if let Some(ref backward) = self.backward_adj {
2064                backward.mark_deleted(dst, id);
2065            }
2066
2067            // Remove properties
2068            self.edge_properties.remove_all(id);
2069
2070            true
2071        } else {
2072            false
2073        }
2074    }
2075
2076    /// Deletes an edge at a specific epoch.
2077    /// (Tiered storage version)
2078    #[cfg(feature = "tiered-storage")]
2079    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
2080        let mut versions = self.edge_versions.write();
2081        if let Some(index) = versions.get_mut(&id) {
2082            // Get the visible record to check if deleted and get src/dst
2083            let (src, dst) = {
2084                match index.visible_at(epoch) {
2085                    Some(version_ref) => {
2086                        if let Some(record) = self.read_edge_record(&version_ref) {
2087                            if record.is_deleted() {
2088                                return false;
2089                            }
2090                            (record.src, record.dst)
2091                        } else {
2092                            return false;
2093                        }
2094                    }
2095                    None => return false,
2096                }
2097            };
2098
2099            // Mark as deleted in version index
2100            index.mark_deleted(epoch);
2101
2102            drop(versions); // Release lock
2103
2104            // Mark as deleted in adjacency (soft delete)
2105            self.forward_adj.mark_deleted(src, id);
2106            if let Some(ref backward) = self.backward_adj {
2107                backward.mark_deleted(dst, id);
2108            }
2109
2110            // Remove properties
2111            self.edge_properties.remove_all(id);
2112
2113            true
2114        } else {
2115            false
2116        }
2117    }
2118
2119    /// Returns the number of edges (non-deleted at current epoch).
2120    #[must_use]
2121    #[cfg(not(feature = "tiered-storage"))]
2122    pub fn edge_count(&self) -> usize {
2123        let epoch = self.current_epoch();
2124        self.edges
2125            .read()
2126            .values()
2127            .filter_map(|chain| chain.visible_at(epoch))
2128            .filter(|r| !r.is_deleted())
2129            .count()
2130    }
2131
2132    /// Returns the number of edges (non-deleted at current epoch).
2133    /// (Tiered storage version)
2134    #[must_use]
2135    #[cfg(feature = "tiered-storage")]
2136    pub fn edge_count(&self) -> usize {
2137        let epoch = self.current_epoch();
2138        let versions = self.edge_versions.read();
2139        versions
2140            .iter()
2141            .filter(|(_, index)| {
2142                index.visible_at(epoch).map_or(false, |vref| {
2143                    self.read_edge_record(&vref)
2144                        .map_or(false, |r| !r.is_deleted())
2145                })
2146            })
2147            .count()
2148    }
2149
2150    /// Discards all uncommitted versions created by a transaction.
2151    ///
2152    /// This is called during transaction rollback to clean up uncommitted changes.
2153    /// The method removes version chain entries created by the specified transaction.
2154    #[cfg(not(feature = "tiered-storage"))]
2155    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2156        // Remove uncommitted node versions
2157        {
2158            let mut nodes = self.nodes.write();
2159            for chain in nodes.values_mut() {
2160                chain.remove_versions_by(tx_id);
2161            }
2162            // Remove completely empty chains (no versions left)
2163            nodes.retain(|_, chain| !chain.is_empty());
2164        }
2165
2166        // Remove uncommitted edge versions
2167        {
2168            let mut edges = self.edges.write();
2169            for chain in edges.values_mut() {
2170                chain.remove_versions_by(tx_id);
2171            }
2172            // Remove completely empty chains (no versions left)
2173            edges.retain(|_, chain| !chain.is_empty());
2174        }
2175    }
2176
2177    /// Discards all uncommitted versions created by a transaction.
2178    /// (Tiered storage version)
2179    #[cfg(feature = "tiered-storage")]
2180    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2181        // Remove uncommitted node versions
2182        {
2183            let mut versions = self.node_versions.write();
2184            for index in versions.values_mut() {
2185                index.remove_versions_by(tx_id);
2186            }
2187            // Remove completely empty indexes (no versions left)
2188            versions.retain(|_, index| !index.is_empty());
2189        }
2190
2191        // Remove uncommitted edge versions
2192        {
2193            let mut versions = self.edge_versions.write();
2194            for index in versions.values_mut() {
2195                index.remove_versions_by(tx_id);
2196            }
2197            // Remove completely empty indexes (no versions left)
2198            versions.retain(|_, index| !index.is_empty());
2199        }
2200    }
2201
2202    /// Garbage collects old versions that are no longer visible to any transaction.
2203    ///
2204    /// Versions older than `min_epoch` are pruned from version chains, keeping
2205    /// at most one old version per entity as a baseline. Empty chains are removed.
2206    #[cfg(not(feature = "tiered-storage"))]
2207    pub fn gc_versions(&self, min_epoch: EpochId) {
2208        {
2209            let mut nodes = self.nodes.write();
2210            for chain in nodes.values_mut() {
2211                chain.gc(min_epoch);
2212            }
2213            nodes.retain(|_, chain| !chain.is_empty());
2214        }
2215        {
2216            let mut edges = self.edges.write();
2217            for chain in edges.values_mut() {
2218                chain.gc(min_epoch);
2219            }
2220            edges.retain(|_, chain| !chain.is_empty());
2221        }
2222    }
2223
2224    /// Garbage collects old versions (tiered storage variant).
2225    #[cfg(feature = "tiered-storage")]
2226    pub fn gc_versions(&self, min_epoch: EpochId) {
2227        {
2228            let mut versions = self.node_versions.write();
2229            for index in versions.values_mut() {
2230                index.gc(min_epoch);
2231            }
2232            versions.retain(|_, index| !index.is_empty());
2233        }
2234        {
2235            let mut versions = self.edge_versions.write();
2236            for index in versions.values_mut() {
2237                index.gc(min_epoch);
2238            }
2239            versions.retain(|_, index| !index.is_empty());
2240        }
2241    }
2242
    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
    ///
    /// This is called by the transaction manager when an epoch becomes eligible
    /// for freezing (no active transactions can see it). The freeze process:
    ///
    /// 1. Collects all hot version refs for the epoch
    /// 2. Reads the corresponding records from arena
    /// 3. Compresses them into a `CompressedEpochBlock`
    /// 4. Updates `VersionIndex` entries to point to cold storage
    /// 5. The arena can be deallocated after all epochs in it are frozen
    ///
    /// This method performs steps 1-4; it does not deallocate the arena itself.
    ///
    /// # Arguments
    ///
    /// * `epoch` - The epoch to freeze
    ///
    /// # Returns
    ///
    /// The number of records frozen (nodes + edges). Returns 0 without touching
    /// the epoch store when the epoch has no hot records.
    #[cfg(feature = "tiered-storage")]
    #[allow(unsafe_code)]
    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
        // Collect node records to freeze
        // `node_records` is the payload handed to the epoch store; `node_hot_refs`
        // remembers each hot ref so its metadata can be carried into the cold ref.
        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();

        {
            // Only a read lock is needed while copying records out of the arena.
            let versions = self.node_versions.read();
            for (node_id, index) in versions.iter() {
                for hot_ref in index.hot_refs_for_epoch(epoch) {
                    let arena = self.arena_allocator.arena(hot_ref.epoch);
                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                    node_records.push((node_id.as_u64(), *record));
                    node_hot_refs.push((*node_id, *hot_ref));
                }
            }
        }

        // Collect edge records to freeze
        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();

        {
            let versions = self.edge_versions.read();
            for (edge_id, index) in versions.iter() {
                for hot_ref in index.hot_refs_for_epoch(epoch) {
                    let arena = self.arena_allocator.arena(hot_ref.epoch);
                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                    edge_records.push((edge_id.as_u64(), *record));
                    edge_hot_refs.push((*edge_id, *hot_ref));
                }
            }
        }

        // Computed before the record vectors are moved into the epoch store.
        let total_frozen = node_records.len() + edge_records.len();

        // Nothing hot in this epoch: skip the epoch store entirely.
        if total_frozen == 0 {
            return 0;
        }

        // Freeze to compressed storage
        let (node_entries, edge_entries) =
            self.epoch_store
                .freeze_epoch(epoch, node_records, edge_records);

        // Build lookup maps for index entries
        // NOTE(review): the maps are keyed by entity ID, so an entity with more
        // than one hot ref in this epoch keeps only one (offset, length) pair;
        // confirm `hot_refs_for_epoch` yields at most one ref per entity.
        let node_entry_map: FxHashMap<u64, _> = node_entries
            .iter()
            .map(|e| (e.entity_id, (e.offset, e.length)))
            .collect();
        let edge_entry_map: FxHashMap<u64, _> = edge_entries
            .iter()
            .map(|e| (e.entity_id, (e.offset, e.length)))
            .collect();

        // Update version indexes to use cold refs
        // NOTE(review): the version tables were unlocked between collection and
        // this update; presumably safe because the caller only freezes epochs
        // that no active transaction can modify — confirm.
        {
            let mut versions = self.node_versions.write();
            for (node_id, hot_ref) in &node_hot_refs {
                // Entities missing from the index or from the frozen block are skipped.
                if let Some(index) = versions.get_mut(node_id)
                    && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
                {
                    // Creator and deletion metadata are carried over from the hot ref.
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }

        {
            let mut versions = self.edge_versions.write();
            for (edge_id, hot_ref) in &edge_hot_refs {
                // Entities missing from the index or from the frozen block are skipped.
                if let Some(index) = versions.get_mut(edge_id)
                    && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
                {
                    // Creator and deletion metadata are carried over from the hot ref.
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }

        total_frozen
    }
2358
    /// Returns the epoch store for cold storage statistics.
    ///
    /// The reference borrows from the store; nothing is copied. Only available
    /// with the `tiered-storage` feature.
    #[cfg(feature = "tiered-storage")]
    #[must_use]
    pub fn epoch_store(&self) -> &EpochStore {
        &self.epoch_store
    }
2365
2366    /// Returns the number of distinct labels in the store.
2367    #[must_use]
2368    pub fn label_count(&self) -> usize {
2369        self.id_to_label.read().len()
2370    }
2371
2372    /// Returns the number of distinct property keys in the store.
2373    ///
2374    /// This counts unique property keys across both nodes and edges.
2375    #[must_use]
2376    pub fn property_key_count(&self) -> usize {
2377        let node_keys = self.node_properties.column_count();
2378        let edge_keys = self.edge_properties.column_count();
2379        // Note: This may count some keys twice if the same key is used
2380        // for both nodes and edges. A more precise count would require
2381        // tracking unique keys across both storages.
2382        node_keys + edge_keys
2383    }
2384
2385    /// Returns the number of distinct edge types in the store.
2386    #[must_use]
2387    pub fn edge_type_count(&self) -> usize {
2388        self.id_to_edge_type.read().len()
2389    }
2390
2391    // === Traversal ===
2392
2393    /// Iterates over neighbors of a node in the specified direction.
2394    ///
2395    /// This is the fast path for graph traversal - goes straight to the
2396    /// adjacency index without loading full node data.
2397    pub fn neighbors(
2398        &self,
2399        node: NodeId,
2400        direction: Direction,
2401    ) -> impl Iterator<Item = NodeId> + '_ {
2402        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2403            Direction::Outgoing | Direction::Both => {
2404                Box::new(self.forward_adj.neighbors(node).into_iter())
2405            }
2406            Direction::Incoming => Box::new(std::iter::empty()),
2407        };
2408
2409        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2410            Direction::Incoming | Direction::Both => {
2411                if let Some(ref adj) = self.backward_adj {
2412                    Box::new(adj.neighbors(node).into_iter())
2413                } else {
2414                    Box::new(std::iter::empty())
2415                }
2416            }
2417            Direction::Outgoing => Box::new(std::iter::empty()),
2418        };
2419
2420        forward.chain(backward)
2421    }
2422
2423    /// Returns edges from a node with their targets.
2424    ///
2425    /// Returns an iterator of (target_node, edge_id) pairs.
2426    pub fn edges_from(
2427        &self,
2428        node: NodeId,
2429        direction: Direction,
2430    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2431        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2432            Direction::Outgoing | Direction::Both => {
2433                Box::new(self.forward_adj.edges_from(node).into_iter())
2434            }
2435            Direction::Incoming => Box::new(std::iter::empty()),
2436        };
2437
2438        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2439            Direction::Incoming | Direction::Both => {
2440                if let Some(ref adj) = self.backward_adj {
2441                    Box::new(adj.edges_from(node).into_iter())
2442                } else {
2443                    Box::new(std::iter::empty())
2444                }
2445            }
2446            Direction::Outgoing => Box::new(std::iter::empty()),
2447        };
2448
2449        forward.chain(backward)
2450    }
2451
2452    /// Returns edges to a node (where the node is the destination).
2453    ///
2454    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
2455    /// Uses the backward adjacency index for O(degree) lookup.
2456    ///
2457    /// # Example
2458    ///
2459    /// ```ignore
2460    /// // For edges: A->B, C->B
2461    /// let incoming = store.edges_to(B);
2462    /// // Returns: [(A, edge1), (C, edge2)]
2463    /// ```
2464    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2465        if let Some(ref backward) = self.backward_adj {
2466            backward.edges_from(node)
2467        } else {
2468            // Fallback: scan all edges (slow but correct)
2469            self.all_edges()
2470                .filter_map(|edge| {
2471                    if edge.dst == node {
2472                        Some((edge.src, edge.id))
2473                    } else {
2474                        None
2475                    }
2476                })
2477                .collect()
2478        }
2479    }
2480
2481    /// Returns the out-degree of a node (number of outgoing edges).
2482    ///
2483    /// Uses the forward adjacency index for O(1) lookup.
2484    #[must_use]
2485    pub fn out_degree(&self, node: NodeId) -> usize {
2486        self.forward_adj.out_degree(node)
2487    }
2488
2489    /// Returns the in-degree of a node (number of incoming edges).
2490    ///
2491    /// Uses the backward adjacency index for O(1) lookup if available,
2492    /// otherwise falls back to scanning edges.
2493    #[must_use]
2494    pub fn in_degree(&self, node: NodeId) -> usize {
2495        if let Some(ref backward) = self.backward_adj {
2496            backward.in_degree(node)
2497        } else {
2498            // Fallback: count edges (slow)
2499            self.all_edges().filter(|edge| edge.dst == node).count()
2500        }
2501    }
2502
2503    /// Gets the type of an edge by ID.
2504    #[must_use]
2505    #[cfg(not(feature = "tiered-storage"))]
2506    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2507        let edges = self.edges.read();
2508        let chain = edges.get(&id)?;
2509        let epoch = self.current_epoch();
2510        let record = chain.visible_at(epoch)?;
2511        let id_to_type = self.id_to_edge_type.read();
2512        id_to_type.get(record.type_id as usize).cloned()
2513    }
2514
2515    /// Gets the type of an edge by ID.
2516    /// (Tiered storage version)
2517    #[must_use]
2518    #[cfg(feature = "tiered-storage")]
2519    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2520        let versions = self.edge_versions.read();
2521        let index = versions.get(&id)?;
2522        let epoch = self.current_epoch();
2523        let vref = index.visible_at(epoch)?;
2524        let record = self.read_edge_record(&vref)?;
2525        let id_to_type = self.id_to_edge_type.read();
2526        id_to_type.get(record.type_id as usize).cloned()
2527    }
2528
2529    /// Returns all nodes with a specific label.
2530    ///
2531    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
2532    /// concurrent modifications won't affect the returned vector. Results are
2533    /// sorted by NodeId for deterministic iteration order.
2534    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2535        let label_to_id = self.label_to_id.read();
2536        if let Some(&label_id) = label_to_id.get(label) {
2537            let index = self.label_index.read();
2538            if let Some(set) = index.get(label_id as usize) {
2539                let mut ids: Vec<NodeId> = set.keys().copied().collect();
2540                ids.sort_unstable();
2541                return ids;
2542            }
2543        }
2544        Vec::new()
2545    }
2546
2547    // === Admin API: Iteration ===
2548
2549    /// Returns an iterator over all nodes in the database.
2550    ///
2551    /// This creates a snapshot of all visible nodes at the current epoch.
2552    /// Useful for dump/export operations.
2553    #[cfg(not(feature = "tiered-storage"))]
2554    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2555        let epoch = self.current_epoch();
2556        let node_ids: Vec<NodeId> = self
2557            .nodes
2558            .read()
2559            .iter()
2560            .filter_map(|(id, chain)| {
2561                chain
2562                    .visible_at(epoch)
2563                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2564            })
2565            .collect();
2566
2567        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2568    }
2569
2570    /// Returns an iterator over all nodes in the database.
2571    /// (Tiered storage version)
2572    #[cfg(feature = "tiered-storage")]
2573    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2574        let node_ids = self.node_ids();
2575        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2576    }
2577
2578    /// Returns an iterator over all edges in the database.
2579    ///
2580    /// This creates a snapshot of all visible edges at the current epoch.
2581    /// Useful for dump/export operations.
2582    #[cfg(not(feature = "tiered-storage"))]
2583    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2584        let epoch = self.current_epoch();
2585        let edge_ids: Vec<EdgeId> = self
2586            .edges
2587            .read()
2588            .iter()
2589            .filter_map(|(id, chain)| {
2590                chain
2591                    .visible_at(epoch)
2592                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2593            })
2594            .collect();
2595
2596        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2597    }
2598
2599    /// Returns an iterator over all edges in the database.
2600    /// (Tiered storage version)
2601    #[cfg(feature = "tiered-storage")]
2602    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2603        let epoch = self.current_epoch();
2604        let versions = self.edge_versions.read();
2605        let edge_ids: Vec<EdgeId> = versions
2606            .iter()
2607            .filter_map(|(id, index)| {
2608                index.visible_at(epoch).and_then(|vref| {
2609                    self.read_edge_record(&vref)
2610                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2611                })
2612            })
2613            .collect();
2614
2615        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2616    }
2617
2618    /// Returns all label names in the database.
2619    pub fn all_labels(&self) -> Vec<String> {
2620        self.id_to_label
2621            .read()
2622            .iter()
2623            .map(|s| s.to_string())
2624            .collect()
2625    }
2626
2627    /// Returns all edge type names in the database.
2628    pub fn all_edge_types(&self) -> Vec<String> {
2629        self.id_to_edge_type
2630            .read()
2631            .iter()
2632            .map(|s| s.to_string())
2633            .collect()
2634    }
2635
2636    /// Returns all property keys used in the database.
2637    pub fn all_property_keys(&self) -> Vec<String> {
2638        let mut keys = std::collections::HashSet::new();
2639        for key in self.node_properties.keys() {
2640            keys.insert(key.to_string());
2641        }
2642        for key in self.edge_properties.keys() {
2643            keys.insert(key.to_string());
2644        }
2645        keys.into_iter().collect()
2646    }
2647
2648    /// Returns an iterator over nodes with a specific label.
2649    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2650        let node_ids = self.nodes_by_label(label);
2651        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2652    }
2653
2654    /// Returns an iterator over edges with a specific type.
2655    #[cfg(not(feature = "tiered-storage"))]
2656    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2657        let epoch = self.current_epoch();
2658        let type_to_id = self.edge_type_to_id.read();
2659
2660        if let Some(&type_id) = type_to_id.get(edge_type) {
2661            let edge_ids: Vec<EdgeId> = self
2662                .edges
2663                .read()
2664                .iter()
2665                .filter_map(|(id, chain)| {
2666                    chain.visible_at(epoch).and_then(|r| {
2667                        if !r.is_deleted() && r.type_id == type_id {
2668                            Some(*id)
2669                        } else {
2670                            None
2671                        }
2672                    })
2673                })
2674                .collect();
2675
2676            // Return a boxed iterator for the found edges
2677            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2678                as Box<dyn Iterator<Item = Edge> + 'a>
2679        } else {
2680            // Return empty iterator
2681            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2682        }
2683    }
2684
2685    /// Returns an iterator over edges with a specific type.
2686    /// (Tiered storage version)
2687    #[cfg(feature = "tiered-storage")]
2688    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2689        let epoch = self.current_epoch();
2690        let type_to_id = self.edge_type_to_id.read();
2691
2692        if let Some(&type_id) = type_to_id.get(edge_type) {
2693            let versions = self.edge_versions.read();
2694            let edge_ids: Vec<EdgeId> = versions
2695                .iter()
2696                .filter_map(|(id, index)| {
2697                    index.visible_at(epoch).and_then(|vref| {
2698                        self.read_edge_record(&vref).and_then(|r| {
2699                            if !r.is_deleted() && r.type_id == type_id {
2700                                Some(*id)
2701                            } else {
2702                                None
2703                            }
2704                        })
2705                    })
2706                })
2707                .collect();
2708
2709            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2710                as Box<dyn Iterator<Item = Edge> + 'a>
2711        } else {
2712            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2713        }
2714    }
2715
2716    // === Zone Map Support ===
2717
2718    /// Checks if a node property predicate might match any nodes.
2719    ///
2720    /// Uses zone maps for early filtering. Returns `true` if there might be
2721    /// matching nodes, `false` if there definitely aren't.
2722    #[must_use]
2723    pub fn node_property_might_match(
2724        &self,
2725        property: &PropertyKey,
2726        op: CompareOp,
2727        value: &Value,
2728    ) -> bool {
2729        self.node_properties.might_match(property, op, value)
2730    }
2731
2732    /// Checks if an edge property predicate might match any edges.
2733    #[must_use]
2734    pub fn edge_property_might_match(
2735        &self,
2736        property: &PropertyKey,
2737        op: CompareOp,
2738        value: &Value,
2739    ) -> bool {
2740        self.edge_properties.might_match(property, op, value)
2741    }
2742
2743    /// Gets the zone map for a node property.
2744    #[must_use]
2745    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2746        self.node_properties.zone_map(property)
2747    }
2748
2749    /// Gets the zone map for an edge property.
2750    #[must_use]
2751    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2752        self.edge_properties.zone_map(property)
2753    }
2754
2755    /// Rebuilds zone maps for all properties.
2756    pub fn rebuild_zone_maps(&self) {
2757        self.node_properties.rebuild_zone_maps();
2758        self.edge_properties.rebuild_zone_maps();
2759    }
2760
2761    // === Statistics ===
2762
2763    /// Returns the current statistics.
2764    #[must_use]
2765    pub fn statistics(&self) -> Statistics {
2766        self.statistics.read().clone()
2767    }
2768
2769    /// Recomputes statistics if they are stale (i.e., after mutations).
2770    ///
2771    /// Call this before reading statistics for query optimization.
2772    /// Avoids redundant recomputation if no mutations occurred.
2773    pub fn ensure_statistics_fresh(&self) {
2774        if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2775            self.compute_statistics();
2776        }
2777    }
2778
2779    /// Recomputes statistics from current data.
2780    ///
2781    /// Scans all labels and edge types to build cardinality estimates for the
2782    /// query optimizer. Call this periodically or after bulk data loads.
2783    #[cfg(not(feature = "tiered-storage"))]
2784    pub fn compute_statistics(&self) {
2785        let mut stats = Statistics::new();
2786
2787        // Compute total counts
2788        stats.total_nodes = self.node_count() as u64;
2789        stats.total_edges = self.edge_count() as u64;
2790
2791        // Compute per-label statistics
2792        let id_to_label = self.id_to_label.read();
2793        let label_index = self.label_index.read();
2794
2795        for (label_id, label_name) in id_to_label.iter().enumerate() {
2796            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2797
2798            if node_count > 0 {
2799                // Estimate average degree
2800                let avg_out_degree = if stats.total_nodes > 0 {
2801                    stats.total_edges as f64 / stats.total_nodes as f64
2802                } else {
2803                    0.0
2804                };
2805
2806                let label_stats =
2807                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2808
2809                stats.update_label(label_name.as_ref(), label_stats);
2810            }
2811        }
2812
2813        // Compute per-edge-type statistics
2814        let id_to_edge_type = self.id_to_edge_type.read();
2815        let edges = self.edges.read();
2816        let epoch = self.current_epoch();
2817
2818        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2819        for chain in edges.values() {
2820            if let Some(record) = chain.visible_at(epoch)
2821                && !record.is_deleted()
2822            {
2823                *edge_type_counts.entry(record.type_id).or_default() += 1;
2824            }
2825        }
2826
2827        for (type_id, count) in edge_type_counts {
2828            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2829                let avg_degree = if stats.total_nodes > 0 {
2830                    count as f64 / stats.total_nodes as f64
2831                } else {
2832                    0.0
2833                };
2834
2835                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2836                stats.update_edge_type(type_name.as_ref(), edge_stats);
2837            }
2838        }
2839
2840        *self.statistics.write() = stats;
2841    }
2842
2843    /// Recomputes statistics from current data.
2844    /// (Tiered storage version)
2845    #[cfg(feature = "tiered-storage")]
2846    pub fn compute_statistics(&self) {
2847        let mut stats = Statistics::new();
2848
2849        // Compute total counts
2850        stats.total_nodes = self.node_count() as u64;
2851        stats.total_edges = self.edge_count() as u64;
2852
2853        // Compute per-label statistics
2854        let id_to_label = self.id_to_label.read();
2855        let label_index = self.label_index.read();
2856
2857        for (label_id, label_name) in id_to_label.iter().enumerate() {
2858            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2859
2860            if node_count > 0 {
2861                let avg_out_degree = if stats.total_nodes > 0 {
2862                    stats.total_edges as f64 / stats.total_nodes as f64
2863                } else {
2864                    0.0
2865                };
2866
2867                let label_stats =
2868                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2869
2870                stats.update_label(label_name.as_ref(), label_stats);
2871            }
2872        }
2873
2874        // Compute per-edge-type statistics
2875        let id_to_edge_type = self.id_to_edge_type.read();
2876        let versions = self.edge_versions.read();
2877        let epoch = self.current_epoch();
2878
2879        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2880        for index in versions.values() {
2881            if let Some(vref) = index.visible_at(epoch)
2882                && let Some(record) = self.read_edge_record(&vref)
2883                && !record.is_deleted()
2884            {
2885                *edge_type_counts.entry(record.type_id).or_default() += 1;
2886            }
2887        }
2888
2889        for (type_id, count) in edge_type_counts {
2890            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2891                let avg_degree = if stats.total_nodes > 0 {
2892                    count as f64 / stats.total_nodes as f64
2893                } else {
2894                    0.0
2895                };
2896
2897                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2898                stats.update_edge_type(type_name.as_ref(), edge_stats);
2899            }
2900        }
2901
2902        *self.statistics.write() = stats;
2903    }
2904
2905    /// Estimates cardinality for a label scan.
2906    #[must_use]
2907    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2908        self.statistics.read().estimate_label_cardinality(label)
2909    }
2910
2911    /// Estimates average degree for an edge type.
2912    #[must_use]
2913    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2914        self.statistics
2915            .read()
2916            .estimate_avg_degree(edge_type, outgoing)
2917    }
2918
2919    // === Internal Helpers ===
2920
2921    fn get_or_create_label_id(&self, label: &str) -> u32 {
2922        {
2923            let label_to_id = self.label_to_id.read();
2924            if let Some(&id) = label_to_id.get(label) {
2925                return id;
2926            }
2927        }
2928
2929        let mut label_to_id = self.label_to_id.write();
2930        let mut id_to_label = self.id_to_label.write();
2931
2932        // Double-check after acquiring write lock
2933        if let Some(&id) = label_to_id.get(label) {
2934            return id;
2935        }
2936
2937        let id = id_to_label.len() as u32;
2938
2939        let label: ArcStr = label.into();
2940        label_to_id.insert(label.clone(), id);
2941        id_to_label.push(label);
2942
2943        id
2944    }
2945
2946    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2947        {
2948            let type_to_id = self.edge_type_to_id.read();
2949            if let Some(&id) = type_to_id.get(edge_type) {
2950                return id;
2951            }
2952        }
2953
2954        let mut type_to_id = self.edge_type_to_id.write();
2955        let mut id_to_type = self.id_to_edge_type.write();
2956
2957        // Double-check
2958        if let Some(&id) = type_to_id.get(edge_type) {
2959            return id;
2960        }
2961
2962        let id = id_to_type.len() as u32;
2963        let edge_type: ArcStr = edge_type.into();
2964        type_to_id.insert(edge_type.clone(), id);
2965        id_to_type.push(edge_type);
2966
2967        id
2968    }
2969
2970    // === Recovery Support ===
2971
2972    /// Creates a node with a specific ID during recovery.
2973    ///
2974    /// This is used for WAL recovery to restore nodes with their original IDs.
2975    /// The caller must ensure IDs don't conflict with existing nodes.
2976    #[cfg(not(feature = "tiered-storage"))]
2977    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2978        let epoch = self.current_epoch();
2979        let mut record = NodeRecord::new(id, epoch);
2980        record.set_label_count(labels.len() as u16);
2981
2982        // Store labels in node_labels map and label_index
2983        let mut node_label_set = FxHashSet::default();
2984        for label in labels {
2985            let label_id = self.get_or_create_label_id(*label);
2986            node_label_set.insert(label_id);
2987
2988            // Update label index
2989            let mut index = self.label_index.write();
2990            while index.len() <= label_id as usize {
2991                index.push(FxHashMap::default());
2992            }
2993            index[label_id as usize].insert(id, ());
2994        }
2995
2996        // Store node's labels
2997        self.node_labels.write().insert(id, node_label_set);
2998
2999        // Create version chain with initial version (using SYSTEM tx for recovery)
3000        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
3001        self.nodes.write().insert(id, chain);
3002
3003        // Update next_node_id if necessary to avoid future collisions
3004        let id_val = id.as_u64();
3005        let _ = self
3006            .next_node_id
3007            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3008                if id_val >= current {
3009                    Some(id_val + 1)
3010                } else {
3011                    None
3012                }
3013            });
3014    }
3015
3016    /// Creates a node with a specific ID during recovery.
3017    /// (Tiered storage version)
3018    #[cfg(feature = "tiered-storage")]
3019    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
3020        let epoch = self.current_epoch();
3021        let mut record = NodeRecord::new(id, epoch);
3022        record.set_label_count(labels.len() as u16);
3023
3024        // Store labels in node_labels map and label_index
3025        let mut node_label_set = FxHashSet::default();
3026        for label in labels {
3027            let label_id = self.get_or_create_label_id(*label);
3028            node_label_set.insert(label_id);
3029
3030            // Update label index
3031            let mut index = self.label_index.write();
3032            while index.len() <= label_id as usize {
3033                index.push(FxHashMap::default());
3034            }
3035            index[label_id as usize].insert(id, ());
3036        }
3037
3038        // Store node's labels
3039        self.node_labels.write().insert(id, node_label_set);
3040
3041        // Allocate record in arena and get offset (create epoch if needed)
3042        let arena = self.arena_allocator.arena_or_create(epoch);
3043        let (offset, _stored) = arena.alloc_value_with_offset(record);
3044
3045        // Create HotVersionRef (using SYSTEM tx for recovery)
3046        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
3047        let mut versions = self.node_versions.write();
3048        versions.insert(id, VersionIndex::with_initial(hot_ref));
3049
3050        // Update next_node_id if necessary to avoid future collisions
3051        let id_val = id.as_u64();
3052        let _ = self
3053            .next_node_id
3054            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3055                if id_val >= current {
3056                    Some(id_val + 1)
3057                } else {
3058                    None
3059                }
3060            });
3061    }
3062
3063    /// Creates an edge with a specific ID during recovery.
3064    ///
3065    /// This is used for WAL recovery to restore edges with their original IDs.
3066    #[cfg(not(feature = "tiered-storage"))]
3067    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
3068        let epoch = self.current_epoch();
3069        let type_id = self.get_or_create_edge_type_id(edge_type);
3070
3071        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
3072        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
3073        self.edges.write().insert(id, chain);
3074
3075        // Update adjacency
3076        self.forward_adj.add_edge(src, dst, id);
3077        if let Some(ref backward) = self.backward_adj {
3078            backward.add_edge(dst, src, id);
3079        }
3080
3081        // Update next_edge_id if necessary
3082        let id_val = id.as_u64();
3083        let _ = self
3084            .next_edge_id
3085            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3086                if id_val >= current {
3087                    Some(id_val + 1)
3088                } else {
3089                    None
3090                }
3091            });
3092    }
3093
3094    /// Creates an edge with a specific ID during recovery.
3095    /// (Tiered storage version)
3096    #[cfg(feature = "tiered-storage")]
3097    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
3098        let epoch = self.current_epoch();
3099        let type_id = self.get_or_create_edge_type_id(edge_type);
3100
3101        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
3102
3103        // Allocate record in arena and get offset (create epoch if needed)
3104        let arena = self.arena_allocator.arena_or_create(epoch);
3105        let (offset, _stored) = arena.alloc_value_with_offset(record);
3106
3107        // Create HotVersionRef (using SYSTEM tx for recovery)
3108        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
3109        let mut versions = self.edge_versions.write();
3110        versions.insert(id, VersionIndex::with_initial(hot_ref));
3111
3112        // Update adjacency
3113        self.forward_adj.add_edge(src, dst, id);
3114        if let Some(ref backward) = self.backward_adj {
3115            backward.add_edge(dst, src, id);
3116        }
3117
3118        // Update next_edge_id if necessary
3119        let id_val = id.as_u64();
3120        let _ = self
3121            .next_edge_id
3122            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3123                if id_val >= current {
3124                    Some(id_val + 1)
3125                } else {
3126                    None
3127                }
3128            });
3129    }
3130
3131    /// Sets the current epoch during recovery.
3132    pub fn set_epoch(&self, epoch: EpochId) {
3133        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
3134    }
3135}
3136
3137impl Default for LpgStore {
3138    fn default() -> Self {
3139        Self::new()
3140    }
3141}
3142
3143#[cfg(test)]
3144mod tests {
3145    use super::*;
3146
3147    #[test]
3148    fn test_create_node() {
3149        let store = LpgStore::new();
3150
3151        let id = store.create_node(&["Person"]);
3152        assert!(id.is_valid());
3153
3154        let node = store.get_node(id).unwrap();
3155        assert!(node.has_label("Person"));
3156        assert!(!node.has_label("Animal"));
3157    }
3158
3159    #[test]
3160    fn test_create_node_with_props() {
3161        let store = LpgStore::new();
3162
3163        let id = store.create_node_with_props(
3164            &["Person"],
3165            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
3166        );
3167
3168        let node = store.get_node(id).unwrap();
3169        assert_eq!(
3170            node.get_property("name").and_then(|v| v.as_str()),
3171            Some("Alice")
3172        );
3173        assert_eq!(
3174            node.get_property("age").and_then(|v| v.as_int64()),
3175            Some(30)
3176        );
3177    }
3178
3179    #[test]
3180    fn test_delete_node() {
3181        let store = LpgStore::new();
3182
3183        let id = store.create_node(&["Person"]);
3184        assert_eq!(store.node_count(), 1);
3185
3186        assert!(store.delete_node(id));
3187        assert_eq!(store.node_count(), 0);
3188        assert!(store.get_node(id).is_none());
3189
3190        // Double delete should return false
3191        assert!(!store.delete_node(id));
3192    }
3193
3194    #[test]
3195    fn test_create_edge() {
3196        let store = LpgStore::new();
3197
3198        let alice = store.create_node(&["Person"]);
3199        let bob = store.create_node(&["Person"]);
3200
3201        let edge_id = store.create_edge(alice, bob, "KNOWS");
3202        assert!(edge_id.is_valid());
3203
3204        let edge = store.get_edge(edge_id).unwrap();
3205        assert_eq!(edge.src, alice);
3206        assert_eq!(edge.dst, bob);
3207        assert_eq!(edge.edge_type.as_str(), "KNOWS");
3208    }
3209
3210    #[test]
3211    fn test_neighbors() {
3212        let store = LpgStore::new();
3213
3214        let a = store.create_node(&["Person"]);
3215        let b = store.create_node(&["Person"]);
3216        let c = store.create_node(&["Person"]);
3217
3218        store.create_edge(a, b, "KNOWS");
3219        store.create_edge(a, c, "KNOWS");
3220
3221        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3222        assert_eq!(outgoing.len(), 2);
3223        assert!(outgoing.contains(&b));
3224        assert!(outgoing.contains(&c));
3225
3226        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3227        assert_eq!(incoming.len(), 1);
3228        assert!(incoming.contains(&a));
3229    }
3230
3231    #[test]
3232    fn test_nodes_by_label() {
3233        let store = LpgStore::new();
3234
3235        let p1 = store.create_node(&["Person"]);
3236        let p2 = store.create_node(&["Person"]);
3237        let _a = store.create_node(&["Animal"]);
3238
3239        let persons = store.nodes_by_label("Person");
3240        assert_eq!(persons.len(), 2);
3241        assert!(persons.contains(&p1));
3242        assert!(persons.contains(&p2));
3243
3244        let animals = store.nodes_by_label("Animal");
3245        assert_eq!(animals.len(), 1);
3246    }
3247
3248    #[test]
3249    fn test_delete_edge() {
3250        let store = LpgStore::new();
3251
3252        let a = store.create_node(&["Person"]);
3253        let b = store.create_node(&["Person"]);
3254        let edge_id = store.create_edge(a, b, "KNOWS");
3255
3256        assert_eq!(store.edge_count(), 1);
3257
3258        assert!(store.delete_edge(edge_id));
3259        assert_eq!(store.edge_count(), 0);
3260        assert!(store.get_edge(edge_id).is_none());
3261    }
3262
3263    // === New tests for improved coverage ===
3264
3265    #[test]
3266    fn test_lpg_store_config() {
3267        // Test with_config
3268        let config = LpgStoreConfig {
3269            backward_edges: false,
3270            initial_node_capacity: 100,
3271            initial_edge_capacity: 200,
3272        };
3273        let store = LpgStore::with_config(config);
3274
3275        // Store should work but without backward adjacency
3276        let a = store.create_node(&["Person"]);
3277        let b = store.create_node(&["Person"]);
3278        store.create_edge(a, b, "KNOWS");
3279
3280        // Outgoing should work
3281        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3282        assert_eq!(outgoing.len(), 1);
3283
3284        // Incoming should be empty (no backward adjacency)
3285        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3286        assert_eq!(incoming.len(), 0);
3287    }
3288
3289    #[test]
3290    fn test_epoch_management() {
3291        let store = LpgStore::new();
3292
3293        let epoch0 = store.current_epoch();
3294        assert_eq!(epoch0.as_u64(), 0);
3295
3296        let epoch1 = store.new_epoch();
3297        assert_eq!(epoch1.as_u64(), 1);
3298
3299        let current = store.current_epoch();
3300        assert_eq!(current.as_u64(), 1);
3301    }
3302
3303    #[test]
3304    fn test_node_properties() {
3305        let store = LpgStore::new();
3306        let id = store.create_node(&["Person"]);
3307
3308        // Set and get property
3309        store.set_node_property(id, "name", Value::from("Alice"));
3310        let name = store.get_node_property(id, &"name".into());
3311        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3312
3313        // Update property
3314        store.set_node_property(id, "name", Value::from("Bob"));
3315        let name = store.get_node_property(id, &"name".into());
3316        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3317
3318        // Remove property
3319        let old = store.remove_node_property(id, "name");
3320        assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3321
3322        // Property should be gone
3323        let name = store.get_node_property(id, &"name".into());
3324        assert!(name.is_none());
3325
3326        // Remove non-existent property
3327        let none = store.remove_node_property(id, "nonexistent");
3328        assert!(none.is_none());
3329    }
3330
3331    #[test]
3332    fn test_edge_properties() {
3333        let store = LpgStore::new();
3334        let a = store.create_node(&["Person"]);
3335        let b = store.create_node(&["Person"]);
3336        let edge_id = store.create_edge(a, b, "KNOWS");
3337
3338        // Set and get property
3339        store.set_edge_property(edge_id, "since", Value::from(2020i64));
3340        let since = store.get_edge_property(edge_id, &"since".into());
3341        assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3342
3343        // Remove property
3344        let old = store.remove_edge_property(edge_id, "since");
3345        assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3346
3347        let since = store.get_edge_property(edge_id, &"since".into());
3348        assert!(since.is_none());
3349    }
3350
3351    #[test]
3352    fn test_add_remove_label() {
3353        let store = LpgStore::new();
3354        let id = store.create_node(&["Person"]);
3355
3356        // Add new label
3357        assert!(store.add_label(id, "Employee"));
3358
3359        let node = store.get_node(id).unwrap();
3360        assert!(node.has_label("Person"));
3361        assert!(node.has_label("Employee"));
3362
3363        // Adding same label again should fail
3364        assert!(!store.add_label(id, "Employee"));
3365
3366        // Remove label
3367        assert!(store.remove_label(id, "Employee"));
3368
3369        let node = store.get_node(id).unwrap();
3370        assert!(node.has_label("Person"));
3371        assert!(!node.has_label("Employee"));
3372
3373        // Removing non-existent label should fail
3374        assert!(!store.remove_label(id, "Employee"));
3375        assert!(!store.remove_label(id, "NonExistent"));
3376    }
3377
3378    #[test]
3379    fn test_add_label_to_nonexistent_node() {
3380        let store = LpgStore::new();
3381        let fake_id = NodeId::new(999);
3382        assert!(!store.add_label(fake_id, "Label"));
3383    }
3384
3385    #[test]
3386    fn test_remove_label_from_nonexistent_node() {
3387        let store = LpgStore::new();
3388        let fake_id = NodeId::new(999);
3389        assert!(!store.remove_label(fake_id, "Label"));
3390    }
3391
3392    #[test]
3393    fn test_node_ids() {
3394        let store = LpgStore::new();
3395
3396        let n1 = store.create_node(&["Person"]);
3397        let n2 = store.create_node(&["Person"]);
3398        let n3 = store.create_node(&["Person"]);
3399
3400        let ids = store.node_ids();
3401        assert_eq!(ids.len(), 3);
3402        assert!(ids.contains(&n1));
3403        assert!(ids.contains(&n2));
3404        assert!(ids.contains(&n3));
3405
3406        // Delete one
3407        store.delete_node(n2);
3408        let ids = store.node_ids();
3409        assert_eq!(ids.len(), 2);
3410        assert!(!ids.contains(&n2));
3411    }
3412
3413    #[test]
3414    fn test_delete_node_nonexistent() {
3415        let store = LpgStore::new();
3416        let fake_id = NodeId::new(999);
3417        assert!(!store.delete_node(fake_id));
3418    }
3419
3420    #[test]
3421    fn test_delete_edge_nonexistent() {
3422        let store = LpgStore::new();
3423        let fake_id = EdgeId::new(999);
3424        assert!(!store.delete_edge(fake_id));
3425    }
3426
3427    #[test]
3428    fn test_delete_edge_double() {
3429        let store = LpgStore::new();
3430        let a = store.create_node(&["Person"]);
3431        let b = store.create_node(&["Person"]);
3432        let edge_id = store.create_edge(a, b, "KNOWS");
3433
3434        assert!(store.delete_edge(edge_id));
3435        assert!(!store.delete_edge(edge_id)); // Double delete
3436    }
3437
3438    #[test]
3439    fn test_create_edge_with_props() {
3440        let store = LpgStore::new();
3441        let a = store.create_node(&["Person"]);
3442        let b = store.create_node(&["Person"]);
3443
3444        let edge_id = store.create_edge_with_props(
3445            a,
3446            b,
3447            "KNOWS",
3448            [
3449                ("since", Value::from(2020i64)),
3450                ("weight", Value::from(1.0)),
3451            ],
3452        );
3453
3454        let edge = store.get_edge(edge_id).unwrap();
3455        assert_eq!(
3456            edge.get_property("since").and_then(|v| v.as_int64()),
3457            Some(2020)
3458        );
3459        assert_eq!(
3460            edge.get_property("weight").and_then(|v| v.as_float64()),
3461            Some(1.0)
3462        );
3463    }
3464
3465    #[test]
3466    fn test_delete_node_edges() {
3467        let store = LpgStore::new();
3468
3469        let a = store.create_node(&["Person"]);
3470        let b = store.create_node(&["Person"]);
3471        let c = store.create_node(&["Person"]);
3472
3473        store.create_edge(a, b, "KNOWS"); // a -> b
3474        store.create_edge(c, a, "KNOWS"); // c -> a
3475
3476        assert_eq!(store.edge_count(), 2);
3477
3478        // Delete all edges connected to a
3479        store.delete_node_edges(a);
3480
3481        assert_eq!(store.edge_count(), 0);
3482    }
3483
3484    #[test]
3485    fn test_neighbors_both_directions() {
3486        let store = LpgStore::new();
3487
3488        let a = store.create_node(&["Person"]);
3489        let b = store.create_node(&["Person"]);
3490        let c = store.create_node(&["Person"]);
3491
3492        store.create_edge(a, b, "KNOWS"); // a -> b
3493        store.create_edge(c, a, "KNOWS"); // c -> a
3494
3495        // Direction::Both for node a
3496        let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3497        assert_eq!(neighbors.len(), 2);
3498        assert!(neighbors.contains(&b)); // outgoing
3499        assert!(neighbors.contains(&c)); // incoming
3500    }
3501
3502    #[test]
3503    fn test_edges_from() {
3504        let store = LpgStore::new();
3505
3506        let a = store.create_node(&["Person"]);
3507        let b = store.create_node(&["Person"]);
3508        let c = store.create_node(&["Person"]);
3509
3510        let e1 = store.create_edge(a, b, "KNOWS");
3511        let e2 = store.create_edge(a, c, "KNOWS");
3512
3513        let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3514        assert_eq!(edges.len(), 2);
3515        assert!(edges.iter().any(|(_, e)| *e == e1));
3516        assert!(edges.iter().any(|(_, e)| *e == e2));
3517
3518        // Incoming edges to b
3519        let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3520        assert_eq!(incoming.len(), 1);
3521        assert_eq!(incoming[0].1, e1);
3522    }
3523
3524    #[test]
3525    fn test_edges_to() {
3526        let store = LpgStore::new();
3527
3528        let a = store.create_node(&["Person"]);
3529        let b = store.create_node(&["Person"]);
3530        let c = store.create_node(&["Person"]);
3531
3532        let e1 = store.create_edge(a, b, "KNOWS");
3533        let e2 = store.create_edge(c, b, "KNOWS");
3534
3535        // Edges pointing TO b
3536        let to_b = store.edges_to(b);
3537        assert_eq!(to_b.len(), 2);
3538        assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3539        assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3540    }
3541
3542    #[test]
3543    fn test_out_degree_in_degree() {
3544        let store = LpgStore::new();
3545
3546        let a = store.create_node(&["Person"]);
3547        let b = store.create_node(&["Person"]);
3548        let c = store.create_node(&["Person"]);
3549
3550        store.create_edge(a, b, "KNOWS");
3551        store.create_edge(a, c, "KNOWS");
3552        store.create_edge(c, b, "KNOWS");
3553
3554        assert_eq!(store.out_degree(a), 2);
3555        assert_eq!(store.out_degree(b), 0);
3556        assert_eq!(store.out_degree(c), 1);
3557
3558        assert_eq!(store.in_degree(a), 0);
3559        assert_eq!(store.in_degree(b), 2);
3560        assert_eq!(store.in_degree(c), 1);
3561    }
3562
3563    #[test]
3564    fn test_edge_type() {
3565        let store = LpgStore::new();
3566
3567        let a = store.create_node(&["Person"]);
3568        let b = store.create_node(&["Person"]);
3569        let edge_id = store.create_edge(a, b, "KNOWS");
3570
3571        let edge_type = store.edge_type(edge_id);
3572        assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3573
3574        // Non-existent edge
3575        let fake_id = EdgeId::new(999);
3576        assert!(store.edge_type(fake_id).is_none());
3577    }
3578
3579    #[test]
3580    fn test_count_methods() {
3581        let store = LpgStore::new();
3582
3583        assert_eq!(store.label_count(), 0);
3584        assert_eq!(store.edge_type_count(), 0);
3585        assert_eq!(store.property_key_count(), 0);
3586
3587        let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3588        let b = store.create_node(&["Company"]);
3589        store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3590
3591        assert_eq!(store.label_count(), 2); // Person, Company
3592        assert_eq!(store.edge_type_count(), 1); // WORKS_AT
3593        assert_eq!(store.property_key_count(), 2); // age, since
3594    }
3595
3596    #[test]
3597    fn test_all_nodes_and_edges() {
3598        let store = LpgStore::new();
3599
3600        let a = store.create_node(&["Person"]);
3601        let b = store.create_node(&["Person"]);
3602        store.create_edge(a, b, "KNOWS");
3603
3604        let nodes: Vec<_> = store.all_nodes().collect();
3605        assert_eq!(nodes.len(), 2);
3606
3607        let edges: Vec<_> = store.all_edges().collect();
3608        assert_eq!(edges.len(), 1);
3609    }
3610
3611    #[test]
3612    fn test_all_labels_and_edge_types() {
3613        let store = LpgStore::new();
3614
3615        store.create_node(&["Person"]);
3616        store.create_node(&["Company"]);
3617        let a = store.create_node(&["Animal"]);
3618        let b = store.create_node(&["Animal"]);
3619        store.create_edge(a, b, "EATS");
3620
3621        let labels = store.all_labels();
3622        assert_eq!(labels.len(), 3);
3623        assert!(labels.contains(&"Person".to_string()));
3624        assert!(labels.contains(&"Company".to_string()));
3625        assert!(labels.contains(&"Animal".to_string()));
3626
3627        let edge_types = store.all_edge_types();
3628        assert_eq!(edge_types.len(), 1);
3629        assert!(edge_types.contains(&"EATS".to_string()));
3630    }
3631
3632    #[test]
3633    fn test_all_property_keys() {
3634        let store = LpgStore::new();
3635
3636        let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3637        let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3638        store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3639
3640        let keys = store.all_property_keys();
3641        assert!(keys.contains(&"name".to_string()));
3642        assert!(keys.contains(&"age".to_string()));
3643        assert!(keys.contains(&"since".to_string()));
3644    }
3645
3646    #[test]
3647    fn test_nodes_with_label() {
3648        let store = LpgStore::new();
3649
3650        store.create_node(&["Person"]);
3651        store.create_node(&["Person"]);
3652        store.create_node(&["Company"]);
3653
3654        let persons: Vec<_> = store.nodes_with_label("Person").collect();
3655        assert_eq!(persons.len(), 2);
3656
3657        let companies: Vec<_> = store.nodes_with_label("Company").collect();
3658        assert_eq!(companies.len(), 1);
3659
3660        let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3661        assert_eq!(none.len(), 0);
3662    }
3663
3664    #[test]
3665    fn test_edges_with_type() {
3666        let store = LpgStore::new();
3667
3668        let a = store.create_node(&["Person"]);
3669        let b = store.create_node(&["Person"]);
3670        let c = store.create_node(&["Company"]);
3671
3672        store.create_edge(a, b, "KNOWS");
3673        store.create_edge(a, c, "WORKS_AT");
3674
3675        let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3676        assert_eq!(knows.len(), 1);
3677
3678        let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3679        assert_eq!(works_at.len(), 1);
3680
3681        let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3682        assert_eq!(none.len(), 0);
3683    }
3684
3685    #[test]
3686    fn test_nodes_by_label_nonexistent() {
3687        let store = LpgStore::new();
3688        store.create_node(&["Person"]);
3689
3690        let empty = store.nodes_by_label("NonExistent");
3691        assert!(empty.is_empty());
3692    }
3693
3694    #[test]
3695    fn test_statistics() {
3696        let store = LpgStore::new();
3697
3698        let a = store.create_node(&["Person"]);
3699        let b = store.create_node(&["Person"]);
3700        let c = store.create_node(&["Company"]);
3701
3702        store.create_edge(a, b, "KNOWS");
3703        store.create_edge(a, c, "WORKS_AT");
3704
3705        store.compute_statistics();
3706        let stats = store.statistics();
3707
3708        assert_eq!(stats.total_nodes, 3);
3709        assert_eq!(stats.total_edges, 2);
3710
3711        // Estimates
3712        let person_card = store.estimate_label_cardinality("Person");
3713        assert!(person_card > 0.0);
3714
3715        let avg_degree = store.estimate_avg_degree("KNOWS", true);
3716        assert!(avg_degree >= 0.0);
3717    }
3718
3719    #[test]
3720    fn test_zone_maps() {
3721        let store = LpgStore::new();
3722
3723        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3724        store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3725
3726        // Zone map should indicate possible matches (30 is within [25, 35] range)
3727        let might_match =
3728            store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3729        // Zone maps return true conservatively when value is within min/max range
3730        assert!(might_match);
3731
3732        let zone = store.node_property_zone_map(&"age".into());
3733        assert!(zone.is_some());
3734
3735        // Non-existent property
3736        let no_zone = store.node_property_zone_map(&"nonexistent".into());
3737        assert!(no_zone.is_none());
3738
3739        // Edge zone maps
3740        let a = store.create_node(&["A"]);
3741        let b = store.create_node(&["B"]);
3742        store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3743
3744        let edge_zone = store.edge_property_zone_map(&"weight".into());
3745        assert!(edge_zone.is_some());
3746    }
3747
3748    #[test]
3749    fn test_rebuild_zone_maps() {
3750        let store = LpgStore::new();
3751        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3752
3753        // Should not panic
3754        store.rebuild_zone_maps();
3755    }
3756
3757    #[test]
3758    fn test_create_node_with_id() {
3759        let store = LpgStore::new();
3760
3761        let specific_id = NodeId::new(100);
3762        store.create_node_with_id(specific_id, &["Person", "Employee"]);
3763
3764        let node = store.get_node(specific_id).unwrap();
3765        assert!(node.has_label("Person"));
3766        assert!(node.has_label("Employee"));
3767
3768        // Next auto-generated ID should be > 100
3769        let next = store.create_node(&["Other"]);
3770        assert!(next.as_u64() > 100);
3771    }
3772
3773    #[test]
3774    fn test_create_edge_with_id() {
3775        let store = LpgStore::new();
3776
3777        let a = store.create_node(&["A"]);
3778        let b = store.create_node(&["B"]);
3779
3780        let specific_id = EdgeId::new(500);
3781        store.create_edge_with_id(specific_id, a, b, "REL");
3782
3783        let edge = store.get_edge(specific_id).unwrap();
3784        assert_eq!(edge.src, a);
3785        assert_eq!(edge.dst, b);
3786        assert_eq!(edge.edge_type.as_str(), "REL");
3787
3788        // Next auto-generated ID should be > 500
3789        let next = store.create_edge(a, b, "OTHER");
3790        assert!(next.as_u64() > 500);
3791    }
3792
3793    #[test]
3794    fn test_set_epoch() {
3795        let store = LpgStore::new();
3796
3797        assert_eq!(store.current_epoch().as_u64(), 0);
3798
3799        store.set_epoch(EpochId::new(42));
3800        assert_eq!(store.current_epoch().as_u64(), 42);
3801    }
3802
3803    #[test]
3804    fn test_get_node_nonexistent() {
3805        let store = LpgStore::new();
3806        let fake_id = NodeId::new(999);
3807        assert!(store.get_node(fake_id).is_none());
3808    }
3809
3810    #[test]
3811    fn test_get_edge_nonexistent() {
3812        let store = LpgStore::new();
3813        let fake_id = EdgeId::new(999);
3814        assert!(store.get_edge(fake_id).is_none());
3815    }
3816
3817    #[test]
3818    fn test_multiple_labels() {
3819        let store = LpgStore::new();
3820
3821        let id = store.create_node(&["Person", "Employee", "Manager"]);
3822        let node = store.get_node(id).unwrap();
3823
3824        assert!(node.has_label("Person"));
3825        assert!(node.has_label("Employee"));
3826        assert!(node.has_label("Manager"));
3827        assert!(!node.has_label("Other"));
3828    }
3829
3830    #[test]
3831    fn test_default_impl() {
3832        let store: LpgStore = Default::default();
3833        assert_eq!(store.node_count(), 0);
3834        assert_eq!(store.edge_count(), 0);
3835    }
3836
3837    #[test]
3838    fn test_edges_from_both_directions() {
3839        let store = LpgStore::new();
3840
3841        let a = store.create_node(&["A"]);
3842        let b = store.create_node(&["B"]);
3843        let c = store.create_node(&["C"]);
3844
3845        let e1 = store.create_edge(a, b, "R1"); // a -> b
3846        let e2 = store.create_edge(c, a, "R2"); // c -> a
3847
3848        // Both directions from a
3849        let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3850        assert_eq!(edges.len(), 2);
3851        assert!(edges.iter().any(|(_, e)| *e == e1)); // outgoing
3852        assert!(edges.iter().any(|(_, e)| *e == e2)); // incoming
3853    }
3854
3855    #[test]
3856    fn test_no_backward_adj_in_degree() {
3857        let config = LpgStoreConfig {
3858            backward_edges: false,
3859            initial_node_capacity: 10,
3860            initial_edge_capacity: 10,
3861        };
3862        let store = LpgStore::with_config(config);
3863
3864        let a = store.create_node(&["A"]);
3865        let b = store.create_node(&["B"]);
3866        store.create_edge(a, b, "R");
3867
3868        // in_degree should still work (falls back to scanning)
3869        let degree = store.in_degree(b);
3870        assert_eq!(degree, 1);
3871    }
3872
3873    #[test]
3874    fn test_no_backward_adj_edges_to() {
3875        let config = LpgStoreConfig {
3876            backward_edges: false,
3877            initial_node_capacity: 10,
3878            initial_edge_capacity: 10,
3879        };
3880        let store = LpgStore::with_config(config);
3881
3882        let a = store.create_node(&["A"]);
3883        let b = store.create_node(&["B"]);
3884        let e = store.create_edge(a, b, "R");
3885
3886        // edges_to should still work (falls back to scanning)
3887        let edges = store.edges_to(b);
3888        assert_eq!(edges.len(), 1);
3889        assert_eq!(edges[0].1, e);
3890    }
3891
3892    #[test]
3893    fn test_node_versioned_creation() {
3894        let store = LpgStore::new();
3895
3896        let epoch = store.new_epoch();
3897        let tx_id = TxId::new(1);
3898
3899        let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3900        assert!(store.get_node(id).is_some());
3901    }
3902
3903    #[test]
3904    fn test_edge_versioned_creation() {
3905        let store = LpgStore::new();
3906
3907        let a = store.create_node(&["A"]);
3908        let b = store.create_node(&["B"]);
3909
3910        let epoch = store.new_epoch();
3911        let tx_id = TxId::new(1);
3912
3913        let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3914        assert!(store.get_edge(edge_id).is_some());
3915    }
3916
3917    #[test]
3918    fn test_node_with_props_versioned() {
3919        let store = LpgStore::new();
3920
3921        let epoch = store.new_epoch();
3922        let tx_id = TxId::new(1);
3923
3924        let id = store.create_node_with_props_versioned(
3925            &["Person"],
3926            [("name", Value::from("Alice"))],
3927            epoch,
3928            tx_id,
3929        );
3930
3931        let node = store.get_node(id).unwrap();
3932        assert_eq!(
3933            node.get_property("name").and_then(|v| v.as_str()),
3934            Some("Alice")
3935        );
3936    }
3937
3938    #[test]
3939    fn test_discard_uncommitted_versions() {
3940        let store = LpgStore::new();
3941
3942        let epoch = store.new_epoch();
3943        let tx_id = TxId::new(42);
3944
3945        // Create node with specific tx
3946        let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3947        assert!(store.get_node(node_id).is_some());
3948
3949        // Discard uncommitted versions for this tx
3950        store.discard_uncommitted_versions(tx_id);
3951
3952        // Node should be gone (version chain was removed)
3953        assert!(store.get_node(node_id).is_none());
3954    }
3955
3956    // === Property Index Tests ===
3957
3958    #[test]
3959    fn test_property_index_create_and_lookup() {
3960        let store = LpgStore::new();
3961
3962        // Create nodes with properties
3963        let alice = store.create_node(&["Person"]);
3964        let bob = store.create_node(&["Person"]);
3965        let charlie = store.create_node(&["Person"]);
3966
3967        store.set_node_property(alice, "city", Value::from("NYC"));
3968        store.set_node_property(bob, "city", Value::from("NYC"));
3969        store.set_node_property(charlie, "city", Value::from("LA"));
3970
3971        // Before indexing, lookup still works (via scan)
3972        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3973        assert_eq!(nyc_people.len(), 2);
3974
3975        // Create index
3976        store.create_property_index("city");
3977        assert!(store.has_property_index("city"));
3978
3979        // Indexed lookup should return same results
3980        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3981        assert_eq!(nyc_people.len(), 2);
3982        assert!(nyc_people.contains(&alice));
3983        assert!(nyc_people.contains(&bob));
3984
3985        let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3986        assert_eq!(la_people.len(), 1);
3987        assert!(la_people.contains(&charlie));
3988    }
3989
3990    #[test]
3991    fn test_property_index_maintained_on_update() {
3992        let store = LpgStore::new();
3993
3994        // Create index first
3995        store.create_property_index("status");
3996
3997        let node = store.create_node(&["Task"]);
3998        store.set_node_property(node, "status", Value::from("pending"));
3999
4000        // Should find by initial value
4001        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
4002        assert_eq!(pending.len(), 1);
4003        assert!(pending.contains(&node));
4004
4005        // Update the property
4006        store.set_node_property(node, "status", Value::from("done"));
4007
4008        // Old value should not find it
4009        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
4010        assert!(pending.is_empty());
4011
4012        // New value should find it
4013        let done = store.find_nodes_by_property("status", &Value::from("done"));
4014        assert_eq!(done.len(), 1);
4015        assert!(done.contains(&node));
4016    }
4017
4018    #[test]
4019    fn test_property_index_maintained_on_remove() {
4020        let store = LpgStore::new();
4021
4022        store.create_property_index("tag");
4023
4024        let node = store.create_node(&["Item"]);
4025        store.set_node_property(node, "tag", Value::from("important"));
4026
4027        // Should find it
4028        let found = store.find_nodes_by_property("tag", &Value::from("important"));
4029        assert_eq!(found.len(), 1);
4030
4031        // Remove the property
4032        store.remove_node_property(node, "tag");
4033
4034        // Should no longer find it
4035        let found = store.find_nodes_by_property("tag", &Value::from("important"));
4036        assert!(found.is_empty());
4037    }
4038
4039    #[test]
4040    fn test_property_index_drop() {
4041        let store = LpgStore::new();
4042
4043        store.create_property_index("key");
4044        assert!(store.has_property_index("key"));
4045
4046        assert!(store.drop_property_index("key"));
4047        assert!(!store.has_property_index("key"));
4048
4049        // Dropping non-existent index returns false
4050        assert!(!store.drop_property_index("key"));
4051    }
4052
4053    #[test]
4054    fn test_property_index_multiple_values() {
4055        let store = LpgStore::new();
4056
4057        store.create_property_index("age");
4058
4059        // Create multiple nodes with same and different ages
4060        let n1 = store.create_node(&["Person"]);
4061        let n2 = store.create_node(&["Person"]);
4062        let n3 = store.create_node(&["Person"]);
4063        let n4 = store.create_node(&["Person"]);
4064
4065        store.set_node_property(n1, "age", Value::from(25i64));
4066        store.set_node_property(n2, "age", Value::from(25i64));
4067        store.set_node_property(n3, "age", Value::from(30i64));
4068        store.set_node_property(n4, "age", Value::from(25i64));
4069
4070        let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
4071        assert_eq!(age_25.len(), 3);
4072
4073        let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
4074        assert_eq!(age_30.len(), 1);
4075
4076        let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
4077        assert!(age_40.is_empty());
4078    }
4079
4080    #[test]
4081    fn test_property_index_builds_from_existing_data() {
4082        let store = LpgStore::new();
4083
4084        // Create nodes first
4085        let n1 = store.create_node(&["Person"]);
4086        let n2 = store.create_node(&["Person"]);
4087        store.set_node_property(n1, "email", Value::from("alice@example.com"));
4088        store.set_node_property(n2, "email", Value::from("bob@example.com"));
4089
4090        // Create index after data exists
4091        store.create_property_index("email");
4092
4093        // Index should include existing data
4094        let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
4095        assert_eq!(alice.len(), 1);
4096        assert!(alice.contains(&n1));
4097
4098        let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
4099        assert_eq!(bob.len(), 1);
4100        assert!(bob.contains(&n2));
4101    }
4102
4103    #[test]
4104    fn test_get_node_property_batch() {
4105        let store = LpgStore::new();
4106
4107        let n1 = store.create_node(&["Person"]);
4108        let n2 = store.create_node(&["Person"]);
4109        let n3 = store.create_node(&["Person"]);
4110
4111        store.set_node_property(n1, "age", Value::from(25i64));
4112        store.set_node_property(n2, "age", Value::from(30i64));
4113        // n3 has no age property
4114
4115        let age_key = PropertyKey::new("age");
4116        let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
4117
4118        assert_eq!(values.len(), 3);
4119        assert_eq!(values[0], Some(Value::from(25i64)));
4120        assert_eq!(values[1], Some(Value::from(30i64)));
4121        assert_eq!(values[2], None);
4122    }
4123
4124    #[test]
4125    fn test_get_node_property_batch_empty() {
4126        let store = LpgStore::new();
4127        let key = PropertyKey::new("any");
4128
4129        let values = store.get_node_property_batch(&[], &key);
4130        assert!(values.is_empty());
4131    }
4132
4133    #[test]
4134    fn test_get_nodes_properties_batch() {
4135        let store = LpgStore::new();
4136
4137        let n1 = store.create_node(&["Person"]);
4138        let n2 = store.create_node(&["Person"]);
4139        let n3 = store.create_node(&["Person"]);
4140
4141        store.set_node_property(n1, "name", Value::from("Alice"));
4142        store.set_node_property(n1, "age", Value::from(25i64));
4143        store.set_node_property(n2, "name", Value::from("Bob"));
4144        // n3 has no properties
4145
4146        let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
4147
4148        assert_eq!(all_props.len(), 3);
4149        assert_eq!(all_props[0].len(), 2); // name and age
4150        assert_eq!(all_props[1].len(), 1); // name only
4151        assert_eq!(all_props[2].len(), 0); // no properties
4152
4153        assert_eq!(
4154            all_props[0].get(&PropertyKey::new("name")),
4155            Some(&Value::from("Alice"))
4156        );
4157        assert_eq!(
4158            all_props[1].get(&PropertyKey::new("name")),
4159            Some(&Value::from("Bob"))
4160        );
4161    }
4162
4163    #[test]
4164    fn test_get_nodes_properties_batch_empty() {
4165        let store = LpgStore::new();
4166
4167        let all_props = store.get_nodes_properties_batch(&[]);
4168        assert!(all_props.is_empty());
4169    }
4170
4171    #[test]
4172    fn test_get_nodes_properties_selective_batch() {
4173        let store = LpgStore::new();
4174
4175        let n1 = store.create_node(&["Person"]);
4176        let n2 = store.create_node(&["Person"]);
4177
4178        // Set multiple properties
4179        store.set_node_property(n1, "name", Value::from("Alice"));
4180        store.set_node_property(n1, "age", Value::from(25i64));
4181        store.set_node_property(n1, "email", Value::from("alice@example.com"));
4182        store.set_node_property(n2, "name", Value::from("Bob"));
4183        store.set_node_property(n2, "age", Value::from(30i64));
4184        store.set_node_property(n2, "city", Value::from("NYC"));
4185
4186        // Request only name and age (not email or city)
4187        let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
4188        let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
4189
4190        assert_eq!(props.len(), 2);
4191
4192        // n1: should have name and age, but NOT email
4193        assert_eq!(props[0].len(), 2);
4194        assert_eq!(
4195            props[0].get(&PropertyKey::new("name")),
4196            Some(&Value::from("Alice"))
4197        );
4198        assert_eq!(
4199            props[0].get(&PropertyKey::new("age")),
4200            Some(&Value::from(25i64))
4201        );
4202        assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4203
4204        // n2: should have name and age, but NOT city
4205        assert_eq!(props[1].len(), 2);
4206        assert_eq!(
4207            props[1].get(&PropertyKey::new("name")),
4208            Some(&Value::from("Bob"))
4209        );
4210        assert_eq!(
4211            props[1].get(&PropertyKey::new("age")),
4212            Some(&Value::from(30i64))
4213        );
4214        assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4215    }
4216
4217    #[test]
4218    fn test_get_nodes_properties_selective_batch_empty_keys() {
4219        let store = LpgStore::new();
4220
4221        let n1 = store.create_node(&["Person"]);
4222        store.set_node_property(n1, "name", Value::from("Alice"));
4223
4224        // Request no properties
4225        let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4226
4227        assert_eq!(props.len(), 1);
4228        assert!(props[0].is_empty()); // Empty map when no keys requested
4229    }
4230
4231    #[test]
4232    fn test_get_nodes_properties_selective_batch_missing_keys() {
4233        let store = LpgStore::new();
4234
4235        let n1 = store.create_node(&["Person"]);
4236        store.set_node_property(n1, "name", Value::from("Alice"));
4237
4238        // Request a property that doesn't exist
4239        let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4240        let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4241
4242        assert_eq!(props.len(), 1);
4243        assert_eq!(props[0].len(), 1); // Only name exists
4244        assert_eq!(
4245            props[0].get(&PropertyKey::new("name")),
4246            Some(&Value::from("Alice"))
4247        );
4248    }
4249
4250    // === Range Query Tests ===
4251
4252    #[test]
4253    fn test_find_nodes_in_range_inclusive() {
4254        let store = LpgStore::new();
4255
4256        let n1 = store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4257        let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4258        let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4259        let _n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4260
4261        // age >= 20 AND age <= 40 (inclusive both sides)
4262        let result = store.find_nodes_in_range(
4263            "age",
4264            Some(&Value::from(20i64)),
4265            Some(&Value::from(40i64)),
4266            true,
4267            true,
4268        );
4269        assert_eq!(result.len(), 3);
4270        assert!(result.contains(&n1));
4271        assert!(result.contains(&n2));
4272        assert!(result.contains(&n3));
4273    }
4274
4275    #[test]
4276    fn test_find_nodes_in_range_exclusive() {
4277        let store = LpgStore::new();
4278
4279        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4280        let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4281        store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4282
4283        // age > 20 AND age < 40 (exclusive both sides)
4284        let result = store.find_nodes_in_range(
4285            "age",
4286            Some(&Value::from(20i64)),
4287            Some(&Value::from(40i64)),
4288            false,
4289            false,
4290        );
4291        assert_eq!(result.len(), 1);
4292        assert!(result.contains(&n2));
4293    }
4294
4295    #[test]
4296    fn test_find_nodes_in_range_open_ended() {
4297        let store = LpgStore::new();
4298
4299        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4300        store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4301        let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4302        let n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4303
4304        // age >= 35 (no upper bound)
4305        let result = store.find_nodes_in_range("age", Some(&Value::from(35i64)), None, true, true);
4306        assert_eq!(result.len(), 2);
4307        assert!(result.contains(&n3));
4308        assert!(result.contains(&n4));
4309
4310        // age <= 25 (no lower bound)
4311        let result = store.find_nodes_in_range("age", None, Some(&Value::from(25i64)), true, true);
4312        assert_eq!(result.len(), 1);
4313    }
4314
4315    #[test]
4316    fn test_find_nodes_in_range_empty_result() {
4317        let store = LpgStore::new();
4318
4319        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4320
4321        // Range that doesn't match anything
4322        let result = store.find_nodes_in_range(
4323            "age",
4324            Some(&Value::from(100i64)),
4325            Some(&Value::from(200i64)),
4326            true,
4327            true,
4328        );
4329        assert!(result.is_empty());
4330    }
4331
4332    #[test]
4333    fn test_find_nodes_in_range_nonexistent_property() {
4334        let store = LpgStore::new();
4335
4336        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4337
4338        let result = store.find_nodes_in_range(
4339            "weight",
4340            Some(&Value::from(50i64)),
4341            Some(&Value::from(100i64)),
4342            true,
4343            true,
4344        );
4345        assert!(result.is_empty());
4346    }
4347
4348    // === Multi-Property Query Tests ===
4349
4350    #[test]
4351    fn test_find_nodes_by_properties_multiple_conditions() {
4352        let store = LpgStore::new();
4353
4354        let alice = store.create_node_with_props(
4355            &["Person"],
4356            [("name", Value::from("Alice")), ("city", Value::from("NYC"))],
4357        );
4358        store.create_node_with_props(
4359            &["Person"],
4360            [("name", Value::from("Bob")), ("city", Value::from("NYC"))],
4361        );
4362        store.create_node_with_props(
4363            &["Person"],
4364            [("name", Value::from("Alice")), ("city", Value::from("LA"))],
4365        );
4366
4367        // Match name="Alice" AND city="NYC"
4368        let result = store.find_nodes_by_properties(&[
4369            ("name", Value::from("Alice")),
4370            ("city", Value::from("NYC")),
4371        ]);
4372        assert_eq!(result.len(), 1);
4373        assert!(result.contains(&alice));
4374    }
4375
4376    #[test]
4377    fn test_find_nodes_by_properties_empty_conditions() {
4378        let store = LpgStore::new();
4379
4380        store.create_node(&["Person"]);
4381        store.create_node(&["Person"]);
4382
4383        // Empty conditions should return all nodes
4384        let result = store.find_nodes_by_properties(&[]);
4385        assert_eq!(result.len(), 2);
4386    }
4387
4388    #[test]
4389    fn test_find_nodes_by_properties_no_match() {
4390        let store = LpgStore::new();
4391
4392        store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
4393
4394        let result = store.find_nodes_by_properties(&[("name", Value::from("Nobody"))]);
4395        assert!(result.is_empty());
4396    }
4397
4398    #[test]
4399    fn test_find_nodes_by_properties_with_index() {
4400        let store = LpgStore::new();
4401
4402        // Create index on name
4403        store.create_property_index("name");
4404
4405        let alice = store.create_node_with_props(
4406            &["Person"],
4407            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
4408        );
4409        store.create_node_with_props(
4410            &["Person"],
4411            [("name", Value::from("Bob")), ("age", Value::from(30i64))],
4412        );
4413
4414        // Index should accelerate the lookup
4415        let result = store.find_nodes_by_properties(&[
4416            ("name", Value::from("Alice")),
4417            ("age", Value::from(30i64)),
4418        ]);
4419        assert_eq!(result.len(), 1);
4420        assert!(result.contains(&alice));
4421    }
4422
4423    // === Cardinality Estimation Tests ===
4424
4425    #[test]
4426    fn test_estimate_label_cardinality() {
4427        let store = LpgStore::new();
4428
4429        store.create_node(&["Person"]);
4430        store.create_node(&["Person"]);
4431        store.create_node(&["Animal"]);
4432
4433        store.ensure_statistics_fresh();
4434
4435        let person_est = store.estimate_label_cardinality("Person");
4436        let animal_est = store.estimate_label_cardinality("Animal");
4437        let unknown_est = store.estimate_label_cardinality("Unknown");
4438
4439        assert!(
4440            person_est >= 1.0,
4441            "Person should have cardinality >= 1, got {person_est}"
4442        );
4443        assert!(
4444            animal_est >= 1.0,
4445            "Animal should have cardinality >= 1, got {animal_est}"
4446        );
4447        // Unknown label should return some default (not panic)
4448        assert!(unknown_est >= 0.0);
4449    }
4450
4451    #[test]
4452    fn test_estimate_avg_degree() {
4453        let store = LpgStore::new();
4454
4455        let a = store.create_node(&["Person"]);
4456        let b = store.create_node(&["Person"]);
4457        let c = store.create_node(&["Person"]);
4458
4459        store.create_edge(a, b, "KNOWS");
4460        store.create_edge(a, c, "KNOWS");
4461        store.create_edge(b, c, "KNOWS");
4462
4463        store.ensure_statistics_fresh();
4464
4465        let outgoing = store.estimate_avg_degree("KNOWS", true);
4466        let incoming = store.estimate_avg_degree("KNOWS", false);
4467
4468        assert!(
4469            outgoing > 0.0,
4470            "Outgoing degree should be > 0, got {outgoing}"
4471        );
4472        assert!(
4473            incoming > 0.0,
4474            "Incoming degree should be > 0, got {incoming}"
4475        );
4476    }
4477
4478    // === Delete operations ===
4479
4480    #[test]
4481    fn test_delete_node_does_not_cascade() {
4482        let store = LpgStore::new();
4483
4484        let a = store.create_node(&["A"]);
4485        let b = store.create_node(&["B"]);
4486        let e = store.create_edge(a, b, "KNOWS");
4487
4488        assert!(store.delete_node(a));
4489        assert!(store.get_node(a).is_none());
4490
4491        // Edges are NOT automatically deleted (non-detach delete)
4492        assert!(
4493            store.get_edge(e).is_some(),
4494            "Edge should survive non-detach node delete"
4495        );
4496    }
4497
4498    #[test]
4499    fn test_delete_already_deleted_node() {
4500        let store = LpgStore::new();
4501        let a = store.create_node(&["A"]);
4502
4503        assert!(store.delete_node(a));
4504        // Second delete should return false (already deleted)
4505        assert!(!store.delete_node(a));
4506    }
4507
4508    #[test]
4509    fn test_delete_nonexistent_node() {
4510        let store = LpgStore::new();
4511        assert!(!store.delete_node(NodeId::new(999)));
4512    }
4513}